在pyspark中对RDD元素计算Xquery

2024-06-20 15:10:22 发布

您现在位置:Python中文网/ 问答频道 /正文

我们正在尝试读取大量XML,并在pyspark中对它们运行Xquery,例如XML书籍。我们正在使用spark xml utils

  • 我们希望将包含xmls的目录提供给pyspark
  • 对所有这些对象运行Xquery以获得结果

参考答案:Calling scala code in pyspark for XSLT transformations

xquery处理器的定义,其中xquery是xquery的字符串:

proc = sc._jvm.com.elsevier.spark_xml_utils.xquery.XQueryProcessor.getInstance(xquery)

我们正在使用以下命令读取目录中的文件:

sc.wholeTextFiles("xmls/test_files")

这为我们提供了一个RDD,其中包含作为元组列表的所有文件:

[(Filename1,FileContentAsString),(Filename2,File2ContentAsString)]

如果我们在字符串(FileContentAsString)上运行,xquery将进行计算并给出结果

whole_files = sc.wholeTextFiles("xmls/test_files").collect()
proc.evaluate(whole_files[1][1])
# Prints proper xquery result for that file

问题:

如果我们尝试使用lambda函数在RDD上运行proc.evaluate(),它将失败

test_file = sc.wholeTextFiles("xmls/test_files")
test_file.map(lambda x: proc.evaluate(x[1])).collect()

# Should give us a list of xquery results 

错误:

PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

这些功能在某种程度上起作用,但与上述功能不同:

打印xquery应用于的内容

test_file.map(lambda x: x[1]).collect()

# Outputs the content. if x[0], gives us the list of filenames

返回内容中的字符长度

test_file.map(lambda x: len(x[1])).collect()
# Output: [15274, 13689, 13696]

参考书籍示例:

books_xquery = """for $x in /bookstore/book
where $x/price>30
return $x/title/data()"""

proc_books = sc._jvm.com.elsevier.spark_xml_utils.xquery.XQueryProcessor.getInstance(books_xquery)

books_xml = sc.wholeTextFiles("xmls/books.xml")
books_xml.map(lambda x: proc_books.evaluate(x[1])).collect()
# Error
# I can share the stacktrace if you guys want

Tags: lambdatestmapfilesxmlprocbookspyspark
1条回答
网友
1楼 · 发布于 2024-06-20 15:10:22

不幸的是,不可能在Python代码的映射调用中直接调用Java/Scala库This answer很好地解释了为什么没有简单的方法可以做到这一点。简而言之,原因是Py4J网关(将Python调用“转换”到JVM世界所必需的)仅存在于驱动程序节点上,而您试图执行的映射调用则在执行器节点上运行

解决这个问题的一种方法是将XQuery函数包装在Scala UDF(解释为here)中,但仍然需要编写几行Scala代码

编辑:如果您能够从XQuery切换到XPath,一个可能更简单的选择是更改(XPath)库ElementTree是用Python编写的XML库,也是XPath

代码

xmls = spark.sparkContext.wholeTextFiles("xmls/test_files")
import xml.etree.ElementTree as ET
xpathquery = "...your query..."
xmls.flatMap(lambda x: ET.fromstring(x[1]).findall(xpathquery)) \
    .map(lambda x: x.text) \
    .foreach(print)

将针对从目录xmls/test_files加载的所有文档打印运行xpathquery的所有结果

首先使用flatMap作为findall调用,返回每个文档中所有匹配的elements的列表。通过使用flatMap,此列表被展平(每个文件的结果可能包含多个元素)。在第二个map调用中,元素被映射到它们的text以获得可读的输出

相关问题 更多 >