Apache Beam select top N rows from PCollection in Go

PHPz
Release: 2024-02-10 17:48:08

Apache Beam is an open-source distributed data processing framework with a unified programming model that runs on multiple batch and stream processing engines. Its Go SDK includes a top package for selecting the N largest (or smallest) elements of a PCollection, which is useful for ranking or quickly previewing large data sets. This article walks through a question about using that package and explains how to handle the shape of its output.

Question content

I have a PCollection from which I need to select the N largest rows. I'm trying to create a Dataflow pipeline using Go and am stuck.

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

type User struct {
    Name string
    Age  int
}

func printRow(ctx context.Context, list User) {
    fmt.Println(list)
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var userList = []User{
        {"Bob", 5},
        {"Adam", 8},
        {"John", 3},
        {"Ben", 1},
        {"Jose", 1},
        {"Bryan", 1},
        {"Kim", 1},
        {"Tim", 1},
    }
    initial := beam.CreateList(s, userList)

    pc2 := beam.ParDo(s, func(row User, emit func(User)) {
        emit(row)
    }, initial)

    beam.ParDo0(s, printRow, pc2)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}

From the above code, I need to select the top 5 rows by User.Age. I found the top package, which offers this functionality, but its documentation says it returns a single-element PCollection. What is the difference?

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.RegisterFunction(less)
}

type User struct {
    Name string
    Age  int
}

func printRow(ctx context.Context, list User) {
    fmt.Println(list)
}

func less(a, b User) bool {
    return a.Age < b.Age
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var userList = []User{
        {"Bob", 5},
        {"Adam", 8},
        {"John", 3},
        {"Ben", 1},
        {"Jose", 1},
        {"Bryan", 1},
        {"Kim", 1},
        {"Tim", 1},
    }
    initial := beam.CreateList(s, userList)

    best := top.Largest(s, initial, 5, less)

    pc2 := beam.ParDo(s, func(row User, emit func(User)) {
        emit(row)
    }, best)

    beam.ParDo0(s, printRow, pc2)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}

I added top.Largest to select the top 5 rows, as in the code above, but I get the error "[]main.User is not assignable to main.User".

I need the PCollection in the same format as before because I need to process it further. I suspect this is because top.Largest returns a single-element PCollection. Any ideas on how to convert the format?

Solution

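The best PCollection holds a single element of type []User, not individual User values.

To get back a PCollection of User, expand that slice inside the ParDo: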
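
To make the type difference concrete, here is a minimal plain-Go sketch of the result shape. It does not use Beam and is not how Beam implements top.Largest; largestN is a hypothetical helper, and the largest-first ordering inside the slice is an assumption of this sketch. The point is only that the result is one slice value rather than many separate rows.

```go
package main

import (
	"fmt"
	"sort"
)

// User mirrors the struct from the question.
type User struct {
	Name string
	Age  int
}

// less reports whether a ranks below b, as in the question.
func less(a, b User) bool { return a.Age < b.Age }

// largestN is a hypothetical stand-in that models the result shape of
// top.Largest. It is NOT Beam's implementation. It returns ONE slice
// holding at most n elements, ordered from largest to smallest
// (the ordering is an assumption of this sketch).
func largestN(in []User, n int, less func(a, b User) bool) []User {
	out := append([]User(nil), in...) // copy so the input is not reordered
	sort.SliceStable(out, func(i, j int) bool { return less(out[j], out[i]) })
	if len(out) > n {
		out = out[:n]
	}
	return out
}

func main() {
	users := []User{
		{"Bob", 5}, {"Adam", 8}, {"John", 3}, {"Ben", 1},
		{"Jose", 1}, {"Bryan", 1}, {"Kim", 1}, {"Tim", 1},
	}
	best := largestN(users, 5, less)

	// The result is a single []User value, which is why a function
	// expecting a lone User cannot consume it directly.
	fmt.Printf("%T with %d elements\n", best, len(best))
}
```

In the pipeline, that single slice is the one element of the PCollection returned by top.Largest, so any downstream function has to accept []User.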

pc2 := beam.ParDo(s, func(rows []User, emit func(User)) {
    for _, row := range rows {
        emit(row)
    }
}, best)
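
Because the function passed to ParDo is ordinary Go, its logic can be checked without running a pipeline. The following is a small sketch under that assumption: expand is a hypothetical named version of the anonymous function above, and the collecting closure merely stands in for the output PCollection.

```go
package main

import "fmt"

// User mirrors the struct from the question.
type User struct {
	Name string
	Age  int
}

// expand has the same shape as the ParDo function in the answer:
// it receives the single []User element and emits each User separately.
func expand(rows []User, emit func(User)) {
	for _, row := range rows {
		emit(row)
	}
}

func main() {
	// Hypothetical stand-in for the one element held by the top.Largest output.
	best := []User{{"Adam", 8}, {"Bob", 5}, {"John", 3}}

	// A collecting emit function plays the role of the output PCollection.
	var out []User
	expand(best, func(u User) { out = append(out, u) })

	fmt.Println(len(out), out)
}
```

In the real pipeline, a named function like this could be registered with beam.RegisterFunction, as the question already does for less, and passed to beam.ParDo in place of the anonymous function.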

source:stackoverflow.com