Dieses Beispiel zeigt, wie Spark-Fensterfunktionen verwendet werden, um den Aktivierungsstatus des Benutzers zu ermitteln. Dabei wird ein aktiver Zeitraum von 5 Tagen berücksichtigt, der bei nachfolgenden Anmeldungen zurückgesetzt wird. Wir erhalten einen DataFrame mit Benutzeranmeldungen und möchten eine Spalte hinzufügen, die anzeigt, wann jeder Benutzer aktiv wurde.
Methodik: Fensterfunktionen nutzen
Unser Ansatz verwendet Fensterfunktionen, um Anmeldeereignisse zu identifizieren, die ein Zurücksetzen des aktiven Status auslösen. Es wird ein Fenster zum Sortieren der Anmeldungen nach Benutzer und Datum erstellt. Durch die Verzögerung dieses Fensters ist ein Vergleich zwischen aktuellen und vorherigen Anmeldezeiten möglich.
<code class="language-scala">import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ val window = Window.partitionBy("user_name").orderBy("login_date") val df2 = df.withColumn("previous_login", lag("login_date", 1).over(window))</code>
Umgang mit aktiven Status-Resets
Das became_active
Datum wird ermittelt, indem überprüft wird, ob die vorherige Anmeldung ( previous_login
) in den 5-tägigen aktiven Zeitraum fällt. Wenn previous_login
null ist (erste Anmeldung) oder der Zeitunterschied (login_date
- previous_login
) weniger als 5 Tage beträgt, wird became_active
auf das aktuelle login_date
gesetzt. Andernfalls wird der Prozess rekursiv fortgesetzt, bis diese Bedingung erfüllt ist.
Spark-Implementierung (Versionen >= 3.2)
Spark 3.2 und höher bieten native Unterstützung für Sitzungsfenster und vereinfachen so diese Aufgabe (Einzelheiten finden Sie in der offiziellen Dokumentation).
Spark-Implementierung (ältere Versionen)
Für ältere Spark-Versionen ist eine Problemumgehung erforderlich:
<code class="language-scala">val userWindow = Window.partitionBy("user_name").orderBy("login_date") val userSessionWindow = Window.partitionBy("user_name", "session")</code>
<code class="language-scala">val newSession = (coalesce( datediff($"login_date", lag($"login_date", 1).over(userWindow)), lit(0) ) > 5).cast("bigint")</code>
<code class="language-scala">val sessionized = df.withColumn("session", sum(newSession).over(userWindow)) val result = sessionized .withColumn("became_active", min($"login_date").over(userSessionWindow)) .drop("session")</code>
Beispielausgabe
Die folgende Ausgabe zeigt das Ergebnis anhand eines Beispieldatensatzes:
<code>+----------------+----------+-------------+ | user_name|login_date|became_active| +----------------+----------+-------------+ | OprahWinfreyJr|2012-01-10| 2012-01-10| |SirChillingtonIV|2012-01-04| 2012-01-04| |SirChillingtonIV|2012-01-11| 2012-01-11| |SirChillingtonIV|2012-01-14| 2012-01-11| |SirChillingtonIV|2012-08-11| 2012-08-11| |Booooooo99900098|2012-01-04| 2012-01-04| |Booooooo99900098|2012-01-06| 2012-01-04| +----------------+----------+-------------+</code>
Das obige ist der detaillierte Inhalt vonWie können Fensterfunktionen den Benutzeraktivierungsstatus mit einer 5-tägigen Zurücksetzung des aktiven Zeitraums identifizieren?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!