我正在尝试编写一个PySpark脚本,该脚本将根据客户所属的每个零售商的收入对客户进行排名。我目前的解决方案如下:
unique_retailers = RETAILERS.select('ID').distinct().rdd.map(lambda r: r[0]).collect()
CUSTOMERS = CUSTOMERS.orderBy(sf.col('REVENUE'), ascending=False)
for i in unique_retailers:
RANK = CUSTOMERS.select('ID').where(sf.substring(sf.col('ID'), 0, 1) == sf.lit(i)).withColumn('RANK_'+i, sf.monotonically_increasing_id())
RANK.show()
CUSTOMERS = CUSTOMERS.join(RANK, ['ID'], 'left')
CUSTOMERS.show()
其中,在每个CUSTOMERS.ID
中,第一个字符是特定零售商的RETAILERS.ID
。然而monotonically_increasing_id())
的行为却出人意料,我期望值遵循v(k+1) = v(k) + 1
的模式,它们似乎由于某种原因出现了很大的差异。示例如下:
+--------+--------------------+
|ID |RANK_1 |
+--------+--------------------+
|1_502765| 0|
|1_522762| 1|
|1_532768| 17179869184|
|1_452763| 68719476736|
|1_522766| 94489280512|
|1_512769| 214748364800|
|1_542766| 223338299392|
|1_452766| 549755813888|
|1_542769| 549755813889|
|1_512766| 721554505728|
|1_132760| 962072674304|
|1_522761| 996432412672|
|1_542764| 1065151889408|
|1_172765| 1151051235328|
|1_542762| 1194000908288|
|1_542765| 1245540515840|
|1_532766| 1254130450432|
|1_542760| 1400159338496|
|1_172767| 1408749273088|
|1_412764| 1511828488192|
+--------+--------------------+
单调并不意味着连续。你所经历的实际上是预期的行为。看看documentation。。。请记住,spark是分布式的,因此生成连续的索引虽然不是不可能的,但也不是微不足道的。你知道吗
相关问题 更多 >
编程相关推荐