我们正在尝试读取大量XML,并在pyspark中对它们运行Xquery,例如XML书籍。我们正在使用spark xml utils库
参考答案:Calling scala code in pyspark for XSLT transformations
xquery处理器的定义,其中xquery是xquery的字符串:
proc = sc._jvm.com.elsevier.spark_xml_utils.xquery.XQueryProcessor.getInstance(xquery)
我们正在使用以下命令读取目录中的文件:
sc.wholeTextFiles("xmls/test_files")
这为我们提供了一个RDD,其中包含作为元组列表的所有文件:
[(Filename1,FileContentAsString),(Filename2,File2ContentAsString)]
如果我们在字符串(FileContentAsString)上运行,xquery将进行计算并给出结果
whole_files = sc.wholeTextFiles("xmls/test_files").collect()
proc.evaluate(whole_files[1][1])
# Prints proper xquery result for that file
问题:
如果我们尝试使用lambda函数在RDD上运行proc.evaluate(),它将失败
test_file = sc.wholeTextFiles("xmls/test_files")
test_file.map(lambda x: proc.evaluate(x[1])).collect()
# Should give us a list of xquery results
错误:
PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
这些功能在某种程度上起作用,但与上述功能不同:
打印xquery应用于的内容
test_file.map(lambda x: x[1]).collect()
# Outputs the content. if x[0], gives us the list of filenames
返回内容中的字符长度
test_file.map(lambda x: len(x[1])).collect()
# Output: [15274, 13689, 13696]
参考书籍示例:
books_xquery = """for $x in /bookstore/book
where $x/price>30
return $x/title/data()"""
proc_books = sc._jvm.com.elsevier.spark_xml_utils.xquery.XQueryProcessor.getInstance(books_xquery)
books_xml = sc.wholeTextFiles("xmls/books.xml")
books_xml.map(lambda x: proc_books.evaluate(x[1])).collect()
# Error
# I can share the stacktrace if you guys want
不幸的是,不可能在Python代码的映射调用中直接调用Java/Scala库This answer很好地解释了为什么没有简单的方法可以做到这一点。简而言之,原因是Py4J网关(将Python调用“转换”到JVM世界所必需的)仅存在于驱动程序节点上,而您试图执行的映射调用则在执行器节点上运行
解决这个问题的一种方法是将XQuery函数包装在Scala UDF(解释为here)中,但仍然需要编写几行Scala代码
编辑:如果您能够从XQuery切换到XPath,一个可能更简单的选择是更改(XPath)库ElementTree是用Python编写的XML库,也是XPath
代码
将针对从目录
xmls/test_files
加载的所有文档打印运行xpathquery
的所有结果首先使用flatMap作为findall调用,返回每个文档中所有匹配的elements的列表。通过使用
flatMap
,此列表被展平(每个文件的结果可能包含多个元素)。在第二个map
调用中,元素被映射到它们的text以获得可读的输出相关问题 更多 >
编程相关推荐