如何使用Spark从XML复制到SQL

2024-09-29 06:31:39 发布

您现在位置:Python中文网/ 问答频道 /正文

我需要打开并将存储在azuredatalake存储中的多个XML文件的内容复制到azuresqldb中。这是XML文件结构:

<?xml version="1.0" encoding="utf-8"?>
<FileSummary xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:noNamespaceSchemaLocation="invoices.xsd">
      <Header>
      <SequenceNumber>1</SequenceNumber>
      <Description>Hello</Description>
      <ShipDate>20180101</ShipDate>
     </Header>
     <FileInvoices>
      <InvoiceNumber>000000A</InvoiceNumber>
      <InvoiceHeader>
       <InvoiceHeaderDate>201800201</InvoiceHeaderDate>
       <InvoiceHeaderDescription>XYZ</InvoiceHeaderDescription>
      </InvoiceHeader>
      <InvoiceItems>
       <ItemId>000001</ItemId>
       <ItemQuantity>000010</ItemQuantity>
       <ItemPrice>000100</ItemPrice>
      </InvoiceItems>
     </FileInvoices>
     <FileInvoices>
      <InvoiceNumber>000000B</InvoiceNumber>
      <InvoiceHeader>
       <InvoiceHeaderDate>201800301</InvoiceHeaderDate>
       <InvoiceHeaderDescription>ABC</InvoiceHeaderDescription>
      </InvoiceHeader>
      <InvoiceItems>
       <ItemId>000002</ItemId>
       <ItemQuantity>000020</ItemQuantity>
       <ItemPrice>000200</ItemPrice>
      </InvoiceItems>
     </FileInvoices>
</FileSummary>

所以我使用azuredatabricks将Datalake存储挂载为“/mnt/testdata”,然后我尝试用以下命令打开上面的示例文件

^{pr2}$

返回以下结果:

dfXml:pyspark.sql.dataframe.DataFrame
FileInvoices:array
element:struct
InvoiceHeader:struct
InvoiceHeaderDate:long
InvoiceHeaderDescription:string
InvoiceItems:struct
ItemId:long
ItemPrice:long
ItemQuantity:long
InvoiceNumber:string
Header:struct
Description:string
SequenceNumber:long
ShipDate:long
xmlns:xsi:string
xsi:noNamespaceSchemaLocation:string
Number of records in this dataframe: 1
root
 |-- FileInvoices: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- InvoiceHeader: struct (nullable = true)
 |    |    |    |-- InvoiceHeaderDate: long (nullable = true)
 |    |    |    |-- InvoiceHeaderDescription: string (nullable = true)
 |    |    |-- InvoiceItems: struct (nullable = true)
 |    |    |    |-- ItemId: long (nullable = true)
 |    |    |    |-- ItemPrice: long (nullable = true)
 |    |    |    |-- ItemQuantity: long (nullable = true)
 |    |    |-- InvoiceNumber: string (nullable = true)
 |-- Header: struct (nullable = true)
 |    |-- Description: string (nullable = true)
 |    |-- SequenceNumber: long (nullable = true)
 |    |-- ShipDate: long (nullable = true)
 |-- xmlns:xsi: string (nullable = true)
 |-- xsi:noNamespaceSchemaLocation: string (nullable = true)

因此,上面的命令似乎确实正确地读取了文件,当然,我能够连接到规范化良好的Azure SQL DB,并将记录写入特定的表中:

dfXml.write.jdbc(url=jdbcUrl, table="dest_table", mode="overwrite", properties=connectionProperties)

但是,这种方法需要设置一些嵌套循环和大量手动任务来跟踪每个表的键并尊重引用完整性,而这些引用完整性不利用Spark体系结构,所以我现在想知道是否有最佳实践(或预构建库)以更自动化和可伸缩的方式完成此任务。在

我希望这是一个常见的需求,所以理想情况下我会使用一个库,它读取开头显示的完整XML结构,并自动提取信息以插入到规范化表中。在

非常感谢你的任何建议。在

毛罗


Tags: truestringstructlongitemidnullablexsiinvoicenumber
3条回答

我使用SparkShell来执行下面的操作,我相信xml结构是重复的。 您需要创建/引用一个与xml文件相关的模式。 你可以利用砖厂的udf罐。 那么

1.创建如下函数

sql(""" create temporary function numeric_range as brickhouse.udf.collect.NumericRange""")

2.使用模式

var df=sqlContext.read.format("com.databricks.spark.xml").option("rowTag","FileSummary").load("location of schema file")

val schema=df.schema

3.var df1=sqlContext.read.format("com.databricks.spark.xml").option("rowTag","FileSummary").schema(schema).load("location of actual xml file")

^{pr2}$

4.您需要将文件发票展开,如下所示

val df2=sql("select array_index(FileInvoices,n) as FileInvoices from XML_Data lateral view numeric_range(size(FileInvoices))n1 as n""").registerTempTable("xmlData2")

一旦every被转换为Struct,就更容易遍历或使用FileInvoices.InvoiceHeader.InvoiceHeaderDate进行分解

val jdbcUsername = "<username>"
val jdbcPassword = "<password>"
val jdbcHostname = "<hostname>" //typically, this is in the form or servername.database.windows.net
val jdbcPort = 1433
val jdbcDatabase ="<database>"

val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"

val connectionProperties = new Properties()
connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")

spark.table("").write.jdbc(jdbc_url, "xmlData2", connectionProperties)

取决于您要做什么以及表结构的外观。我假设您正在尝试使用spark处理许多文件。并希望将数据加载到不同的规范化表中

例如,您可能希望将标题写入一个表中,header->fileInvoices是一对多关系,因此可以是另一个表。在

  • 当您使用load(filename*.xml)读取多个xml文件时 希望将文件摘要设置为rowtag。然后你会有多个 数据帧中的行,每个文件摘要一行。

  • 您可以选择另一个数据帧中的标题列并将其写入 一张桌子。

  • FileInvoices是struc的数组,可以将它们分解成行 把它们放在另一张桌子上。

  • 此外,如果每个发票可以包含多个项目,则可以另做一个 分解以使其成为行并存储到另一个表中

或者您可以进行两次分解并将结果数据帧加载到一个大的非规范化表中。在

这里有一篇关于爆炸如何工作的文章 https://hadoopist.wordpress.com/2016/05/16/how-to-handle-nested-dataarray-of-structures-or-multiple-explodes-in-sparkscala-and-pyspark/

谢谢你,苏巴什,阿南德。 关于Subash的答案,我没有模式文件,所以我修改了他的步骤2,将“实际xml文件的位置”替换为“实际xml文件的位置”,它实际上起作用了:在步骤3之后,如果我只是运行

df2=sql("select * from XML_Data")

然后我就跑了

^{pr2}$

因此,它跨多行复制头的同一个结构,在FileInvoices列中,我有一个单独的invoices结构: exploded FileInvoices

所以看起来我离我的最终目标越来越近了,但是我仍然没有按照正确的顺序自动创建记录,以避免破坏引用完整性。在

但在此之前,我很感激你的反馈。在

再次感谢

毛罗

相关问题 更多 >