PySpark: connection reset when using toLocalIterator

Published 2024-09-28 05:16:50


I am trying to iterate over RDD data locally in PySpark with a loop like for row in rdd.toLocalIterator():, and I get the following error:

19/03/21 17:01:36 ERROR PythonRDD: Error while sending iterator
java.net.SocketException: Connection reset
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
        at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:515)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:527)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:527)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:527)
        at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:728)
        at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:728)
        at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:728)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1340)
        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:729)

When I use rdd.collect() on a small table it works fine, but on large datasets it uses too much memory.
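The memory difference comes from how the two calls deliver data to the driver: collect() materializes every partition in driver memory at once, while toLocalIterator() streams one partition at a time, much like a Python generator. A minimal pure-Python sketch of that distinction (no Spark involved, purely illustrative):

```python
# collect()-style: hold rows from all partitions in memory at once
def collect_all(partitions):
    rows = []
    for part in partitions:
        rows.extend(part)  # every partition is resident simultaneously
    return rows

# toLocalIterator()-style: yield rows lazily, one partition at a time
def local_iterator(partitions):
    for part in partitions:
        for row in part:  # only the current partition is resident
            yield row

partitions = [[1, 2], [3, 4], [5, 6]]
assert collect_all(partitions) == [1, 2, 3, 4, 5, 6]
assert list(local_iterator(partitions)) == [1, 2, 3, 4, 5, 6]
```

This is why toLocalIterator() is the usual recommendation for large datasets, even though, as the error above shows, the per-partition fetch itself can still fail.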

I have already tried increasing spark.driver.memory, spark.executor.memory, and {}.
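For reference, those settings can also be placed in spark-defaults.conf. The values below are purely illustrative, not taken from the original post, and spark.driver.maxResultSize is only a commonly co-tuned setting for driver-transfer errors, not necessarily the third setting lost as {} above:

```
# Illustrative values only -- not from the original post
spark.driver.memory         8g
spark.executor.memory       8g
# often tuned alongside the above for driver-side transfer problems:
spark.driver.maxResultSize  4g
```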

Spark version: 2.2.1

How can I solve this?

Here is the minimal code I am using:

for row in rdd.toLocalIterator():
    pass

I get the same error with:

iterator = rdd.toLocalIterator()
next(iterator)
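That next(iterator) fails the same way is expected: toLocalIterator() is lazy, so the socket transfer (and hence the failure) only happens once the first element is actually requested, not when the iterator is created. A pure-Python generator shows the same deferred-error shape (the ConnectionError here is simulated, not real Spark behavior):

```python
def fetch_rows():
    # simulate the partition fetch failing, as in the SocketException above
    raise ConnectionError("Connection reset")
    yield  # unreachable, but makes this a generator: body runs on first next()

iterator = fetch_rows()   # no error yet: nothing has been fetched
try:
    next(iterator)        # the fetch (and the failure) happens here
except ConnectionError as e:
    print("failed on first next():", e)
```

So both the for loop and the explicit next() call are hitting the same underlying partition transfer.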

