首頁 > 資料庫 > mysql教程 > 如何有效率地選擇 Spark DataFrame 中每個群組的第一行?

如何有效率地選擇 Spark DataFrame 中每個群組的第一行?

Susan Sarandon
發布: 2025-01-23 13:06:10
原創
299 人瀏覽過

How to Efficiently Select the First Row of Each Group in a Spark DataFrame?

如何有效率地選擇Spark DataFrame中每個分組的第一行?

給定一個包含Hour、Category和TotalValue欄位的DataFrame,任務是選擇每個分組的第一行,其中每個分組由Hour和Category的唯一組合定義。

使用視窗函數

<code>import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc)

val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")</code>
登入後複製

使用簡單的SQL聚合後進行連線

<code>val dfMax = df.groupBy($"Hour").agg(max($"TotalValue").as("max_value"))

val dfTopByJoin = df.join(dfMax, ($"Hour" === dfMax("Hour")) && ($"TotalValue" === dfMax("max_value"))).drop(dfMax("Hour")).drop(dfMax("max_value"))</code>
登入後複製

使用結構排序

<code>val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
  .groupBy($"Hour")
  .agg(max("vs").alias("vs"))
  .select($"Hour", $"vs.Category", $"vs.TotalValue")</code>
登入後複製

使用DataSet API (Spark 1.6 , 2.0 )

<code>import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator

case class Record(Hour: Integer, Category: String, TotalValue: Double)

def firstOfHour[T: Encoder : Aggregator]: TypedColumn[Record, Record] = {
  aggregator[Record, (Option[Record], Long)](Record(Hour = 0, Category = null, TotalValue = 0.0)) { (buffer, record) =>
    if (record.Hour > buffer._2) buffer else (Some(record), record.Hour)
  } { (buffer1, buffer2) =>
    if (buffer1._2 > buffer2._2) buffer1 else buffer2
  } { x =>
    x._1 match {
      case Some(r) => r
      case _ => Record(Hour = 0, Category = "0", TotalValue = 0.0)
    }
  }
}

df.as[Record].groupByKey(_.Hour).agg(firstOfHour[Record]).show</code>
登入後複製

以上是如何有效率地選擇 Spark DataFrame 中每個群組的第一行?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
作者最新文章
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板