Python マルチプロセスで CSV をデータベースにインポート
この記事では、Python を使用して CSV ファイル データのマルチプロセス インポートを MySQL に実装するアイデアと方法、および特定のコード共有について説明します。同じニーズを持つ友人は参考にしてください。同僚は、CSV データを MySQL 要件にインポートするという問題に取り組んでいます。 2 つの大きな CSV ファイル、それぞれ 2,100 万レコードを含む 3GB と 3,500 万レコードを含む 7GB。この規模のデータの場合、単純な単一プロセス/単一スレッドのインポートには長い時間がかかるため、最終的にはマルチプロセスのアプローチを使用して実装されました。特定のプロセスについては詳しく説明しませんが、重要なポイントをいくつか記録します。
- 1 つずつ挿入するのではなく、バッチで挿入します
- 挿入を高速化するために、
- インデックスを構築しないでください。まず プロデューサーとコンシューマー
- モデル MySQLに過度の負荷をかけないようワーカーの数の制御に注意してください
- ダーティデータの処理による例外に注意してください
- 元のデータはGBKエンコードされているため、UTF-8への変換にも注意してください
- クリックを使用してコマンドラインツールをカプセル化してください
#!/usr/bin/env python # -*- coding: utf-8 -*- import codecs import csv import logging import multiprocessing import os import warnings import click import MySQLdb import sqlalchemy warnings.filterwarnings('ignore', category=MySQLdb.Warning) # 批量插入的记录数量 BATCH = 5000 DB_URI = 'mysql://root@localhost:3306/example?charset=utf8' engine = sqlalchemy.create_engine(DB_URI) def get_table_cols(table): sql = 'SELECT * FROM `{table}` LIMIT 0'.format(table=table) res = engine.execute(sql) return res.keys() def insert_many(table, cols, rows, cursor): sql = 'INSERT INTO `{table}` ({cols}) VALUES ({marks})'.format( table=table, cols=', '.join(cols), marks=', '.join(['%s'] * len(cols))) cursor.execute(sql, *rows) logging.info('process %s inserted %s rows into table %s', os.getpid(), len(rows), table) def insert_worker(table, cols, queue): rows = [] # 每个子进程创建自己的 engine 对象 cursor = sqlalchemy.create_engine(DB_URI) while True: row = queue.get() if row is None: if rows: insert_many(table, cols, rows, cursor) break rows.append(row) if len(rows) == BATCH: insert_many(table, cols, rows, cursor) rows = [] def insert_parallel(table, reader, w=10): cols = get_table_cols(table) # 数据队列,主进程读文件并往里写数据,worker 进程从队列读数据 # 注意一下控制队列的大小,避免消费太慢导致堆积太多数据,占用过多内存 queue = multiprocessing.Queue(maxsize=w*BATCH*2) workers = [] for i in range(w): p = multiprocessing.Process(target=insert_worker, args=(table, cols, queue)) p.start() workers.append(p) logging.info('starting # %s worker process, pid: %s...', i + 1, p.pid) dirty_data_file = './{}_dirty_rows.csv'.format(table) xf = open(dirty_data_file, 'w') writer = csv.writer(xf, delimiter=reader.dialect.delimiter) for line in reader: # 记录并跳过脏数据: 键值数量不一致 if len(line) != len(cols): writer.writerow(line) continue # 把 None 值替换为 'NULL' clean_line = [None if x == 'NULL' else x for x in line] # 往队列里写数据 queue.put(tuple(clean_line)) if reader.line_num % 500000 == 0: logging.info('put %s tasks into queue.', reader.line_num) xf.close() # 给每个 worker 发送任务结束的信号 logging.info('send close signal to worker processes') for i in range(w): queue.put(None) for p in workers: p.join() def convert_file_to_utf8(f, rv_file=None): if not rv_file: name, ext = os.path.splitext(f) if isinstance(name, unicode): name = name.encode('utf8') rv_file = '{}_utf8{}'.format(name, ext) logging.info('start to process file %s', f) with open(f) as infd: with open(rv_file, 'w') as outfd: lines = [] loop = 0 chunck = 200000 first_line = infd.readline().strip(codecs.BOM_UTF8).strip() + '\n' lines.append(first_line) for line in infd: clean_line = line.decode('gb18030').encode('utf8') clean_line = clean_line.rstrip() + '\n' lines.append(clean_line) if len(lines) == chunck: outfd.writelines(lines) lines = [] loop += 1 logging.info('processed %s lines.', loop * chunck) outfd.writelines(lines) logging.info('processed %s lines.', loop * chunck + len(lines)) @click.group() def cli(): logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s') @cli.command('gbk_to_utf8') @click.argument('f') def convert_gbk_to_utf8(f): convert_file_to_utf8(f) @cli.command('load') @click.option('-t', '--table', required=True, help='表名') @click.option('-i', '--filename', required=True, help='输入文件') @click.option('-w', '--workers', default=10, help='worker 数量,默认 10') def load_fac_day_pro_nos_sal_table(table, filename, workers): with open(filename) as fd: fd.readline() # skip header reader = csv.reader(fd) insert_parallel(table, reader, w=workers) if name == 'main': cli()
[関連する推奨事項]
1.
Python 学習マニュアルGeek Academy Python ビデオ チュートリアル以上がPython マルチプロセスで CSV をデータベースにインポートの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック









Pythonは、自動化、スクリプト、およびタスク管理に優れています。 1)自動化:OSやShutilなどの標準ライブラリを介してファイルバックアップが実現されます。 2)スクリプトの書き込み:Psutilライブラリを使用してシステムリソースを監視します。 3)タスク管理:スケジュールライブラリを使用してタスクをスケジュールします。 Pythonの使いやすさと豊富なライブラリサポートにより、これらの分野で優先ツールになります。

VSコードでは、次の手順を通じて端末でプログラムを実行できます。コードを準備し、統合端子を開き、コードディレクトリが端末作業ディレクトリと一致していることを確認します。プログラミング言語(pythonのpython your_file_name.pyなど)に従って実行コマンドを選択して、それが正常に実行されるかどうかを確認し、エラーを解決します。デバッガーを使用して、デバッグ効率を向上させます。

VSコード拡張機能は、悪意のあるコードの隠れ、脆弱性の活用、合法的な拡張機能としての自慰行為など、悪意のあるリスクを引き起こします。悪意のある拡張機能を識別する方法には、パブリッシャーのチェック、コメントの読み取り、コードのチェック、およびインストールに注意してください。セキュリティ対策には、セキュリティ認識、良好な習慣、定期的な更新、ウイルス対策ソフトウェアも含まれます。

VSコードは、Microsoftが開発した無料のオープンソースクロスプラットフォームコードエディターと開発環境であるフルネームVisual Studioコードです。幅広いプログラミング言語をサポートし、構文の強調表示、コード自動完了、コードスニペット、および開発効率を向上させるスマートプロンプトを提供します。リッチな拡張エコシステムを通じて、ユーザーは、デバッガー、コードフォーマットツール、GIT統合など、特定のニーズや言語に拡張機能を追加できます。 VSコードには、コードのバグをすばやく見つけて解決するのに役立つ直感的なデバッガーも含まれています。

VSコードはMACOでうまく機能し、開発効率を向上させることができます。インストールと構成の手順には、インストールとコードと構成が含まれます。言語固有の拡張機能(JavaScriptのESLINTなど)をインストールします。拡張機能を慎重に取り付けて、過度のスタートアップが遅くなることを避けます。 GIT統合、ターミナル、デバッガーなどの基本的な機能を学びます。適切なテーマとコードフォントを設定します。潜在的な問題に注意:拡張互換性、ファイル許可など。

Pythonは、スムーズな学習曲線と簡潔な構文を備えた初心者により適しています。 JavaScriptは、急な学習曲線と柔軟な構文を備えたフロントエンド開発に適しています。 1。Python構文は直感的で、データサイエンスやバックエンド開発に適しています。 2。JavaScriptは柔軟で、フロントエンドおよびサーバー側のプログラミングで広く使用されています。

最も一般的な「Pythonを実行できません」という問題は、Pythonインタープリターパスの誤った構成に起因します。ソリューションには、Pythonのインストールの確認、コードの構成、仮想環境の使用が含まれます。さらに、仮想環境を使用した依存関係の分離、ブレークポイントを使用したコード実行の追跡、モニタリング式などを使用したリアルタイムのトラッキング依存関係など、ブレークポイントのデバッグ、可変監視、ログ出力、コードフォーマットなど、効率的なデバッグ手法とベストプラクティスがあります。

はい、VSコードはPythonコードを実行できます。 VSコードでPythonを効率的に実行するには、次の手順を完了します。Pythonインタープリターをインストールし、環境変数を構成します。 VSコードにPython拡張機能をインストールします。コマンドラインを介してVSコードの端末でPythonコードを実行します。 VSコードのデバッグ機能とコードフォーマットを使用して、開発効率を向上させます。優れたプログラミング習慣を採用し、パフォーマンス分析ツールを使用してコードパフォーマンスを最適化します。
