Heim > Datenbank > MySQL-Tutorial > Wie wähle ich effizient die erste Zeile jeder Gruppe in einem Spark-DataFrame aus?

Wie wähle ich effizient die erste Zeile jeder Gruppe in einem Spark-DataFrame aus?

Susan Sarandon
Freigeben: 2025-01-23 13:06:10
Original
300 Leute haben es durchsucht

How to Efficiently Select the First Row of Each Group in a Spark DataFrame?

Wie kann ich die erste Zeile jeder Gruppierung in Spark DataFrame effizient auswählen?

Bei einem gegebenen DataFrame mit den Spalten „Stunde“, „Kategorie“ und „Gesamtwert“ besteht die Aufgabe darin, die erste Zeile jeder Gruppierung auszuwählen, wobei jede Gruppierung durch eine eindeutige Kombination aus „Stunde“ und „Kategorie“ definiert wird.

Fensterfunktionen verwenden

<code>import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc)

val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")</code>
Nach dem Login kopieren

Verwenden Sie einfaches SQL, um nach der Aggregation beizutreten

<code>val dfMax = df.groupBy($"Hour").agg(max($"TotalValue").as("max_value"))

val dfTopByJoin = df.join(dfMax, ($"Hour" === dfMax("Hour")) && ($"TotalValue" === dfMax("max_value"))).drop(dfMax("Hour")).drop(dfMax("max_value"))</code>
Nach dem Login kopieren

Strukturelle Sortierung verwenden

<code>val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
  .groupBy($"Hour")
  .agg(max("vs").alias("vs"))
  .select($"Hour", $"vs.Category", $"vs.TotalValue")</code>
Nach dem Login kopieren

Verwenden Sie die DataSet-API (Spark 1.6, 2.0)

<code>import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator

case class Record(Hour: Integer, Category: String, TotalValue: Double)

def firstOfHour[T: Encoder : Aggregator]: TypedColumn[Record, Record] = {
  aggregator[Record, (Option[Record], Long)](Record(Hour = 0, Category = null, TotalValue = 0.0)) { (buffer, record) =>
    if (record.Hour > buffer._2) buffer else (Some(record), record.Hour)
  } { (buffer1, buffer2) =>
    if (buffer1._2 > buffer2._2) buffer1 else buffer2
  } { x =>
    x._1 match {
      case Some(r) => r
      case _ => Record(Hour = 0, Category = "0", TotalValue = 0.0)
    }
  }
}

df.as[Record].groupByKey(_.Hour).agg(firstOfHour[Record]).show</code>
Nach dem Login kopieren

Das obige ist der detaillierte Inhalt vonWie wähle ich effizient die erste Zeile jeder Gruppe in einem Spark-DataFrame aus?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Neueste Artikel des Autors
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage