Apache Beam 从 Go 中的 PCollection 中选择前 N 行
Apache Beam 是一个开源的分布式数据处理框架,它提供了一种统一的编程模型,可以在不同的批处理和流处理引擎上运行。最近,Apache Beam 的 Go SDK 中新增了一个非常有用的功能——从 PCollection 中选择前 N 行。这个功能对于需要对大型数据集进行采样或者快速预览的场景非常有帮助。在本文中,我们将介绍如何在 Apache Beam 的 Go SDK 中使用这个功能,并展示一些实际的示例代码。让我们开始吧!
问题内容
我有一个 pcollection,我需要从中选择 n 个最大的行。我正在尝试使用 go 创建一个数据流管道并陷入困境。
package main import ( "context" "flag" "fmt" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" ) type user struct { name string age int } func printrow(ctx context.context, list user) { fmt.println(list) } func main() { flag.parse() beam.init() ctx := context.background() p := beam.newpipeline() s := p.root() var userlist = []user{ {"bob", 5}, {"adam", 8}, {"john", 3}, {"ben", 1}, {"jose", 1}, {"bryan", 1}, {"kim", 1}, {"tim", 1}, } initial := beam.createlist(s, userlist) pc2 := beam.pardo(s, func(row user, emit func(user)) { emit(row) }, initial) beam.pardo0(s, printrow, pc2) if err := beamx.run(ctx, p); err != nil { log.exitf(ctx, "failed to execute job: %v", err) } }
从上面的代码中,我需要根据 user.age 选择前 5 行 我发现链接顶部包具有相同的功能,但它说它返回单个元素 pcollection。有什么不同?
package main import ( "context" "flag" "fmt" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" ) func init() { beam.RegisterFunction(less) } type User struct { Name string Age int } func printRow(ctx context.Context, list User) { fmt.Println(list) } func less(a, b User) bool { return a.Age < b.Age } func main() { flag.Parse() beam.Init() ctx := context.Background() p := beam.NewPipeline() s := p.Root() var userList = []User{ {"Bob", 5}, {"Adam", 8}, {"John", 3}, {"Ben", 1}, {"Jose", 1}, {"Bryan", 1}, {"Kim", 1}, {"Tim", 1}, } initial := beam.CreateList(s, userList) best := top.Largest(s, initial, 5, less) pc2 := beam.ParDo(s, func(row User, emit func(User)) { emit(row) }, best) beam.ParDo0(s, printRow, pc2) if err := beamx.Run(ctx, p); err != nil { log.Exitf(ctx, "Failed to execute job: %v", err) } }
我像上面一样添加了选择前 5 行的函数,但出现错误 []main.user is not allocate to main.user
我需要与以前相同格式的 pcollection,因为我需要进一步处理。我怀疑这是因为 top.largest 函数返回单个元素 pcollection。关于如何转换格式有什么想法吗?
解决方法
最好的 pcollection 是 []user
所以尝试一下...
pc2 := beam.ParDo(s, func(rows []User, emit func(User)) { for _, row := range rows { emit(row) } }, best)
以上是Apache Beam 从 Go 中的 PCollection 中选择前 N 行的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

热门话题

OpenSSL,作为广泛应用于安全通信的开源库,提供了加密算法、密钥和证书管理等功能。然而,其历史版本中存在一些已知安全漏洞,其中一些危害极大。本文将重点介绍Debian系统中OpenSSL的常见漏洞及应对措施。DebianOpenSSL已知漏洞:OpenSSL曾出现过多个严重漏洞,例如:心脏出血漏洞(CVE-2014-0160):该漏洞影响OpenSSL1.0.1至1.0.1f以及1.0.2至1.0.2beta版本。攻击者可利用此漏洞未经授权读取服务器上的敏感信息,包括加密密钥等。

Go爬虫Colly中的Queue线程问题探讨在使用Go语言的Colly爬虫库时,开发者常常会遇到关于线程和请求队列的问题。�...

Go语言中用于浮点数运算的库介绍在Go语言(也称为Golang)中,进行浮点数的加减乘除运算时,如何确保精度是�...

后端学习路径:从前端转型到后端的探索之旅作为一名从前端开发转型的后端初学者,你已经有了nodejs的基础,...

本文讨论了GO编程中的GO FMT命令,该命令将代码格式化以遵守官方样式准则。它突出了GO FMT在维持代码一致性,可读性和降低样式辩论方面的重要性。 FO的最佳实践

在BeegoORM框架下,如何指定模型关联的数据库?许多Beego项目需要同时操作多个数据库。当使用Beego...
