Spark和Python试图使用gensim解析wikipedia

2024-06-01 10:24:01 发布

您现在位置:Python中文网/ 问答频道 /正文

基于我之前的问题Spark and Python use custom file format/generator as input for RDD,我认为我应该能够通过sc.TEXT文件(),然后使用my或某些库中的自定义函数。在

现在我特别尝试使用gensim框架解析wikipedia转储。我已经在我的主节点和所有的worker节点上安装了gensim,现在我想使用gensim内置函数来解析wikipedia页面,灵感来自这个问题List (or iterator) of tuples returned by MAP (PySpark)。在

我的代码如下:

import sys
import gensim
from pyspark import SparkContext


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: wordcount <file>"
        exit(-1)

    sc = SparkContext(appName="Process wiki - distributed RDD")

    distData = sc.textFile(sys.argv[1])
    #take 10 only to see how the output would look like
    processed_data = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)

    print processed_data
    sc.stop()

extract_页面的源代码可以在https://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.py找到,根据我的浏览,它似乎应该与Spark一起工作。在

但不幸的是,当我运行代码时,我得到了以下错误日志:

^{pr2}$

还有一些可能是火花测井:

14/10/05 13:21:12 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
14/10/05 13:21:12 INFO scheduler.DAGScheduler: Failed to run runJob at PythonRDD.scala:296

以及

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

我在没有Spark的情况下尝试过,所以问题应该出在Spark和gensim的组合中,但是我不太理解我得到的错误。我在gensim的第190行没有看到任何文件在读维基语料库.py. 在

编辑:

从Spark添加了更多日志:

编辑2:

gensim使用xml.etree.cElementTree import iterparse,文档here,这可能会导致问题。它实际上需要包含xml数据的文件名或文件。RDD可以被认为是包含xml数据的文件吗?在


Tags: 文件orgimportapachesysatschedulerspark
1条回答
网友
1楼 · 发布于 2024-06-01 10:24:01

我通常在Scala中使用Spark。 不过,我的想法是:

通过加载文件时sc.TEXT文件,它是一种线迭代器,分布在您的Sparkorker上。 我认为给定了wikipedia的xml格式,其中一行不一定对应于一个可解析的xml项,因此您会遇到这个问题。在

即:

 Line 1 :  <item>
 Line 2 :  <title> blabla </title> <subitem>
 Line 3 : </subItem>
 Line 4 : </item>

如果您尝试单独解析每一行,它将抛出与您所得到的异常类似的异常。在

我通常要处理wikipedia的垃圾堆,所以我要做的第一件事就是把它转换成一个“可读的版本”,很容易被Spark消化掉。i、 e:每个条目一行。 一旦你有了它,你就可以很容易地把它输入spark,并进行各种处理。 它不需要太多的资源来改造它

看看ReadableWiki: https://github.com/idio/wiki2vec

相关问题 更多 >