Golang has proven to be well suited to concurrent programming: goroutines are more readable, elegant, and efficient than callback-style asynchronous programming. This article proposes a Pipeline execution model implemented in Go, suitable for batch processing of large amounts of data (ETL).
Imagine the following application scenario:
1. Load user reviews from database A (Cassandra); the quantity is huge, for example 1 billion rows.
2. Associate user information from database B (MySQL) based on each review's user ID.
3. Call an NLP (natural language processing) service to process each review.
4. Write the processing results to database C (ElasticSearch).

Various problems encountered in practice lead to the following requirements:

Requirement 1: Data should be processed in batches, for example 100 items per batch. When a problem occurs (such as a failure of any database), processing is interrupted, and a checkpoint is used to resume from the point of interruption the next time the program starts. (A sketch of checkpoint persistence follows after the requirements.)
Requirement 2: Set a reasonable concurrency for each step, so that the databases and the NLP service carry a reasonable load (occupying as many resources as possible to improve ETL throughput without affecting other business). For example, set the concurrency of steps (1)-(4) to 1, 4, 8, and 2 respectively.
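Requirement 1 hinges on durable checkpoints, but the article does not say where they are stored. As a minimal sketch, assuming the checkpoint is an int64 offset persisted in a local file named checkpoint.txt (both assumptions mine, matching the loadCheckpoint/saveCheckpoint calls used in the example below):

```go
package main

import (
	"os"
	"strconv"
	"strings"
)

// loadCheckpoint returns the checkpoint saved by the previous run, or zero on
// the first run. The int64 type and the file-based storage are assumptions
// for this sketch; any durable store (a file, a database row) works.
func loadCheckpoint() int64 {
	b, err := os.ReadFile("checkpoint.txt")
	if err != nil {
		return 0 // no checkpoint yet: start from the beginning
	}
	n, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64)
	if err != nil {
		return 0 // unreadable checkpoint: start over rather than crash
	}
	return n
}

// saveCheckpoint durably records progress so that the next run can resume
// from the point of interruption.
func saveCheckpoint(cp int64) error {
	return os.WriteFile("checkpoint.txt", []byte(strconv.FormatInt(cp, 10)), 0o644)
}
```

The only real contract here is that a batch's checkpoint is committed only after that batch has been fully written to database C, which is exactly why checkpoint saving is the last stage of the pipeline below.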
## Reusable Pipeline module
In order to complete the ETL work more efficiently, I abstracted the Pipeline into a module. I'll paste the code first and then analyze its meaning. The module can be used directly; the main interfaces are NewPipeline, Async, and Wait. With this Pipeline component, our ETL program is simple, efficient, and reliable, freeing the programmer from cumbersome concurrency control:

```go
package main

import "log"

func main() {
	// Restore the checkpoint from the previous run; on the first run, obtain an initial value.
	checkpoint := loadCheckpoint()

	// Step (1) is executed outside the pipeline; the last stage saves the checkpoint.
	pipeline := NewPipeline(4, 8, 2, 1)
	for {
		// (1)
		// Load 100 records and advance the checkpoint variable.
		// data is a slice in which each element is one review; the join and
		// NLP steps below modify each record of data in place.
		data, err := extractReviewsFromA(&checkpoint, 100)
		if err != nil {
			log.Print(err)
			break
		}

		// A well-known Go pitfall:
		// "checkpoint" is declared outside the loop, so there is only one
		// instance in memory, continually modified by the loop; it must not
		// be captured by the asynchronous closures directly.
		// Make a copy, curCheckpoint, holding this iteration's checkpoint.
		curCheckpoint := checkpoint

		ok := pipeline.Async(func() error {
			// (2)
			return joinUserFromB(data)
		}, func() error {
			// (3)
			return nlp(data)
		}, func() error {
			// (4)
			return loadDataToC(data)
		}, func() error {
			// (5) Save the checkpoint.
			log.Print("done:", curCheckpoint)
			return saveCheckpoint(curCheckpoint)
		})
		if !ok {
			break
		}
		if len(data) < 100 {
			break // all data has been processed
		}
	}
	err := pipeline.Wait()
	if err != nil {
		log.Print(err)
	}
}
```
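This excerpt does not include the Pipeline module itself, so here is a minimal sketch of one way NewPipeline, Async, and Wait could be implemented. It assumes the semantics implied above: one stage per NewPipeline argument, each argument being that stage's concurrency limit, and jobs entering each stage in submission order so that the final concurrency-1 stage saves checkpoints in order. This is my own illustration under those assumptions, not necessarily the author's original module; it would live in the same package as the example above (merging the package and import clauses).

```go
package main

import "sync"

// Pipeline pushes jobs through a fixed sequence of stages. Each stage has its
// own concurrency limit, and jobs enter each stage in submission order.
type Pipeline struct {
	sems []chan struct{} // one semaphore per stage; capacity = stage concurrency
	prev []chan struct{} // the most recent job's per-stage "entered" signals
	wg   sync.WaitGroup
	mu   sync.Mutex
	err  error // first error seen; once set, Async rejects new jobs
}

// NewPipeline creates one stage per argument; each argument is that stage's
// maximum concurrency.
func NewPipeline(concurrency ...int) *Pipeline {
	p := &Pipeline{
		sems: make([]chan struct{}, len(concurrency)),
		prev: make([]chan struct{}, len(concurrency)),
	}
	for i, c := range concurrency {
		p.sems[i] = make(chan struct{}, c)
		entered := make(chan struct{})
		close(entered) // a virtual "zeroth" job has already entered every stage
		p.prev[i] = entered
	}
	return p
}

// Async submits one job: fs[i] runs in stage i, after fs[i-1] of the same job
// and after the previous job has entered stage i. It returns false if the
// pipeline has already failed.
func (p *Pipeline) Async(fs ...func() error) bool {
	p.mu.Lock()
	if p.err != nil {
		p.mu.Unlock()
		return false
	}
	// Snapshot the previous job's "entered stage i" channels, install ours.
	prev := make([]chan struct{}, len(fs))
	cur := make([]chan struct{}, len(fs))
	for i := range fs {
		prev[i] = p.prev[i]
		cur[i] = make(chan struct{})
		p.prev[i] = cur[i]
	}
	p.mu.Unlock()

	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		failed := false
		for i, f := range fs {
			<-prev[i]               // the previous job has entered stage i (FIFO)
			p.sems[i] <- struct{}{} // acquire a stage-i concurrency slot
			close(cur[i])           // signal: this job has now entered stage i
			// After a failure, the remaining stages are skipped, but the
			// signals above still propagate so later jobs cannot deadlock.
			if !failed {
				if err := f(); err != nil {
					p.mu.Lock()
					if p.err == nil {
						p.err = err
					}
					p.mu.Unlock()
					failed = true
				}
			}
			<-p.sems[i] // leave stage i, freeing the slot
		}
	}()
	return true
}

// Wait blocks until every submitted job has finished and returns the first
// error, if any.
func (p *Pipeline) Wait() error {
	p.wg.Wait()
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.err
}
```

In this sketch the per-stage semaphores cap concurrency, while the chained prev/cur channels guarantee FIFO entry into each stage; that ordering is what makes saving the checkpoint in stage (5) safe, since a later batch's checkpoint can never be committed before an earlier one.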