Spark RDD에서 SQL의 행 번호 매기기를 복제하는 방법
문제 이해
원하는 것 Spark RDD의 각 항목에 대해 특정 열을 기준으로 정렬된 순차적 행 번호를 생성합니다. 키 열로 분할됩니다. SQL의 row_number() over(partition by ... order by ...)와 유사하지만 Spark RDD를 사용합니다.
초기 시도
초기 시도가 사용되었습니다. sortByKey 및 zipWithIndex는 원하는 분할 행 번호를 생성하지 못했습니다. sortBy는 RDD에 직접 적용할 수 없으므로 먼저 수집해야 하므로 RDD가 아닌 출력이 생성됩니다.
Spark 1.4를 사용하는 솔루션
데이터 준비
(K, (col1, col2, col3)).
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3)) val temp1 = sc.parallelize(sample_data)
분할된 행 번호 생성
분할된 창에서 rowNumber를 사용하여 각 키에 대한 행 번호를 생성합니다.
import org.apache.spark.sql.functions._ temp1.toDF("key", "col1", "col2", "col3").withColumn("rownum", rowNumber() over (Window partitionBy "key" orderBy desc("col2"), "col3")))
예시 출력
+---+----+----+----+------+ |key|col1|col2|col3|rownum| +---+----+----+----+------+ |1,2|1 |4 |7 |2 | |1,2|1 |2 |3 |1 | |1,2|2 |2 |3 |3 | |3,4|5 |5 |5 |1 | |3,4|5 |5 |9 |2 | |3,4|7 |5 |5 |3 | +---+----+----+----+------+
위 내용은 SQL의 `row_number()`와 유사한 Spark RDD에서 순차적 행 번호를 생성하는 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!