PySpark on Windows with PyCharm: java.net.SocketException

Posted 2024-09-28 05:21:41


Since yesterday I have been facing some strange behavior. I am developing on Windows with PyCharm and Spark 1.5.

I ran the following code successfully in an IPython notebook (same Python version, but on a cluster). However, launching it with PyCharm in my Windows environment fails:

from pprint import pprint

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# SQL / Spark context:
conf = SparkConf().setMaster("local").setAppName("analysis")  # .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Input CSV file:
inputCsvFile = "survey.csv"
separator = ','

# read the input file into an RDD and split each line on the separator
rdd = sc.textFile(inputCsvFile).map(lambda line: line.split(separator))
header = rdd.first()

# build the schema (some basic helper functions to create a StructType object with string as the default type)
schema = dictSchemaFromColumnsList(header)
schemaDf = dictSchemaToDFSchema(schema)

# create the DataFrame:
df = sqlContext.createDataFrame(rdd, schemaDf)
pprint(rdd.first())
print('\ndf.count()=' + str(df.count()))

# display
df.show()
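
The two helper functions used above, dictSchemaFromColumnsList and dictSchemaToDFSchema, are not shown here; below is a rough, assumed sketch of what they look like (every column simply defaults to a string type):

from collections import OrderedDict
from pyspark.sql.types import StructType, StructField, StringType

# hypothetical sketch of the helpers, not the actual project code
def dictSchemaFromColumnsList(columns):
    # map each header column name to a default string type, keeping column order
    return OrderedDict((col, StringType()) for col in columns)

def dictSchemaToDFSchema(schemaDict):
    # turn the name -> type mapping into the StructType expected by createDataFrame()
    return StructType([StructField(name, dataType, True) for name, dataType in schemaDict.items()])

Here is the output I get when running the script from PyCharm: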

16/06/23 11:46:32 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)

java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:410)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:420)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:420)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:420)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:249)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)
16/06/23 11:46:32 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:410)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:420)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:420)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:420)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:249)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)

16/06/23 11:46:32 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
16/06/23 11:46:32 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/06/23 11:46:32 INFO TaskSchedulerImpl: Cancelling stage 1
16/06/23 11:46:32 INFO DAGScheduler: ResultStage 1 (runJob at PythonRDD.scala:361) failed in 0.792 s
16/06/23 11:46:32 INFO DAGScheduler: Job 1 failed: runJob at PythonRDD.scala:361, took 0.802922 s
Traceback (most recent call last):
  File "C:/Users/home/PycharmProjects/pySpark_analysis/Survey_2011-2016_Analysis.py", line 38, in <module>
    df = sqlContext.createDataFrame(rdd, schemaDf)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\context.py", line 404, in createDataFrame
    rdd, schema = self._createFromRDD(data, schema, samplingRatio)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\context.py", line 296, in _createFromRDD
    rows = rdd.take(10)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\pyspark\context.py", line 916, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py", line 36, in deco
    return f(*a, **kw)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:410)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:420)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:420)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:420)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:249)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:361)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:410)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:420)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:420)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:420)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:249)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)

16/06/23 11:46:32 INFO SparkContext: Invoking stop() from shutdown hook

Strangely, if I run the code in debug mode and add a basic instruction such as:

^{pr2}$

I can sometimes get the code to work, which makes the runs inconsistent. Any suggestions would be greatly appreciated.

UPDATE: After reviewing the issue, it looks exactly like the behavior Matei describes below. Apparently the problem is resolved when the input CSV file is shortened.
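
For reference, a minimal way to produce a shortened copy of the input for testing (survey_small.csv and the 2,000-line cutoff are only illustrative values; this assumes a plain one-record-per-line CSV with no quoted newlines):

# write only the first maxLines lines of the original CSV to a smaller test file
maxLines = 2000
with open("survey.csv") as src, open("survey_small.csv", "w") as dst:
    for i, line in enumerate(src):
        if i >= maxLines:
            break
        dst.write(line)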


Tags: io org api net apache java at spark
1 Answer
Forum user
#1 · Posted 2024-09-28 05:21:41

I ran into the same problem when working with (supposedly) large files (20,000 rows) and trying to filter them with a regex:

import re

pattern = re.compile("...")
filtered = rdd.filter(lambda x: pattern.search(x) is not None)  # keep only the lines that match the pattern

And, just as you describe, the behavior was intermittent. After truncating the file to roughly 2,000 lines, it worked normally.
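
For completeness, here is a self-contained version of the snippet above (the input file name and the pattern are placeholders). Note that filter() is a lazy transformation, so the socket error, when it happens, only surfaces once an action such as count() forces the data to be sent to the Python workers:

import re

from pyspark import SparkConf, SparkContext

# placeholder input file and pattern, just to make the example runnable
sc = SparkContext(conf=SparkConf().setMaster("local").setAppName("regex-filter"))
pattern = re.compile(r"\d{4}")

rdd = sc.textFile("survey.csv")
matches = rdd.filter(lambda x: pattern.search(x) is not None)
print(matches.count())  # any failure shows up here, when the action runs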
