def fun_1(csv):
# returns int[] of length = Number of New Lines in String csv
def fun_2(csv): # My WorkArround to Pass one CSV Line at One Time
return fun_1(csv)[0]
输入数据帧为df
+----+----+-----+
|col1|col2|CSVs |
+----+----+-----+
| 1| a|2,0,1|
| 2| b|2,0,2|
| 3| c|2,0,3|
| 4| a|2,0,1|
| 5| b|2,0,2|
| 6| c|2,0,3|
| 7| a|2,0,1|
+----+----+-----+
下面是一段代码片段,它可以工作,但需要很长时间
from pyspark.sql.functions import udf
from pyspark.sql import functions as sf
funudf = udf(fun_2) # wish it could be fun_1
df=df.withColumn( 'pred' , funudf(sf.col('csv')))
fun_1
,内存有问题,一次最多只能处理50000行。我希望使用funudf = udf(fun_1)
。
因此,如何将PySpark DF拆分为50000行的段,调用funudf ->fun_1
。
输出有两个列,分别来自输入的“col1”和“funudf返回值”
通过使用RDDAPI中公开的
groupByKey
方法,可以实现强制PySpark对固定批次的行进行操作的预期结果。使用groupByKey
将强制PySpark将单个密钥的所有数据洗牌到单个执行器注意:出于同样的原因,由于网络成本,通常不鼓励使用
groupByKey
战略:
groupByKey
pyspark.resultiterable.ResultIterable
上运行,这是groupByKey
的结果。使用mapValues
将函数应用于您的组例如:
相关问题 更多 >
编程相关推荐