擅长:python、mysql、java
<p>如果你想继续使用RDDAPI。<code>mapPartitions</code>接受一个类型的迭代器,并期望另一个类型的迭代器作为结果。df不是<code>mapPartitions</code>可以直接处理的迭代器类型。如果您必须使用pandas api,那么只需从<code>pandas.iterrows</code>创建一个适当的生成器即可</p>
<p>这样,整个<code>mapPartitions</code>结果将是行类型的单个rdd,而不是数据帧的rdd。这样的rdd可以无缝地转换成一个数据帧,并通过动态模式发现返回</p>
<pre><code>from pyspark.sql import Row
def some_fuction(iter):
pandas_df = some_pandas_result(iter)
for index, row in pandas_df.iterrows():
yield Row(id=index, foo=row['foo'], bar=row['bar'])
rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_function(x))
df = spark.createDataFrame(rdd)
</code></pre>