Using itertools.groupby in PySpark, but it fails

Published 2024-10-01 09:19:21


I wrote a map function that uses itertools.groupby. Here is what I did.

Driver code:

pair_count = df.mapPartitions(lambda iterable: pair_func_cnt(iterable))
pair_count.collect()

Map function:

(map function code not shown in the original post)
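The map function itself was lost from the post. As a hypothetical reconstruction (the logic and signature are assumptions — only the name pair_func_cnt appears in the driver code), a partition function that yields groupby results directly would produce exactly the error below:

```python
from itertools import groupby
from operator import itemgetter

def pair_func_cnt(iterable):
    # Hypothetical reconstruction (an assumption, not the original code):
    # yielding the raw groupby pairs hands itertools._grouper objects to
    # Spark's serializer, which is what triggers the PicklingError below.
    rows = sorted(iterable, key=itemgetter(0))
    for key, group in groupby(rows, itemgetter(0)):
        yield key, group  # group is an unpicklable itertools._grouper
```

When Spark collects the partition results, it pickles whatever the function yields, so the lazy group objects never make it back to the driver.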

But it fails with the following error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/serializers.py", line 267, in dump_stream
    bytes = self.serializer.dumps(vs)
  File "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/serializers.py", line 415, in dumps
    return pickle.dumps(obj, protocol)
PicklingError: Can't pickle <type 'itertools._grouper'>: attribute lookup itertools._grouper failed
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

1 Answer

Answered 2024-10-01 09:19:21

Python's pickle cannot serialize anonymous functions. Let's illustrate with a simple example:

import pickle
from itertools import groupby

xs = [[1, 2, 3], [1, 2, 5], [1, 3, 5], [2, 4, 6]]
pickle.dumps([x for x in groupby(xs, lambda x: x[0])])

## PicklingError
## ...
## PicklingError: Can't pickle ...

You should remove all references to lambdas before serializing:

(code block not shown in the original post)
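The snippet here was lost; a sketch of the idea (an assumption based on the surrounding text) is to materialize each group into a plain (key, list) pair, so the pickled structure no longer references the lambda or any _grouper object:

```python
import pickle
from itertools import groupby

xs = [[1, 2, 3], [1, 2, 5], [1, 3, 5], [2, 4, 6]]

# Convert each lazy group to a concrete list; the resulting structure
# contains only plain ints and lists, so pickle never sees the lambda.
grouped = [(k, list(v)) for k, v in groupby(xs, lambda x: x[0])]
data = pickle.dumps(grouped)  # succeeds
```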

Or don't use a lambda expression at all:

from operator import itemgetter

pickle.dumps([kv for kv in groupby(xs, itemgetter(0))])

## b'\x80\x ...
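The itemgetter variant succeeds because operator.itemgetter instances, unlike lambdas, are themselves picklable, so the key function survives serialization. A quick check:

```python
import pickle
from operator import itemgetter

# itemgetter round-trips through pickle, which a lambda cannot do.
key = pickle.loads(pickle.dumps(itemgetter(0)))
print(key([7, 8, 9]))  # 7
```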
