Python multi-process import CSV to database
This article shares with you the idea and method of using python to implement multi-process import of CSV file data into MySQL and the specific code sharing. Friends who have the same needs can refer to the following
Helped colleagues to deal with it some time ago A requirement to import CSV data into MySQL. Two large CSV files, 3GB with 21 million records and 7GB with 35 million records respectively. For data of this magnitude, simple single-process/single-thread import would take a long time, and finally a multi-process approach was used to implement it. I won’t go into details about the specific process, but record a few key points:
Insert in batches instead of inserting one by one
In order to speed up the insertion speed, do not create # first ##Index
- Producers and consumers
Model, the main process reads the file, and multiple worker processes perform insertion
- Pay attention to controlling the number of workers to avoid putting too much pressure on MySQL
- Pay attention to exceptions caused by processing dirty data ##Original The data is GBK encoded, so pay attention to converting it to UTF-8
- Use click to encapsulate the command line tool
#!/usr/bin/env python # -*- coding: utf-8 -*- import codecs import csv import logging import multiprocessing import os import warnings import click import MySQLdb import sqlalchemy warnings.filterwarnings('ignore', category=MySQLdb.Warning) # 批量插入的记录数量 BATCH = 5000 DB_URI = 'mysql://root@localhost:3306/example?charset=utf8' engine = sqlalchemy.create_engine(DB_URI) def get_table_cols(table): sql = 'SELECT * FROM `{table}` LIMIT 0'.format(table=table) res = engine.execute(sql) return res.keys() def insert_many(table, cols, rows, cursor): sql = 'INSERT INTO `{table}` ({cols}) VALUES ({marks})'.format( table=table, cols=', '.join(cols), marks=', '.join(['%s'] * len(cols))) cursor.execute(sql, *rows) logging.info('process %s inserted %s rows into table %s', os.getpid(), len(rows), table) def insert_worker(table, cols, queue): rows = [] # 每个子进程创建自己的 engine 对象 cursor = sqlalchemy.create_engine(DB_URI) while True: row = queue.get() if row is None: if rows: insert_many(table, cols, rows, cursor) break rows.append(row) if len(rows) == BATCH: insert_many(table, cols, rows, cursor) rows = [] def insert_parallel(table, reader, w=10): cols = get_table_cols(table) # 数据队列,主进程读文件并往里写数据,worker 进程从队列读数据 # 注意一下控制队列的大小,避免消费太慢导致堆积太多数据,占用过多内存 queue = multiprocessing.Queue(maxsize=w*BATCH*2) workers = [] for i in range(w): p = multiprocessing.Process(target=insert_worker, args=(table, cols, queue)) p.start() workers.append(p) logging.info('starting # %s worker process, pid: %s...', i + 1, p.pid) dirty_data_file = './{}_dirty_rows.csv'.format(table) xf = open(dirty_data_file, 'w') writer = csv.writer(xf, delimiter=reader.dialect.delimiter) for line in reader: # 记录并跳过脏数据: 键值数量不一致 if len(line) != len(cols): writer.writerow(line) continue # 把 None 值替换为 'NULL' clean_line = [None if x == 'NULL' else x for x in line] # 往队列里写数据 queue.put(tuple(clean_line)) if reader.line_num % 500000 == 0: logging.info('put %s tasks into queue.', reader.line_num) xf.close() # 给每个 worker 发送任务结束的信号 logging.info('send close signal to worker processes') for i in range(w): queue.put(None) for p in workers: p.join() def convert_file_to_utf8(f, rv_file=None): if not rv_file: name, ext = os.path.splitext(f) if isinstance(name, unicode): name = name.encode('utf8') rv_file = '{}_utf8{}'.format(name, ext) logging.info('start to process file %s', f) with open(f) as infd: with open(rv_file, 'w') as outfd: lines = [] loop = 0 chunck = 200000 first_line = infd.readline().strip(codecs.BOM_UTF8).strip() + '\n' lines.append(first_line) for line in infd: clean_line = line.decode('gb18030').encode('utf8') clean_line = clean_line.rstrip() + '\n' lines.append(clean_line) if len(lines) == chunck: outfd.writelines(lines) lines = [] loop += 1 logging.info('processed %s lines.', loop * chunck) outfd.writelines(lines) logging.info('processed %s lines.', loop * chunck + len(lines)) @click.group() def cli(): logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s') @cli.command('gbk_to_utf8') @click.argument('f') def convert_gbk_to_utf8(f): convert_file_to_utf8(f) @cli.command('load') @click.option('-t', '--table', required=True, help='表名') @click.option('-i', '--filename', required=True, help='输入文件') @click.option('-w', '--workers', default=10, help='worker 数量,默认 10') def load_fac_day_pro_nos_sal_table(table, filename, workers): with open(filename) as fd: fd.readline() # skip header reader = csv.reader(fd) insert_parallel(table, reader, w=workers) if name == 'main': cli()
[Related recommendations]
1.
Python Free Video TutorialPython Learning ManualGeek Academy Python Video TutorialThe above is the detailed content of Python multi-process import CSV to database. For more information, please follow other related articles on the PHP Chinese website!

Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

Video Face Swap
Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics



PHP is suitable for web development and rapid prototyping, and Python is suitable for data science and machine learning. 1.PHP is used for dynamic web development, with simple syntax and suitable for rapid development. 2. Python has concise syntax, is suitable for multiple fields, and has a strong library ecosystem.

The main role of MySQL in web applications is to store and manage data. 1.MySQL efficiently processes user information, product catalogs, transaction records and other data. 2. Through SQL query, developers can extract information from the database to generate dynamic content. 3.MySQL works based on the client-server model to ensure acceptable query speed.

PHP is mainly procedural programming, but also supports object-oriented programming (OOP); Python supports a variety of paradigms, including OOP, functional and procedural programming. PHP is suitable for web development, and Python is suitable for a variety of applications such as data analysis and machine learning.

Golang is more suitable for high concurrency tasks, while Python has more advantages in flexibility. 1.Golang efficiently handles concurrency through goroutine and channel. 2. Python relies on threading and asyncio, which is affected by GIL, but provides multiple concurrency methods. The choice should be based on specific needs.

Python is more suitable for beginners, with a smooth learning curve and concise syntax; JavaScript is suitable for front-end development, with a steep learning curve and flexible syntax. 1. Python syntax is intuitive and suitable for data science and back-end development. 2. JavaScript is flexible and widely used in front-end and server-side programming.

Python excels in automation, scripting, and task management. 1) Automation: File backup is realized through standard libraries such as os and shutil. 2) Script writing: Use the psutil library to monitor system resources. 3) Task management: Use the schedule library to schedule tasks. Python's ease of use and rich library support makes it the preferred tool in these areas.

Running Python code in Notepad requires the Python executable and NppExec plug-in to be installed. After installing Python and adding PATH to it, configure the command "python" and the parameter "{CURRENT_DIRECTORY}{FILE_NAME}" in the NppExec plug-in to run Python code in Notepad through the shortcut key "F6".

Golang and Python each have their own advantages: Golang is suitable for high performance and concurrent programming, while Python is suitable for data science and web development. Golang is known for its concurrency model and efficient performance, while Python is known for its concise syntax and rich library ecosystem.
