基于我之前的问题Spark and Python use custom file format/generator as input for RDD,我认为我应该能够通过sc.TEXT文件(),然后使用my或某些库中的自定义函数。在
现在我特别尝试使用gensim框架解析wikipedia转储。我已经在我的主节点和所有的worker节点上安装了gensim,现在我想使用gensim内置函数来解析wikipedia页面,灵感来自这个问题List (or iterator) of tuples returned by MAP (PySpark)。在
我的代码如下:
import sys
import gensim
from pyspark import SparkContext
if __name__ == "__main__":
if len(sys.argv) != 2:
print >> sys.stderr, "Usage: wordcount <file>"
exit(-1)
sc = SparkContext(appName="Process wiki - distributed RDD")
distData = sc.textFile(sys.argv[1])
#take 10 only to see how the output would look like
processed_data = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
print processed_data
sc.stop()
extract_页面的源代码可以在https://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.py找到,根据我的浏览,它似乎应该与Spark一起工作。在
但不幸的是,当我运行代码时,我得到了以下错误日志:
^{pr2}$还有一些可能是火花测井:
14/10/05 13:21:12 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
14/10/05 13:21:12 INFO scheduler.DAGScheduler: Failed to run runJob at PythonRDD.scala:296
以及
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
我在没有Spark的情况下尝试过,所以问题应该出在Spark和gensim的组合中,但是我不太理解我得到的错误。我在gensim的第190行没有看到任何文件在读维基语料库.py. 在
编辑:
从Spark添加了更多日志:
编辑2:
gensim使用xml.etree.cElementTree import iterparse
,文档here,这可能会导致问题。它实际上需要包含xml数据的文件名或文件。RDD可以被认为是包含xml数据的文件吗?在
我通常在Scala中使用Spark。 不过,我的想法是:
通过加载文件时sc.TEXT文件,它是一种线迭代器,分布在您的Sparkorker上。 我认为给定了wikipedia的xml格式,其中一行不一定对应于一个可解析的xml项,因此您会遇到这个问题。在
即:
如果您尝试单独解析每一行,它将抛出与您所得到的异常类似的异常。在
我通常要处理wikipedia的垃圾堆,所以我要做的第一件事就是把它转换成一个“可读的版本”,很容易被Spark消化掉。i、 e:每个条目一行。 一旦你有了它,你就可以很容易地把它输入spark,并进行各种处理。 它不需要太多的资源来改造它
看看ReadableWiki: https://github.com/idio/wiki2vec
相关问题 更多 >
编程相关推荐