此示例演示如何使用 Spark 窗口函数来确定用户激活状态,考虑 5 天的活动期,该期在后续登录时重置。 我们获得了一个用户登录的 DataFrame,目的是添加一个列来显示每个用户何时变得活跃。
方法:利用窗口函数
我们的方法使用窗口函数来识别触发活动状态重置的登录事件。 将创建一个窗口来按用户和日期排序登录。 滞后此窗口可以比较当前和之前的登录时间。
import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ val window = Window.partitionBy("user_name").orderBy("login_date") val df2 = df.withColumn("previous_login", lag("login_date", 1).over(window))
解决活动状态重置
became_active
日期是通过检查上次登录 ( previous_login
) 是否在 5 天的活跃期内来确定的。如果previous_login
为空(首次登录)或时间差(login_date
- previous_login
)小于5天,则became_active
设置为当前login_date
。否则,该过程将递归地继续,直到满足此条件。
Spark 实施(版本 >= 3.2)
Spark 3.2 及更高版本提供本机会话窗口支持,简化了此任务(有关详细信息,请参阅官方文档)。
Spark 实施(旧版本)
对于较旧的 Spark 版本,需要解决方法:
val userWindow = Window.partitionBy("user_name").orderBy("login_date") val userSessionWindow = Window.partitionBy("user_name", "session")
val newSession = (coalesce( datediff($"login_date", lag($"login_date", 1).over(userWindow)), lit(0) ) > 5).cast("bigint")
val sessionized = df.withColumn("session", sum(newSession).over(userWindow)) val result = sessionized .withColumn("became_active", min($"login_date").over(userSessionWindow)) .drop("session")
示例输出
以下输出演示了使用示例数据集的结果:
<code>+----------------+----------+-------------+ | user_name|login_date|became_active| +----------------+----------+-------------+ | OprahWinfreyJr|2012-01-10| 2012-01-10| |SirChillingtonIV|2012-01-04| 2012-01-04| |SirChillingtonIV|2012-01-11| 2012-01-11| |SirChillingtonIV|2012-01-14| 2012-01-11| |SirChillingtonIV|2012-08-11| 2012-08-11| |Booooooo99900098|2012-01-04| 2012-01-04| |Booooooo99900098|2012-01-06| 2012-01-04| +----------------+----------+-------------+</code>
以上是窗口函数如何通过 5 天活动期重置来识别用户激活状态?的详细内容。更多信息请关注PHP中文网其他相关文章!