ホームページ > バックエンド開発 > Python チュートリアル > Python での大規模データ処理の最適化: CSV 操作の並列化ガイド

Python での大規模データ処理の最適化: CSV 操作の並列化ガイド

DDD
リリース: 2024-12-13 06:26:15
オリジナル
289 人が閲覧しました

Optimizing Large-Scale Data Processing in Python: A Guide to Parallelizing CSV Operations

問題

pandas.read_csv() の使用などの標準的なアプローチは、大規模な CSV ファイルを処理する場合には不十分であることがよくあります。これらのメソッドはシングルスレッドであり、ディスク I/O またはメモリの制限によりすぐにボトルネックになる可能性があります。


究極の Python プログラマー模擬テスト


解決

CSV 操作を並列化することで、複数の CPU コアを利用してデータをより高速かつ効率的に処理できます。このガイドでは、以下を使用したテクニックの概要を説明します:

  1. Dask: pandas コードへの変更を最小限に抑えた並列計算。
  2. Polars: 高性能 DataFrame ライブラリ。
  3. Python のマルチプロセッシング モジュール: カスタム並列化。
  4. ファイル分割: より小さなチャンクを使用して分割統治します。

テクニック

1.大きなファイルの分割

大きな CSV ファイルを小さなチャンクに分割すると、並列処理が可能になります。サンプル スクリプトは次のとおりです:

import os

def split_csv(file_path, lines_per_chunk=1000000):
    with open(file_path, 'r') as file:
        header = file.readline()
        file_count = 0
        output_file = None
        for i, line in enumerate(file):
            if i % lines_per_chunk == 0:
                if output_file:
                    output_file.close()
                file_count += 1
                output_file = open(f'chunk_{file_count}.csv', 'w')
                output_file.write(header)
            output_file.write(line)
        if output_file:
            output_file.close()
    print(f"Split into {file_count} files.")

ログイン後にコピー

2. Daskによる並列処理

Dask は、Python で大規模なデータを処理するための革新的なツールです。大規模なデータセットに対する操作を簡単に並列化できます。

import dask.dataframe as dd

# Load the dataset as a Dask DataFrame
df = dd.read_csv('large_file.csv')

# Perform parallel operations
result = df[df['column_name'] > 100].groupby('another_column').mean()

# Save the result
result.to_csv('output_*.csv', single_file=True)

ログイン後にコピー

Dask は、データのチャンクを操作し、利用可能なコア全体でタスクをインテリジェントにスケジュールすることで、メモリの制約を処理します。


究極の Python プログラマー模擬テスト


3. Polars でスーパーチャージ

Polars は、Rust の速度と Python の柔軟性を組み合わせた比較的新しいライブラリです。最新のハードウェア向けに設計されており、パンダよりも大幅に高速に CSV ファイルを処理できます。

import polars as pl

# Read CSV using Polars
df = pl.read_csv('large_file.csv')

# Filter and aggregate data
filtered_df = df.filter(pl.col('column_name') > 100).groupby('another_column').mean()

# Write to CSV
filtered_df.write_csv('output.csv')


ログイン後にコピー

Polars は、速度と並列性が重要な状況で優れています。これは、複数のコアを備えたシステムに特に効果的です。

4.マルチプロセッシングによる手動並列処理

処理ロジックを制御し続けたい場合は、Python のマルチプロセッシング モジュールを使用して CSV 操作を並列化する簡単な方法を提供します。

from multiprocessing import Pool
import pandas as pd

def process_chunk(file_path):
    df = pd.read_csv(file_path)
    # Perform operations
    filtered_df = df[df['column_name'] > 100]
    return filtered_df

if __name__ == '__main__':
    chunk_files = [f'chunk_{i}.csv' for i in range(1, 6)]
    with Pool(processes=4) as pool:
        results = pool.map(process_chunk, chunk_files)

    # Combine results
    combined_df = pd.concat(results)
    combined_df.to_csv('final_output.csv', index=False)

ログイン後にコピー

主な考慮事項

  1. ディスク I/O と CPU バインドの比較

    並列戦略が CPU 処理とディスクの読み取り/書き込み速度のバランスをとるようにしてください。ボトルネックが I/O であるか計算であるかに基づいて最適化します。

  2. メモリオーバーヘッド

    Dask や Polars などのツールは、手動マルチプロセッシングに比べてメモリ効率が高くなります。システムのメモリ制約に合わせたツールを選択してください。

  3. エラー処理

    並列処理により、デバッグとエラー管理が複雑になる可能性があります。信頼性を確保するために、堅牢なロギングと例外処理を実装します。


究極の Python プログラマー模擬テスト

以上がPython での大規模データ処理の最適化: CSV 操作の並列化ガイドの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:dev.to
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート