无法序列化类org.apache.hadoop.io.DoubleWritableMongoDB Hadoop连接器+Spark+Python

2024-10-01 17:26:07 发布

您现在位置:Python中文网/ 问答频道 /正文

在这个好的blog post之后,我找到了他的代码的github repo,我cloned它使它能轻松地为其他人工作。在

我创建了一个script来:

  1. 下载并生成spark。在
  2. 下载,应用补丁并构建mongodb hadoop connector。在
  3. 下载mongodb java driver v3.1.1 jar。在
  4. 把最少的罐子放在一起
  5. 下载sample file
  6. 将其导入mongodb集合
  7. 安装一个必需的python库pytz

它为一切准备就绪。在

基本上是这样的:

config = {"mongo.input.uri": "mongodb://localhost:27017/marketdata.minbars"}
inputFormatClassName = "com.mongodb.hadoop.MongoInputFormat"
keyClassName = "org.apache.hadoop.io.Text"
valueClassName = "org.apache.hadoop.io.MapWritable"

minBarRawRDD = sc.newAPIHadoopRDD(inputFormatClassName, keyClassName, valueClassName, None, None, config)
minBarRDD = minBarRawRDD.values()

import calendar, time, math
dateFormatString = '%Y-%m-%d %H:%M'     
groupedBars = minBarRDD.sortBy(lambda doc: str(doc["Timestamp"])).groupBy(lambda doc: 
    (doc["Symbol"], math.floor(calendar.timegm(time.strptime(doc["Timestamp"], dateFormatString)) / (5*60))))

def ohlc(grouping):
    # some

config["mongo.output.uri"] = "mongodb://localhost:27017/marketdata.fiveminutebars"
outputFormatClassName = "com.mongodb.hadoop.MongoOutputFormat"
# resultRDD.saveAsNewAPIHadoopFile("file:///placeholder", outputFormatClassName, None, None, None, None, config)

当我运行spark-ohlcbars-example.submit.sh脚本时,最后一行注释了这句话,一切顺利进行,没有任何错误。在

但是一旦我取消了onder中最后一行的注释,试图将数据保存回mongodb,就会引发异常

^{pr2}$

我想从Python>;Java>;MongoDB格式转换的过程在尝试序列化时会丢失,但无法工作。在

您可以在git repository中看到issue

我希望社区有办法解决这个问题。你们知道怎么克服这个问题吗?在


Tags: orgcomhadoopnoneconfiglocalhostdocmongo
1条回答
网友
1楼 · 发布于 2024-10-01 17:26:07

通过在提交哈希上使用pymongo_spark,affad1b7上使用pymongo_spark。在

我将文件复制到我的项目中,并在python主脚本上添加了3行代码:

import pymongo_spark
pymongo_spark.activate()
...
# at the end of the script
resultRDD.saveToMongoDB(config["mongo.output.uri"])

您可以在github commit上看到完整的差异

https://github.com/danielsan/mongodb-analytics-examples/commit/f287620874038b2a491b50f48505c106299293fb

相关问题 更多 >

    热门问题