首页 > 数据库 > mysql教程 > 如何在Spark中高效地选择每个组的顶行?

如何在Spark中高效地选择每个组的顶行?

Susan Sarandon
发布: 2025-01-23 12:57:10
原创
969 人浏览过

How to Efficiently Select the Top Row for Each Group in Spark?

高效选择每个分组的首行

本文旨在提取每个“小时”和“类别”分组中“总值”最高的那一行数据。 有多种方法可以实现:

使用窗口函数:

窗口函数提供了一种在每个分组内进行计算的有效方法。 以下是一种实现方式:

<code>import org.apache.spark.sql.functions.{row_number, max, broadcast}
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc)

val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")</code>
登录后复制

使用SQL聚合和连接:

另一种方法是利用SQL聚合和后续连接:

<code>val dfMax = df.groupBy($"Hour".as("max_hour")).agg(max($"TotalValue").as("max_value"))

val dfTopByJoin = df.join(broadcast(dfMax),
    ($"Hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
  .drop("max_hour")
  .drop("max_value")</code>
登录后复制

使用结构体排序:

一个巧妙的方法是对包含“总值”和“类别”的结构体进行排序:

<code>val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
  .groupBy($"Hour")
  .agg(max("vs").alias("vs"))
  .select($"Hour", $"vs.Category", $"vs.TotalValue")</code>
登录后复制

使用DataSet API (Spark 1.6 ):

DataSet API提供了一种简洁的方法来实现相同的结果:

<code>case class Record(Hour: Integer, Category: String, TotalValue: Double)

df.as[Record]
  .groupBy($"Hour")
  .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y)</code>
登录后复制

避免错误方法:

以下方法可能会产生不可靠的结果,应避免使用:

  • df.orderBy(...).groupBy(...).agg(first(...), ...)
  • df.orderBy(...).dropDuplicates(...)

以上是如何在Spark中高效地选择每个组的顶行?的详细内容。更多信息请关注PHP中文网其他相关文章!

来源:php.cn
本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
作者最新文章
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板