Bei einem gegebenen DataFrame mit den Spalten „Stunde“, „Kategorie“ und „Gesamtwert“ besteht die Aufgabe darin, die erste Zeile jeder Gruppierung auszuwählen, wobei jede Gruppierung durch eine eindeutige Kombination aus „Stunde“ und „Kategorie“ definiert wird.
<code>import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc) val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")</code>
<code>val dfMax = df.groupBy($"Hour").agg(max($"TotalValue").as("max_value")) val dfTopByJoin = df.join(dfMax, ($"Hour" === dfMax("Hour")) && ($"TotalValue" === dfMax("max_value"))).drop(dfMax("Hour")).drop(dfMax("max_value"))</code>
<code>val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs")) .groupBy($"Hour") .agg(max("vs").alias("vs")) .select($"Hour", $"vs.Category", $"vs.TotalValue")</code>
<code>import org.apache.spark.sql.Encoder import org.apache.spark.sql.expressions.Aggregator case class Record(Hour: Integer, Category: String, TotalValue: Double) def firstOfHour[T: Encoder : Aggregator]: TypedColumn[Record, Record] = { aggregator[Record, (Option[Record], Long)](Record(Hour = 0, Category = null, TotalValue = 0.0)) { (buffer, record) => if (record.Hour > buffer._2) buffer else (Some(record), record.Hour) } { (buffer1, buffer2) => if (buffer1._2 > buffer2._2) buffer1 else buffer2 } { x => x._1 match { case Some(r) => r case _ => Record(Hour = 0, Category = "0", TotalValue = 0.0) } } } df.as[Record].groupByKey(_.Hour).agg(firstOfHour[Record]).show</code>
Das obige ist der detaillierte Inhalt vonWie wähle ich effizient die erste Zeile jeder Gruppe in einem Spark-DataFrame aus?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!