首页 > 数据库 > mysql教程 > 如何高效地选择 Spark DataFrame 中每个组的第一行?

如何高效地选择 Spark DataFrame 中每个组的第一行?

Susan Sarandon
发布: 2025-01-23 13:06:10
原创
338 人浏览过

How to Efficiently Select the First Row of Each Group in a Spark DataFrame?

如何高效地选择Spark DataFrame中每个分组的第一行?

给定一个包含Hour、Category和TotalValue列的DataFrame,任务是选择每个分组的第一行,其中每个分组由Hour和Category的唯一组合定义。

使用窗口函数

<code>import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc)

val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")</code>
登录后复制

使用简单的SQL聚合后进行连接

<code>val dfMax = df.groupBy($"Hour").agg(max($"TotalValue").as("max_value"))

val dfTopByJoin = df.join(dfMax, ($"Hour" === dfMax("Hour")) && ($"TotalValue" === dfMax("max_value"))).drop(dfMax("Hour")).drop(dfMax("max_value"))</code>
登录后复制

使用结构排序

<code>val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
  .groupBy($"Hour")
  .agg(max("vs").alias("vs"))
  .select($"Hour", $"vs.Category", $"vs.TotalValue")</code>
登录后复制

使用DataSet API (Spark 1.6 , 2.0 )

<code>import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator

case class Record(Hour: Integer, Category: String, TotalValue: Double)

def firstOfHour[T: Encoder : Aggregator]: TypedColumn[Record, Record] = {
  aggregator[Record, (Option[Record], Long)](Record(Hour = 0, Category = null, TotalValue = 0.0)) { (buffer, record) =>
    if (record.Hour > buffer._2) buffer else (Some(record), record.Hour)
  } { (buffer1, buffer2) =>
    if (buffer1._2 > buffer2._2) buffer1 else buffer2
  } { x =>
    x._1 match {
      case Some(r) => r
      case _ => Record(Hour = 0, Category = "0", TotalValue = 0.0)
    }
  }
}

df.as[Record].groupByKey(_.Hour).agg(firstOfHour[Record]).show</code>
登录后复制

以上是如何高效地选择 Spark DataFrame 中每个组的第一行?的详细内容。更多信息请关注PHP中文网其他相关文章!

来源:php.cn
本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
作者最新文章
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板