Maison > base de données > tutoriel mysql > Comment sélectionner efficacement la première ligne de chaque groupe dans un Spark DataFrame ?

Comment sélectionner efficacement la première ligne de chaque groupe dans un Spark DataFrame ?

Susan Sarandon
Libérer: 2025-01-23 13:06:10
original
355 Les gens l'ont consulté

How to Efficiently Select the First Row of Each Group in a Spark DataFrame?

Comment sélectionner efficacement la première ligne de chaque groupe dans Spark DataFrame ?

Étant donné un DataFrame contenant les colonnes Hour, Category et TotalValue, la tâche consiste à sélectionner la première ligne de chaque groupe, où chaque groupe est défini par une combinaison unique d'Heure et de Catégorie.

Utiliser les fonctions de fenêtre

<code>import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc)

val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")</code>
Copier après la connexion

Utilisez du SQL simple pour rejoindre après l'agrégation

<code>val dfMax = df.groupBy($"Hour").agg(max($"TotalValue").as("max_value"))

val dfTopByJoin = df.join(dfMax, ($"Hour" === dfMax("Hour")) && ($"TotalValue" === dfMax("max_value"))).drop(dfMax("Hour")).drop(dfMax("max_value"))</code>
Copier après la connexion

Utiliser le tri structurel

<code>val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
  .groupBy($"Hour")
  .agg(max("vs").alias("vs"))
  .select($"Hour", $"vs.Category", $"vs.TotalValue")</code>
Copier après la connexion

Utiliser l'API DataSet (Spark 1.6, 2.0)

<code>import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator

case class Record(Hour: Integer, Category: String, TotalValue: Double)

def firstOfHour[T: Encoder : Aggregator]: TypedColumn[Record, Record] = {
  aggregator[Record, (Option[Record], Long)](Record(Hour = 0, Category = null, TotalValue = 0.0)) { (buffer, record) =>
    if (record.Hour > buffer._2) buffer else (Some(record), record.Hour)
  } { (buffer1, buffer2) =>
    if (buffer1._2 > buffer2._2) buffer1 else buffer2
  } { x =>
    x._1 match {
      case Some(r) => r
      case _ => Record(Hour = 0, Category = "0", TotalValue = 0.0)
    }
  }
}

df.as[Record].groupByKey(_.Hour).agg(firstOfHour[Record]).show</code>
Copier après la connexion

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Derniers articles par auteur
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal