Apache Beam is an open-source framework for distributed data processing. It provides a unified programming model that can run on several batch and stream processing engines. Its Go SDK includes a top package for selecting the N largest elements of a PCollection, which helps when a large data set must be reduced to its highest-ranked rows. This article walks through a question about using that package from Go, the error it produced, and the fix.
I have a PCollection from which I need to select the N largest rows. I'm trying to build a Dataflow pipeline in Go and I'm stuck.
    package main

    import (
        "context"
        "flag"
        "fmt"

        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
    )

    // User is one row of the collection; fields are exported so Beam can encode them.
    type User struct {
        Name string
        Age  int
    }

    // printRow prints each row it receives.
    func printRow(ctx context.Context, list User) {
        fmt.Println(list)
    }

    func main() {
        flag.Parse()
        beam.Init()

        ctx := context.Background()
        p := beam.NewPipeline()
        s := p.Root()

        var userList = []User{
            {"Bob", 5},
            {"Adam", 8},
            {"John", 3},
            {"Ben", 1},
            {"Jose", 1},
            {"Bryan", 1},
            {"Kim", 1},
            {"Tim", 1},
        }
        initial := beam.CreateList(s, userList)

        // Pass every row through unchanged, then print it.
        pc2 := beam.ParDo(s, func(row User, emit func(User)) {
            emit(row)
        }, initial)

        beam.ParDo0(s, printRow, pc2)

        if err := beamx.Run(ctx, p); err != nil {
            log.Exitf(ctx, "Failed to execute job: %v", err)
        }
    }
In the code above, I need to select the top 5 rows based on User.Age. I found the top package, which offers this functionality, but its documentation says it returns a single-element PCollection. How is that different from what I have?
    package main

    import (
        "context"
        "flag"
        "fmt"

        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
    )

    func init() {
        beam.RegisterFunction(less)
    }

    type User struct {
        Name string
        Age  int
    }

    func printRow(ctx context.Context, list User) {
        fmt.Println(list)
    }

    func less(a, b User) bool {
        return a.Age < b.Age
    }

    func main() {
        flag.Parse()
        beam.Init()

        ctx := context.Background()
        p := beam.NewPipeline()
        s := p.Root()

        var userList = []User{
            {"Bob", 5},
            {"Adam", 8},
            {"John", 3},
            {"Ben", 1},
            {"Jose", 1},
            {"Bryan", 1},
            {"Kim", 1},
            {"Tim", 1},
        }
        initial := beam.CreateList(s, userList)

        best := top.Largest(s, initial, 5, less)

        pc2 := beam.ParDo(s, func(row User, emit func(User)) {
            emit(row)
        }, best)

        beam.ParDo0(s, printRow, pc2)

        if err := beamx.Run(ctx, p); err != nil {
            log.Exitf(ctx, "Failed to execute job: %v", err)
        }
    }
I added top.Largest as shown above to select the top 5 rows, but I get an error saying that []main.User is not assignable to main.User.
I need the PCollection in the same format as before because I have to process it further. I suspect the error occurs because top.Largest returns a single-element PCollection. Any ideas on how to convert it back?
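The best PCollection contains a single element of type []User, not individual User values, so a DoFn whose input is a User cannot consume it.
Iterate over the slice and emit each row instead: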
    pc2 := beam.ParDo(s, func(rows []User, emit func(User)) {
        for _, row := range rows {
            emit(row)
        }
    }, best)
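The shape mismatch can also be reproduced without a Beam runner. The sketch below is plain Go using only the standard library. largestN is a hypothetical local stand-in for top.Largest, not the SDK's implementation, and flatten plays the role of the ParDo from the answer. It assumes that the result is ordered largest first. How rows with equal ages are ordered is an artifact of the stable sort used here, not something to rely on in a real pipeline.

```go
package main

import (
	"fmt"
	"sort"
)

// User mirrors the struct from the question.
type User struct {
	Name string
	Age  int
}

// less is the same comparator the question passes to top.Largest.
func less(a, b User) bool { return a.Age < b.Age }

// largestN is a hypothetical stand-in for top.Largest: it returns ONE value,
// a slice holding the n largest elements, largest first (assumed ordering).
func largestN(in []User, n int, less func(a, b User) bool) []User {
	out := make([]User, len(in))
	copy(out, in) // leave the caller's slice untouched
	// Sort in descending order according to less; ties keep input order.
	sort.SliceStable(out, func(i, j int) bool { return less(out[j], out[i]) })
	if n < len(out) {
		out = out[:n]
	}
	return out
}

// flatten does what the ParDo in the answer does: it receives the single
// []User value and emits every User on its own.
func flatten(rows []User, emit func(User)) {
	for _, row := range rows {
		emit(row)
	}
}

func main() {
	users := []User{
		{"Bob", 5}, {"Adam", 8}, {"John", 3}, {"Ben", 1},
		{"Jose", 1}, {"Bryan", 1}, {"Kim", 1}, {"Tim", 1},
	}

	best := largestN(users, 5, less) // a single []User, like the best PCollection
	flatten(best, func(u User) { fmt.Println(u) })
}
```

A function that accepts a User cannot be handed best directly, which is the same mismatch the pipeline reports. Once the slice has been flattened, downstream steps see individual User rows again.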