ホームページ バックエンド開発 Python チュートリアル Python マルチプロセスで CSV をデータベースにインポート

Python マルチプロセスで CSV をデータベースにインポート

May 06, 2017 pm 02:54 PM
csv mysql python マルチプログレス

この記事では、Python を使用して CSV ファイル データのマルチプロセス インポートを MySQL に実装するアイデアと方法、および特定のコード共有について説明します。同じニーズを持つ友人は参考にしてください。同僚は、CSV データを MySQL 要件にインポートするという問題に取り組んでいます。 2 つの大きな CSV ファイル、それぞれ 2,100 万レコードを含む 3GB と 3,500 万レコードを含む 7GB。この規模のデータの場合、単純な単一プロセス/単一スレッドのインポートには長い時間がかかるため、最終的にはマルチプロセスのアプローチを使用して実装されました。特定のプロセスについては詳しく説明しませんが、重要なポイントをいくつか記録します。

    1 つずつ挿入するのではなく、バッチで挿入します
  1. 挿入を高速化するために、
  2. インデックスを構築しないでください。まず

  3. プロデューサーとコンシューマー
  4. モデル

    、メインプロセスがファイルを読み取り、複数のワーカープロセスが挿入を実行します

  5. MySQLに過度の負荷をかけないようワーカーの数の制御に注意してください
  6. ダーティデータの処理による例外に注意してください
  7. 元のデータはGBKエンコードされているため、UTF-8への変換にも注意してください
  8. クリックを使用してコマンドラインツールをカプセル化してください

  9. コードの実装は次のとおりです:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import codecs
import csv
import logging
import multiprocessing
import os
import warnings

import click
import MySQLdb
import sqlalchemy

warnings.filterwarnings('ignore', category=MySQLdb.Warning)

# 批量插入的记录数量
BATCH = 5000

DB_URI = 'mysql://root@localhost:3306/example?charset=utf8'

engine = sqlalchemy.create_engine(DB_URI)


def get_table_cols(table):
  sql = 'SELECT * FROM `{table}` LIMIT 0'.format(table=table)
  res = engine.execute(sql)
  return res.keys()


def insert_many(table, cols, rows, cursor):
  sql = 'INSERT INTO `{table}` ({cols}) VALUES ({marks})'.format(
      table=table,
      cols=', '.join(cols),
      marks=', '.join(['%s'] * len(cols)))
  cursor.execute(sql, *rows)
  logging.info('process %s inserted %s rows into table %s', os.getpid(), len(rows), table)


def insert_worker(table, cols, queue):
  rows = []
  # 每个子进程创建自己的 engine 对象
  cursor = sqlalchemy.create_engine(DB_URI)
  while True:
    row = queue.get()
    if row is None:
      if rows:
        insert_many(table, cols, rows, cursor)
      break

    rows.append(row)
    if len(rows) == BATCH:
      insert_many(table, cols, rows, cursor)
      rows = []


def insert_parallel(table, reader, w=10):
  cols = get_table_cols(table)

  # 数据队列,主进程读文件并往里写数据,worker 进程从队列读数据
  # 注意一下控制队列的大小,避免消费太慢导致堆积太多数据,占用过多内存
  queue = multiprocessing.Queue(maxsize=w*BATCH*2)
  workers = []
  for i in range(w):
    p = multiprocessing.Process(target=insert_worker, args=(table, cols, queue))
    p.start()
    workers.append(p)
    logging.info('starting # %s worker process, pid: %s...', i + 1, p.pid)

  dirty_data_file = './{}_dirty_rows.csv'.format(table)
  xf = open(dirty_data_file, 'w')
  writer = csv.writer(xf, delimiter=reader.dialect.delimiter)

  for line in reader:
    # 记录并跳过脏数据: 键值数量不一致
    if len(line) != len(cols):
      writer.writerow(line)
      continue

    # 把 None 值替换为 'NULL'
    clean_line = [None if x == 'NULL' else x for x in line]

    # 往队列里写数据
    queue.put(tuple(clean_line))
    if reader.line_num % 500000 == 0:
      logging.info('put %s tasks into queue.', reader.line_num)

  xf.close()

  # 给每个 worker 发送任务结束的信号
  logging.info('send close signal to worker processes')
  for i in range(w):
    queue.put(None)

  for p in workers:
    p.join()


def convert_file_to_utf8(f, rv_file=None):
  if not rv_file:
    name, ext = os.path.splitext(f)
    if isinstance(name, unicode):
      name = name.encode('utf8')
    rv_file = '{}_utf8{}'.format(name, ext)
  logging.info('start to process file %s', f)
  with open(f) as infd:
    with open(rv_file, 'w') as outfd:
      lines = []
      loop = 0
      chunck = 200000
      first_line = infd.readline().strip(codecs.BOM_UTF8).strip() + '\n'
      lines.append(first_line)
      for line in infd:
        clean_line = line.decode('gb18030').encode('utf8')
        clean_line = clean_line.rstrip() + '\n'
        lines.append(clean_line)
        if len(lines) == chunck:
          outfd.writelines(lines)
          lines = []
          loop += 1
          logging.info('processed %s lines.', loop * chunck)

      outfd.writelines(lines)
      logging.info('processed %s lines.', loop * chunck + len(lines))


@click.group()
def cli():
  logging.basicConfig(level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')


@cli.command('gbk_to_utf8')
@click.argument('f')
def convert_gbk_to_utf8(f):
  convert_file_to_utf8(f)


@cli.command('load')
@click.option('-t', '--table', required=True, help='表名')
@click.option('-i', '--filename', required=True, help='输入文件')
@click.option('-w', '--workers', default=10, help='worker 数量,默认 10')
def load_fac_day_pro_nos_sal_table(table, filename, workers):
  with open(filename) as fd:
    fd.readline()  # skip header
    reader = csv.reader(fd)
    insert_parallel(table, reader, w=workers)


if name == 'main':
  cli()
ログイン後にコピー

[関連する推奨事項]

1.

Python 学習マニュアル

3.

Geek Academy Python ビデオ チュートリアル

以上がPython マルチプロセスで CSV をデータベースにインポートの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

Python:自動化、スクリプト、およびタスク管理 Python:自動化、スクリプト、およびタスク管理 Apr 16, 2025 am 12:14 AM

Pythonは、自動化、スクリプト、およびタスク管理に優れています。 1)自動化:OSやShutilなどの標準ライブラリを介してファイルバックアップが実現されます。 2)スクリプトの書き込み:Psutilライブラリを使用してシステムリソースを監視します。 3)タスク管理:スケジュールライブラリを使用してタスクをスケジュールします。 Pythonの使いやすさと豊富なライブラリサポートにより、これらの分野で優先ツールになります。

ターミナルVSCODEでプログラムを実行する方法 ターミナルVSCODEでプログラムを実行する方法 Apr 15, 2025 pm 06:42 PM

VSコードでは、次の手順を通じて端末でプログラムを実行できます。コードを準備し、統合端子を開き、コードディレクトリが端末作業ディレクトリと一致していることを確認します。プログラミング言語(pythonのpython your_file_name.pyなど)に従って実行コマンドを選択して、それが正常に実行されるかどうかを確認し、エラーを解決します。デバッガーを使用して、デバッグ効率を向上させます。

VSCODE拡張機能は悪意がありますか? VSCODE拡張機能は悪意がありますか? Apr 15, 2025 pm 07:57 PM

VSコード拡張機能は、悪意のあるコードの隠れ、脆弱性の活用、合法的な拡張機能としての自慰行為など、悪意のあるリスクを引き起こします。悪意のある拡張機能を識別する方法には、パブリッシャーのチェック、コメントの読み取り、コードのチェック、およびインストールに注意してください。セキュリティ対策には、セキュリティ認識、良好な習慣、定期的な更新、ウイルス対策ソフトウェアも含まれます。

vscodeとは何ですか?vscodeとは何ですか? vscodeとは何ですか?vscodeとは何ですか? Apr 15, 2025 pm 06:45 PM

VSコードは、Microsoftが開発した無料のオープンソースクロスプラットフォームコードエディターと開発環境であるフルネームVisual Studioコードです。幅広いプログラミング言語をサポートし、構文の強調表示、コード自動完了、コードスニペット、および開発効率を向上させるスマートプロンプトを提供します。リッチな拡張エコシステムを通じて、ユーザーは、デバッガー、コードフォーマットツール、GIT統合など、特定のニーズや言語に拡張機能を追加できます。 VSコードには、コードのバグをすばやく見つけて解決するのに役立つ直感的なデバッガーも含まれています。

vscodeはMacで使用できます vscodeはMacで使用できます Apr 15, 2025 pm 07:45 PM

VSコードはMACOでうまく機能し、開発効率を向上させることができます。インストールと構成の手順には、インストールとコードと構成が含まれます。言語固有の拡張機能(JavaScriptのESLINTなど)をインストールします。拡張機能を慎重に取り付けて、過度のスタートアップが遅くなることを避けます。 GIT統合、ターミナル、デバッガーなどの基本的な機能を学びます。適切なテーマとコードフォントを設定します。潜在的な問題に注意:拡張互換性、ファイル許可など。

Python vs. JavaScript:学習曲線と使いやすさ Python vs. JavaScript:学習曲線と使いやすさ Apr 16, 2025 am 12:12 AM

Pythonは、スムーズな学習曲線と簡潔な構文を備えた初心者により適しています。 JavaScriptは、急な学習曲線と柔軟な構文を備えたフロントエンド開発に適しています。 1。Python構文は直感的で、データサイエンスやバックエンド開発に適しています。 2。JavaScriptは柔軟で、フロントエンドおよびサーバー側のプログラミングで広く使用されています。

Pythonを実行していないVSCODEで何が起こっているのか Pythonを実行していないVSCODEで何が起こっているのか Apr 15, 2025 pm 06:00 PM

最も一般的な「Pythonを実行できません」という問題は、Pythonインタープリターパスの誤った構成に起因します。ソリューションには、Pythonのインストールの確認、コードの構成、仮想環境の使用が含まれます。さらに、仮想環境を使用した依存関係の分離、ブレークポイントを使用したコード実行の追跡、モニタリング式などを使用したリアルタイムのトラッキング依存関係など、ブレークポイントのデバッグ、可変監視、ログ出力、コードフォーマットなど、効率的なデバッグ手法とベストプラクティスがあります。

VSコードはPythonを実行できます VSコードはPythonを実行できます Apr 15, 2025 pm 08:21 PM

はい、VSコードはPythonコードを実行できます。 VSコードでPythonを効率的に実行するには、次の手順を完了します。Pythonインタープリターをインストールし、環境変数を構成します。 VSコードにPython拡張機能をインストールします。コマンドラインを介してVSコードの端末でPythonコードを実行します。 VSコードのデバッグ機能とコードフォーマットを使用して、開発効率を向上させます。優れたプログラミング習慣を採用し、パフォーマンス分析ツールを使用してコードパフォーマンスを最適化します。

See all articles