Golang は同時プログラミングに非常に適していることが証明されており、Goroutine は非同期プログラミングよりも読みやすく、エレガントで効率的です。この記事では、大量のデータのバッチ処理(ETL)に適した、Golangによる実装に適したパイプライン実行モデルを提案します。
次のようなアプリケーション シナリオを想像しました。 (推奨学習: Go )
## データベース A (Cassandra) からユーザー レビューをロードします (大量の、たとえば、たとえば 10 億)、各コメントのユーザー ID に基づいてデータベース B (MySQL) からユーザー情報を関連付け、NLP サービス (自然言語処理) を呼び出して各コメントを処理し、処理結果をデータベース C (ElasticSearch) に書き込みます。 アプリケーションではさまざまな問題が発生するため、これらの要件を要約します。要件 1: データはバッチで処理する必要があります。たとえば、バッチごとに 100 項目を指定します。問題 (データベース障害など) が発生すると中断され、次回プログラムを開始するときに中断から再開するためにチェックポイントが使用されます。
要件 2: データベースと NLP サービスに適切な負荷がかかるように、プロセスごとに適切な同時実行数を設定します (他のビジネスに影響を与えず、ETL パフォーマンスを向上させるためにできるだけ多くのリソースを占有します)。たとえば、手順 (1) ~ (4) では、同時実行数をそれぞれ 1、4、8、および 2 に設定します。
再利用可能なパイプライン モジュール
ETL 作業をより効率的に完了するために、パイプラインをモジュールに抽象化しました。まずコードを貼り付けてから、意味を分析します。このモジュールは直接使用でき、主に使用されるインターフェイスは NewPipeline、Async、Wait です。 このパイプライン コンポーネントを使用すると、ETL プログラムはシンプル、効率的、信頼性の高いものになり、プログラマは面倒な同時プロセス制御から解放されます:package main import "log" func main() { //恢复上次执行的checkpoint,如果是第一次执行就获取一个初始值。 checkpoint := loadCheckpoint() //工序(1)在pipeline外执行,最后一个工序是保存checkpoint pipeline := NewPipeline(4, 8, 2, 1) for { //(1) //加载100条数据,并修改变量checkpoint //data是数组,每个元素是一条评论,之后的联表、NLP都直接修改data里的每条记录。 data, err := extractReviewsFromA(&checkpoint, 100) if err != nil { log.Print(err) break } //这里有个Golang著名的坑。 //“checkpoint”是循环体外的变量,它在内存中只有一个实例并在循环中不断被修改,所以不能在异步中使用它。 //这里创建一个副本curCheckpoint,储存本次循环的checkpoint。 curCheckpoint := checkpoint ok := pipeline.Async(func() error { //(2) return joinUserFromB(data) }, func() error { //(3) return nlp(data) }, func() error { //(4) return loadDataToC(data) }, func() error { //(5)保存checkpoint log.Print("done:", curCheckpoint) return saveCheckpoint(curCheckpoint) }) if !ok { break } if len(data) < 100 { break } //处理完毕 } err := pipeline.Wait() if err != nil { log.Print(err) } }
以上がgolang がビッグデータを処理する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。