質問:
Web サイトにログインしたユーザーの記録を含むデータフレーム。ユーザーがいつアクティブになるかを判断し、アクティビティの期間を考慮する必要があります。この期間後にユーザーが再度ログインすると、アクティブな日付がリセットされます。
提案された方法:
ヒステリシスと再帰を備えたウィンドウ関数を使用して、アクティビティ期間内の最初または最新のログインを特定し、アクティビティの日付を決定します。
Spark ネイティブ ソリューション (>= 3.2):
Spark 3.2 以降はセッション ウィンドウをサポートします。使用例については公式ドキュメントを参照してください。
レガシー ソリューション (Spark
インポート関数:
Window
はウィンドウの定義に使用されますcoalesce
、datediff
、lag
、lit
、min
、sum
定義ウィンドウ:
userWindow
user_name
によってパーティション化され、login_date
userSessionWindow
user_name
と session
によるパーティション 新しいセッションの開始を見つけます:
datediff
と lag
を使用してログイン日を比較し、アクティブな期間よりも大きなギャップがあるかどうかを確認します。 cast
に変換するには、bigint
を使用します。 userWindow
で sum
を使用して、新しいセッションの開始を蓄積します。 各セッションの最も早い日付を検索します:
withColumn
列を追加するには、session
を使用します。 userSessionWindow
で min
を使用して、各セッションの最も早い login_date
を見つけます。 session
列を削除します。 例:
<code class="language-scala"> val df = Seq( ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"), ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"), ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"), ("SirChillingtonIV", "2012-08-11") ).toDF("user_name", "login_date") val result = sessionized //sessionized is assumed to be defined elsewhere, this is a crucial part missing from the original .withColumn("became_active", min($"login_date").over(userSessionWindow)) .drop("session") df.show(5) result.show(5)</code>
sessionized
の定義がコード例にないことに注意してください。これは、このソリューションを完了するための重要な部分です。 session
列は、アクティビティ期間とログイン日に基づいて計算する必要があります。 これには通常、カスタム関数またはより複雑なウィンドウ関数ロジックが必要です。 完全なソリューションには、この欠落しているコード部分を追加する必要があります。
以上がウィンドウ関数を使用して Spark SQL でユーザーのアクティブ日を確認する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。