Heim Backend-Entwicklung Python-Tutorial Python-Multiprozess-CSV-Import in die Datenbank

Python-Multiprozess-CSV-Import in die Datenbank

May 06, 2017 pm 02:54 PM
csv mysql python 多进程

Dieser Artikel teilt Ihnen die Idee und Methode der Verwendung von Python zur Implementierung des Multiprozess-Imports von CSV-Dateidaten in MySQL und die spezifische Codefreigabe mit. Freunde, die die gleichen Anforderungen haben, können darauf verweisen

I Hat vor einiger Zeit einem Kollegen geholfen, damit umzugehen. Eine Anforderung zum Importieren von CSV-Daten in MySQL. Zwei große CSV-Dateien, 3 GB mit 21 Millionen Datensätzen bzw. 7 GB mit 35 Millionen Datensätzen. Für Daten dieser Größenordnung würde ein einfacher Einzelprozess-/Einzelthread-Import viel Zeit in Anspruch nehmen, und schließlich wurde für die Implementierung ein Mehrprozessansatz verwendet. Ich werde nicht näher auf den spezifischen Prozess eingehen, aber ein paar wichtige Punkte festhalten:

  1. Fügen Sie stapelweise statt einzeln ein

  2. Um das Einfügen zu beschleunigen, erstellen Sie keinen Index

  3. Produzent und VerbraucherModell, der Hauptprozess liest die Datei , und mehrere Worker-Prozesse führen das Einfügen aus

  4. Achten Sie darauf, die Anzahl der Worker zu kontrollieren, um MySQL nicht zu stark zu belasten

  5. Achten Sie darauf Ausnahmen, die durch den Umgang mit schmutzigen Daten verursacht werden

  6. Original Die Daten sind GBK-codiert, daher müssen Sie auch auf die Konvertierung in UTF-8 achten

  7. Verwenden Sie einen Klick, um das Befehlszeilentool zu kapseln

spezifischer Code Die Implementierung ist wie folgt:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import codecs
import csv
import logging
import multiprocessing
import os
import warnings

import click
import MySQLdb
import sqlalchemy

warnings.filterwarnings('ignore', category=MySQLdb.Warning)

# 批量插入的记录数量
BATCH = 5000

DB_URI = 'mysql://root@localhost:3306/example?charset=utf8'

engine = sqlalchemy.create_engine(DB_URI)


def get_table_cols(table):
  sql = 'SELECT * FROM `{table}` LIMIT 0'.format(table=table)
  res = engine.execute(sql)
  return res.keys()


def insert_many(table, cols, rows, cursor):
  sql = 'INSERT INTO `{table}` ({cols}) VALUES ({marks})'.format(
      table=table,
      cols=', '.join(cols),
      marks=', '.join(['%s'] * len(cols)))
  cursor.execute(sql, *rows)
  logging.info('process %s inserted %s rows into table %s', os.getpid(), len(rows), table)


def insert_worker(table, cols, queue):
  rows = []
  # 每个子进程创建自己的 engine 对象
  cursor = sqlalchemy.create_engine(DB_URI)
  while True:
    row = queue.get()
    if row is None:
      if rows:
        insert_many(table, cols, rows, cursor)
      break

    rows.append(row)
    if len(rows) == BATCH:
      insert_many(table, cols, rows, cursor)
      rows = []


def insert_parallel(table, reader, w=10):
  cols = get_table_cols(table)

  # 数据队列,主进程读文件并往里写数据,worker 进程从队列读数据
  # 注意一下控制队列的大小,避免消费太慢导致堆积太多数据,占用过多内存
  queue = multiprocessing.Queue(maxsize=w*BATCH*2)
  workers = []
  for i in range(w):
    p = multiprocessing.Process(target=insert_worker, args=(table, cols, queue))
    p.start()
    workers.append(p)
    logging.info('starting # %s worker process, pid: %s...', i + 1, p.pid)

  dirty_data_file = './{}_dirty_rows.csv'.format(table)
  xf = open(dirty_data_file, 'w')
  writer = csv.writer(xf, delimiter=reader.dialect.delimiter)

  for line in reader:
    # 记录并跳过脏数据: 键值数量不一致
    if len(line) != len(cols):
      writer.writerow(line)
      continue

    # 把 None 值替换为 'NULL'
    clean_line = [None if x == 'NULL' else x for x in line]

    # 往队列里写数据
    queue.put(tuple(clean_line))
    if reader.line_num % 500000 == 0:
      logging.info('put %s tasks into queue.', reader.line_num)

  xf.close()

  # 给每个 worker 发送任务结束的信号
  logging.info('send close signal to worker processes')
  for i in range(w):
    queue.put(None)

  for p in workers:
    p.join()


def convert_file_to_utf8(f, rv_file=None):
  if not rv_file:
    name, ext = os.path.splitext(f)
    if isinstance(name, unicode):
      name = name.encode('utf8')
    rv_file = '{}_utf8{}'.format(name, ext)
  logging.info('start to process file %s', f)
  with open(f) as infd:
    with open(rv_file, 'w') as outfd:
      lines = []
      loop = 0
      chunck = 200000
      first_line = infd.readline().strip(codecs.BOM_UTF8).strip() + '\n'
      lines.append(first_line)
      for line in infd:
        clean_line = line.decode('gb18030').encode('utf8')
        clean_line = clean_line.rstrip() + '\n'
        lines.append(clean_line)
        if len(lines) == chunck:
          outfd.writelines(lines)
          lines = []
          loop += 1
          logging.info('processed %s lines.', loop * chunck)

      outfd.writelines(lines)
      logging.info('processed %s lines.', loop * chunck + len(lines))


@click.group()
def cli():
  logging.basicConfig(level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')


@cli.command('gbk_to_utf8')
@click.argument('f')
def convert_gbk_to_utf8(f):
  convert_file_to_utf8(f)


@cli.command('load')
@click.option('-t', '--table', required=True, help='表名')
@click.option('-i', '--filename', required=True, help='输入文件')
@click.option('-w', '--workers', default=10, help='worker 数量,默认 10')
def load_fac_day_pro_nos_sal_table(table, filename, workers):
  with open(filename) as fd:
    fd.readline()  # skip header
    reader = csv.reader(fd)
    insert_parallel(table, reader, w=workers)


if name == 'main':
  cli()
Nach dem Login kopieren

[Verwandte Empfehlungen]

1. Python-kostenloses Video-Tutorial

2. Python-Lernhandbuch

3. Geek Academy Python-Video-Tutorial

Das obige ist der detaillierte Inhalt vonPython-Multiprozess-CSV-Import in die Datenbank. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn

Heiße KI -Werkzeuge

Undresser.AI Undress

Undresser.AI Undress

KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover

AI Clothes Remover

Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Undress AI Tool

Undress AI Tool

Ausziehbilder kostenlos

Clothoff.io

Clothoff.io

KI-Kleiderentferner

AI Hentai Generator

AI Hentai Generator

Erstellen Sie kostenlos Ai Hentai.

Heißer Artikel

R.E.P.O. Energiekristalle erklärten und was sie tun (gelber Kristall)
1 Monate vor By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Beste grafische Einstellungen
1 Monate vor By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. So reparieren Sie Audio, wenn Sie niemanden hören können
1 Monate vor By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Chat -Befehle und wie man sie benutzt
1 Monate vor By 尊渡假赌尊渡假赌尊渡假赌

Heiße Werkzeuge

Notepad++7.3.1

Notepad++7.3.1

Einfach zu bedienender und kostenloser Code-Editor

SublimeText3 chinesische Version

SublimeText3 chinesische Version

Chinesische Version, sehr einfach zu bedienen

Senden Sie Studio 13.0.1

Senden Sie Studio 13.0.1

Leistungsstarke integrierte PHP-Entwicklungsumgebung

Dreamweaver CS6

Dreamweaver CS6

Visuelle Webentwicklungstools

SublimeText3 Mac-Version

SublimeText3 Mac-Version

Codebearbeitungssoftware auf Gottesniveau (SublimeText3)

PHP und Python: Code Beispiele und Vergleich PHP und Python: Code Beispiele und Vergleich Apr 15, 2025 am 12:07 AM

PHP und Python haben ihre eigenen Vor- und Nachteile, und die Wahl hängt von den Projektbedürfnissen und persönlichen Vorlieben ab. 1.PHP eignet sich für eine schnelle Entwicklung und Wartung großer Webanwendungen. 2. Python dominiert das Gebiet der Datenwissenschaft und des maschinellen Lernens.

Wie ist die GPU -Unterstützung für Pytorch bei CentOS? Wie ist die GPU -Unterstützung für Pytorch bei CentOS? Apr 14, 2025 pm 06:48 PM

Aktivieren Sie die Pytorch -GPU -Beschleunigung am CentOS -System erfordert die Installation von CUDA-, CUDNN- und GPU -Versionen von Pytorch. Die folgenden Schritte führen Sie durch den Prozess: Cuda und Cudnn Installation Bestimmen Sie die CUDA-Version Kompatibilität: Verwenden Sie den Befehl nvidia-smi, um die von Ihrer NVIDIA-Grafikkarte unterstützte CUDA-Version anzuzeigen. Beispielsweise kann Ihre MX450 -Grafikkarte CUDA11.1 oder höher unterstützen. Download und installieren Sie Cudatoolkit: Besuchen Sie die offizielle Website von Nvidiacudatoolkit und laden Sie die entsprechende Version gemäß der höchsten CUDA -Version herunter und installieren Sie sie, die von Ihrer Grafikkarte unterstützt wird. Installieren Sie die Cudnn -Bibliothek:

Detaillierte Erklärung des Docker -Prinzips Detaillierte Erklärung des Docker -Prinzips Apr 14, 2025 pm 11:57 PM

Docker verwendet Linux -Kernel -Funktionen, um eine effiziente und isolierte Anwendungsumgebung zu bieten. Sein Arbeitsprinzip lautet wie folgt: 1. Der Spiegel wird als schreibgeschützte Vorlage verwendet, die alles enthält, was Sie für die Ausführung der Anwendung benötigen. 2. Das Union File System (UnionFS) stapelt mehrere Dateisysteme, speichert nur die Unterschiede, speichert Platz und beschleunigt. 3. Der Daemon verwaltet die Spiegel und Container, und der Kunde verwendet sie für die Interaktion. 4. Namespaces und CGroups implementieren Container -Isolation und Ressourcenbeschränkungen; 5. Mehrere Netzwerkmodi unterstützen die Containerverbindung. Nur wenn Sie diese Kernkonzepte verstehen, können Sie Docker besser nutzen.

Python gegen JavaScript: Community, Bibliotheken und Ressourcen Python gegen JavaScript: Community, Bibliotheken und Ressourcen Apr 15, 2025 am 12:16 AM

Python und JavaScript haben ihre eigenen Vor- und Nachteile in Bezug auf Gemeinschaft, Bibliotheken und Ressourcen. 1) Die Python-Community ist freundlich und für Anfänger geeignet, aber die Front-End-Entwicklungsressourcen sind nicht so reich wie JavaScript. 2) Python ist leistungsstark in Bibliotheken für Datenwissenschaft und maschinelles Lernen, während JavaScript in Bibliotheken und Front-End-Entwicklungsbibliotheken und Frameworks besser ist. 3) Beide haben reichhaltige Lernressourcen, aber Python eignet sich zum Beginn der offiziellen Dokumente, während JavaScript mit Mdnwebdocs besser ist. Die Wahl sollte auf Projektbedürfnissen und persönlichen Interessen beruhen.

MySQL gegen andere Datenbanken: Vergleich der Optionen MySQL gegen andere Datenbanken: Vergleich der Optionen Apr 15, 2025 am 12:08 AM

MySQL eignet sich für Webanwendungen und Content -Management -Systeme und ist beliebt für Open Source, hohe Leistung und Benutzerfreundlichkeit. 1) Im Vergleich zu Postgresql führt MySQL in einfachen Abfragen und hohen gleichzeitigen Lesevorgängen besser ab. 2) Im Vergleich zu Oracle ist MySQL aufgrund seiner Open Source und niedrigen Kosten bei kleinen und mittleren Unternehmen beliebter. 3) Im Vergleich zu Microsoft SQL Server eignet sich MySQL besser für plattformübergreifende Anwendungen. 4) Im Gegensatz zu MongoDB eignet sich MySQL besser für strukturierte Daten und Transaktionsverarbeitung.

So installieren Sie MySQL in CentOS7 So installieren Sie MySQL in CentOS7 Apr 14, 2025 pm 08:30 PM

Der Schlüssel zur eleganten Installation von MySQL liegt darin, das offizielle MySQL -Repository hinzuzufügen. Die spezifischen Schritte sind wie folgt: Laden Sie den offiziellen GPG -Schlüssel von MySQL herunter, um Phishing -Angriffe zu verhindern. Add MySQL repository file: rpm -Uvh https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm Update yum repository cache: yum update installation MySQL: yum install mysql-server startup MySQL service: systemctl start mysqld set up booting

So rufen Sie Docker lnmp an So rufen Sie Docker lnmp an Apr 15, 2025 am 11:15 AM

Docker LNMP Container Anrufschritte: Führen Sie den Container aus: Docker Run -d --Name LNMP -Container -P 80:80 -P 443: 443 LNMP -Stack, um den Container IP zu erhalten: Docker Inspecy Lnmp -Container | GREP iPaddress Access Website: http: // & lt; Container IP & gt;/index.phpssh Access: Docker Exec -it lnmp -container Bash Access MySQL: Mysql -U Roo

So installieren Sie Nginx in CentOS So installieren Sie Nginx in CentOS Apr 14, 2025 pm 08:06 PM

Die Installation von CentOS-Installationen erfordert die folgenden Schritte: Installieren von Abhängigkeiten wie Entwicklungstools, PCRE-Devel und OpenSSL-Devel. Laden Sie das Nginx -Quellcode -Paket herunter, entpacken Sie es, kompilieren Sie es und installieren Sie es und geben Sie den Installationspfad als/usr/local/nginx an. Erstellen Sie NGINX -Benutzer und Benutzergruppen und setzen Sie Berechtigungen. Ändern Sie die Konfigurationsdatei nginx.conf und konfigurieren Sie den Hörport und den Domänennamen/die IP -Adresse. Starten Sie den Nginx -Dienst. Häufige Fehler müssen beachtet werden, z. B. Abhängigkeitsprobleme, Portkonflikte und Konfigurationsdateifehler. Die Leistungsoptimierung muss entsprechend der spezifischen Situation angepasst werden, z. B. das Einschalten des Cache und die Anpassung der Anzahl der Arbeitsprozesse.

See all articles