Given a DataFrame with Hour, Category, and TotalValue columns, the task is to select the top row of each group, where rows are grouped by Hour and the "first" row of a group is the one with the largest TotalValue for that hour.
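For concreteness, every snippet below can be run against a small sample DataFrame like the following (the category names and values here are illustrative stand-ins, not data from the original question):

```scala
// In spark-shell this import is already in scope; in an application,
// import it from your SparkSession instance.
import spark.implicits._

// Hypothetical sample data: several (Hour, Category, TotalValue) rows per hour.
val df = Seq(
  (0, "cat26", 30.9), (0, "cat13", 22.1), (0, "cat95", 19.6), (0, "cat105", 1.3),
  (1, "cat67", 28.5), (1, "cat4", 26.8), (1, "cat13", 12.6), (1, "cat23", 5.3)
).toDF("Hour", "Category", "TotalValue")
```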
The most common solution uses a window function: partition by Hour, order by TotalValue descending, and keep only the row ranked first in each partition.

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Rank the rows within each Hour, highest TotalValue first.
val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc)

// Keep only the top-ranked row per hour, then drop the helper column.
val dfTop = df
  .withColumn("rn", row_number().over(w))
  .where($"rn" === 1)
  .drop("rn")
```
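Against the sample data above, this keeps exactly one row per hour (row order in the output may vary):

```scala
dfTop.show()
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// |   0|   cat26|      30.9|
// |   1|   cat67|      28.5|
// +----+--------+----------+
```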
An alternative is a plain aggregation followed by a join: compute the per-hour maximum, then join it back to the original rows. The grouping column is renamed to max_hour so the join condition is unambiguous. Note that if two rows tie for the maximum TotalValue within an hour, this approach returns both of them.

```scala
// Per-hour maximum of TotalValue.
val dfMax = df.groupBy($"Hour".as("max_hour")).agg(max($"TotalValue").as("max_value"))

// Join back to recover the full row(s) carrying the maximum value.
val dfTopByJoin = df
  .join(dfMax, ($"Hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
  .drop("max_hour")
  .drop("max_value")
```
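When the number of distinct hours is small, dfMax is tiny, so it can be worth hinting Spark to broadcast it and avoid shuffling the large side. A minimal sketch of the same join with the hint applied:

```scala
import org.apache.spark.sql.functions.broadcast

// Identical join, but dfMax is shipped to every executor.
val dfTopByBroadcastJoin = df
  .join(broadcast(dfMax), ($"Hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
  .drop("max_hour")
  .drop("max_value")
```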
A third option avoids both windows and joins: pack TotalValue and Category into a struct and take the max. This works because Spark compares structs field by field in declaration order, so TotalValue must be the first field of the struct.

```scala
// max on a struct compares TotalValue first, then Category.
val dfTopByStruct = df
  .select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
  .groupBy($"Hour")
  .agg(max("vs").alias("vs"))
  .select($"Hour", $"vs.Category", $"vs.TotalValue")
```
Finally, the typed Dataset API can express the same selection as a custom Aggregator that keeps, per hour, the record ranked first by TotalValue:

```scala
import org.apache.spark.sql.{Encoder, Encoders, TypedColumn}
import org.apache.spark.sql.expressions.Aggregator

case class Record(Hour: Integer, Category: String, TotalValue: Double)

// "First" here means the record with the largest TotalValue. The zero
// buffer uses -infinity so that any real record replaces it.
val firstOfHour: TypedColumn[Record, Record] =
  new Aggregator[Record, Record, Record] {
    def zero: Record = Record(Hour = 0, Category = null, TotalValue = Double.NegativeInfinity)
    def reduce(buffer: Record, record: Record): Record =
      if (buffer.TotalValue >= record.TotalValue) buffer else record
    def merge(b1: Record, b2: Record): Record =
      if (b1.TotalValue >= b2.TotalValue) b1 else b2
    def finish(reduction: Record): Record = reduction
    def bufferEncoder: Encoder[Record] = Encoders.product[Record]
    def outputEncoder: Encoder[Record] = Encoders.product[Record]
  }.toColumn

df.as[Record].groupByKey(_.Hour).agg(firstOfHour).show
```
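If a full Aggregator feels heavyweight, reduceGroups gives the same result with less ceremony, at the cost of producing (key, record) pairs that need unpacking:

```scala
// Keep, per hour, the record with the larger TotalValue, then discard the key.
val dfTopTyped = df.as[Record]
  .groupByKey(_.Hour)
  .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)
  .map(_._2)
```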