Databricks - Import a nested XML file into multiple tables


I am new to Databricks and want to import a nested XML file into multiple tables. I have an XML file and an XSD, both shown below. So far I have completed the following steps:

Python code

Import the XML into a DataFrame (df):

# Files
xml_file: str = "/FileStore/tables/demo.xml"
xsd_file: str = "/FileStore/tables/demo.xsd"

# Read XML file with XSD validation
df = spark.read \
    .format("xml") \
    .option("rowTag", "transactions") \
    .option("attributePrefix", "") \
    .option("rowValidationXSDPath", xsd_file) \
    .load(xml_file)

# Display the DataFrame
df.display()

df represents transactions:

from pyspark.sql.functions import monotonically_increasing_id, col

transactions_df = df.select(
    monotonically_increasing_id().alias("transaction_id"),
    "day",
    "art",
    "version",
    col("site.site_id").alias("site_id"),
    col("footer.tran_count").alias("tran_count")
)

transactions_df.display()

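This works, but I want the following three target tables with artificial primary and foreign keys so that I can join them later.

So I also need DataFrames for trans and data_detail.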

Table 1: transactions

transaction_id: artificial primary key

transaction_id  day   art  version  site_id  tran_count
1               1000  lo   1        20       3

Table 2: trans

transaction_id: foreign key to transactions.transaction_id

trans_id: artificial primary key

transaction_id  trans_id  number   pro  lib  nr             at   bt  ct
1               1         1234567  p1   l1   12345-1234567  123  1   120
1               2         2345678  p2   l2   12345-2345678  456  1   240
1               3         3456789  p3   l3   12345-3456789  789  1   360

Table 3: data_detail

trans_id: foreign key to trans.trans_id

detail_id: artificial primary key

trans_id  detail_id  data_detail_id  typ  class
1         1          0               5    10
1         2          1               6    11
2         3          0               7    20
2         4          1               8    21
3         5          0               9    30
3         6          1               10   31
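
To make the target layout concrete, here is a minimal sketch in plain Python that builds the three row sets with their artificial keys. It uses the standard-library `xml.etree.ElementTree` rather than Spark, and the shortened sample (two `trans` blocks), the function name `build_tables`, and the row-dict layout are assumptions made for illustration only.

```python
import xml.etree.ElementTree as ET

# Shortened stand-in for demo.xml: two <trans> blocks instead of three.
SAMPLE_XML = """
<transactions day="1000" art="lo" version="1">
  <site site_id="20"/>
  <trans>
    <number>1234567</number>
    <meta><pro>p1</pro><lib>l1</lib><nr>12345-1234567</nr></meta>
    <details>
      <header><at>123</at><bt>1</bt><ct>120</ct></header>
      <data>
        <data_detail detail_id="0"><typ>5</typ><class>10</class></data_detail>
        <data_detail detail_id="1"><typ>6</typ><class>11</class></data_detail>
      </data>
    </details>
  </trans>
  <trans>
    <number>2345678</number>
    <meta><pro>p2</pro><lib>l2</lib><nr>12345-2345678</nr></meta>
    <details>
      <header><at>456</at><bt>1</bt><ct>240</ct></header>
      <data>
        <data_detail detail_id="0"><typ>7</typ><class>20</class></data_detail>
        <data_detail detail_id="1"><typ>8</typ><class>21</class></data_detail>
      </data>
    </details>
  </trans>
  <footer tran_count="2"/>
</transactions>
"""


def build_tables(xml_text):
    """Return (transactions, trans, data_detail) as lists of row dicts."""
    root = ET.fromstring(xml_text)
    transaction_id = 1  # the sample has a single <transactions> root
    transactions = [{
        "transaction_id": transaction_id,  # artificial primary key
        "day": int(root.get("day")),
        "art": root.get("art"),
        "version": int(root.get("version")),
        "site_id": int(root.find("site").get("site_id")),
        "tran_count": int(root.find("footer").get("tran_count")),
    }]

    trans_rows, detail_rows = [], []
    for trans_id, t in enumerate(root.findall("trans"), start=1):
        trans_rows.append({
            "transaction_id": transaction_id,  # foreign key
            "trans_id": trans_id,              # artificial primary key
            "number": int(t.findtext("number")),
            "pro": t.findtext("meta/pro"),
            "lib": t.findtext("meta/lib"),
            "nr": t.findtext("meta/nr"),
            "at": int(t.findtext("details/header/at")),
            "bt": int(t.findtext("details/header/bt")),
            "ct": int(t.findtext("details/header/ct")),
        })
        for d in t.findall("details/data/data_detail"):
            detail_rows.append({
                "trans_id": trans_id,               # foreign key
                "detail_id": len(detail_rows) + 1,  # running counter over the file
                "data_detail_id": int(d.get("detail_id")),
                "typ": int(d.findtext("typ")),
                "class": int(d.findtext("class")),
            })
    return transactions, trans_rows, detail_rows


transactions, trans_rows, detail_rows = build_tables(SAMPLE_XML)
```

Row dicts of this shape could in principle be handed to `spark.createDataFrame`, but that step is not exercised here.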

XSD file (demo.xsd)

<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" elementFormDefault="qualified">
  <xs:element name="transactions">
    <xs:complexType>
      <xs:sequence>
        <xs:element ref="site"/>
        <xs:element maxOccurs="unbounded" ref="trans"/>
        <xs:element ref="footer"/>
      </xs:sequence>
      <xs:attribute name="day" use="required" type="xs:integer"/>
      <xs:attribute name="art" use="required" type="xs:string"/>
      <xs:attribute name="version" use="required" type="xs:integer"/>
    </xs:complexType>
  </xs:element>
  <xs:element name="site">
    <xs:complexType>
      <xs:attribute name="site_id" use="required" type="xs:integer"/>
    </xs:complexType>
  </xs:element>
  <xs:element name="trans">
    <xs:complexType>
      <xs:all>
        <xs:element ref="number"/>
        <xs:element ref="meta"/>
        <xs:element ref="details"/>
      </xs:all>
    </xs:complexType>
  </xs:element>
  <xs:element name="footer">
    <xs:complexType>
      <xs:attribute name="tran_count" use="required" type="xs:integer"/>
    </xs:complexType>
  </xs:element>
  <xs:element name="number" type="xs:integer"/>
  <xs:element name="meta">
    <xs:complexType>
      <xs:sequence>
        <xs:element ref="pro"/>
        <xs:element ref="lib"/>
        <xs:element ref="nr"/>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
  <xs:element name="pro" type="xs:string"/>
  <xs:element name="lib" type="xs:string"/>
  <xs:element name="nr" type="xs:string"/>
  <xs:element name="details">
    <xs:complexType>
      <xs:sequence>
        <xs:element ref="header"/>
        <xs:element ref="data"/>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
  <xs:element name="header">
    <xs:complexType>
      <xs:sequence>
        <xs:element ref="at"/>
        <xs:element ref="bt"/>
        <xs:element ref="ct"/>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
  <xs:element name="at" type="xs:integer"/>
  <xs:element name="bt" type="xs:integer"/>
  <xs:element name="ct" type="xs:integer"/>
  <xs:element name="data">
    <xs:complexType>
      <xs:sequence>
        <xs:element maxOccurs="unbounded" ref="data_detail"/>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
  <xs:element name="data_detail">
    <xs:complexType>
      <xs:sequence>
        <xs:element ref="typ"/>
        <xs:element ref="class"/>
      </xs:sequence>
      <xs:attribute name="detail_id" use="required" type="xs:integer"/> 
    </xs:complexType>
  </xs:element>
  <xs:element name="typ" type="xs:integer"/>
  <xs:element name="class" type="xs:integer"/>
</xs:schema>

XML file (demo.xml)

<?xml version="1.0" encoding="UTF-8"?>
<transactions day="1000" art="lo" version="1">
<site site_id="20"/>
<trans>
  <number>1234567</number>
  <meta>
    <pro>p1</pro>
    <lib>l1</lib>
    <nr>12345-1234567</nr>
  </meta>
  <details>
    <header>
      <at>123</at>
      <bt>1</bt>
      <ct>120</ct>
    </header>
    <data>
        <data_detail detail_id="0">
          <typ>5</typ>
          <class>10</class>
        </data_detail>
        <data_detail detail_id="1">
          <typ>6</typ>
          <class>11</class>
        </data_detail>
    </data>
  </details>
</trans>
<trans>
  <number>2345678</number>
  <meta>
    <pro>p2</pro>
    <lib>l2</lib>
    <nr>12345-2345678</nr>
  </meta>
  <details>
    <header>
      <at>456</at>
      <bt>1</bt>
      <ct>240</ct>
    </header>
    <data>
        <data_detail detail_id="0">
          <typ>7</typ>
          <class>20</class>
        </data_detail>
        <data_detail detail_id="1">
          <typ>8</typ>
          <class>21</class>
        </data_detail>
    </data>
  </details>
</trans>
<trans>
  <number>3456789</number>
  <meta>
    <pro>p3</pro>
    <lib>l3</lib>
    <nr>12345-3456789</nr>
  </meta>
  <details>
    <header>
      <at>789</at>
      <bt>1</bt>
      <ct>360</ct>
    </header>
    <data>
        <data_detail detail_id="0">
          <typ>9</typ>
          <class>30</class>
        </data_detail>
        <data_detail detail_id="1">
          <typ>10</typ>
          <class>31</class>
        </data_detail>
    </data>
  </details>
</trans>
<footer tran_count="3"/>
</transactions>
xml pyspark import xsd databricks
1 Answer

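Although I don't know pyspark, I do know how to handle nested XML for data frame migration (namely with pandas). For your nuanced needs, consider XSLT, a special-purpose language designed to transform XML files. Python's lxml can run XSLT 1.0 transformations.

Specifically, for your use case, have XSLT transform the original XML into three flat elements ready for data frame migration: transactions, trans, and detail. The stylesheet can even generate the monotonically increasing IDs by counting preceding siblings. This solution does not need the XSD, since we transform the original XML. Still, be sure to validate the original XML against the XSD before running the XSLT.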
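
As a rough illustration of what counting siblings produces, the sketch below emulates the XPath expression `count(preceding-sibling::TAG) + 1` in plain Python with the standard-library `xml.etree.ElementTree`. The tiny inline document and the helper name `sibling_position` are assumptions for illustration, not part of the stylesheet.

```python
import xml.etree.ElementTree as ET

# Two <trans> parents, each holding two <data_detail> children.
doc = ET.fromstring(
    "<root>"
    "<trans><data_detail/><data_detail/></trans>"
    "<trans><data_detail/><data_detail/></trans>"
    "</root>"
)


def sibling_position(parent, child):
    """Emulate XPath count(preceding-sibling::TAG) + 1 for `child`."""
    same_tag = [c for c in parent if c.tag == child.tag]
    return next(i for i, c in enumerate(same_tag, start=1) if c is child)


per_parent = []  # what the sibling count yields
global_ids = []  # a running counter over the whole document
for trans in doc.findall("trans"):
    for detail in trans.findall("data_detail"):
        per_parent.append(sibling_position(trans, detail))
        global_ids.append(len(global_ids) + 1)

print(per_parent)  # [1, 2, 1, 2]
print(global_ids)  # [1, 2, 3, 4]
```

A sibling count restarts under every parent, so it is unique only among elements that share one parent. For an ID that must be unique across the whole file, a document-wide count is needed; in XPath 1.0 the `preceding::` axis counts across parents.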

XSLT (save as .xsl, a special kind of .xml file)

<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
 <xsl:output encoding="UTF-8" indent="yes"/>
 <xsl:strip-space elements="*"/>

 <xsl:template match="/">
    <data>
        <xsl:apply-templates select="descendant::transactions"/>
        <trans>
            <xsl:apply-templates select="descendant::trans"/>
        </trans>
        <detail>
            <xsl:apply-templates select="descendant::data_detail"/>
        </detail>
    </data>
 </xsl:template>

 <xsl:template match="transactions">
    <xsl:copy>
        <transaction_id><xsl:value-of select="count(preceding-sibling::transactions) + 1"/></transaction_id>
        <xsl:apply-templates select="@*"/>
        <site_id><xsl:value-of select="site/@site_id"/></site_id>
        <transaction_count><xsl:value-of select="count(descendant::trans)"/></transaction_count>
    </xsl:copy>
 </xsl:template>

 <xsl:template match="transactions/@*|data_detail/@*">
    <xsl:element name="{name()}">
        <xsl:value-of select="."/>
    </xsl:element>
 </xsl:template>

 <xsl:template match="trans">
    <xsl:copy>
        <transaction_id><xsl:value-of select="count(ancestor::transactions/preceding-sibling::transactions) + 1"/></transaction_id>
        <trans_id><xsl:value-of select="count(preceding-sibling::trans) + 1"/></trans_id>
        <xsl:copy-of select="number|meta/*|header/*"/>
    </xsl:copy>
 </xsl:template>

 <xsl:template match="data_detail">
    <xsl:copy>
        <trans_id><xsl:value-of select="count(ancestor::trans/preceding-sibling::trans) + 1"/></trans_id>
        <detail_id><xsl:value-of select="count(preceding-sibling::data_detail) + 1"/></detail_id>
        <xsl:apply-templates select="@*"/>
        <xsl:copy-of select="*"/>
    </xsl:copy>
 </xsl:template>

</xsl:stylesheet>

Transformed XML (note the needed elements at the root level)

<?xml version="1.0" encoding="utf-8"?>
<data>
  <transactions>
    <transaction_id>1</transaction_id>
    <day>1000</day>
    <art>lo</art>
    <version>1</version>
    <site_id>20</site_id>
    <transaction_count>3</transaction_count>
  </transactions>
  <trans>
    <trans>
      <transaction_id>1</transaction_id>
      <trans_id>1</trans_id>
      <number>1234567</number>
      <pro>p1</pro>
      <lib>l1</lib>
      <nr>12345-1234567</nr>
    </trans>
    <trans>
      <transaction_id>1</transaction_id>
      <trans_id>2</trans_id>
      <number>2345678</number>
      <pro>p2</pro>
      <lib>l2</lib>
      <nr>12345-2345678</nr>
    </trans>
    <trans>
      <transaction_id>1</transaction_id>
      <trans_id>3</trans_id>
      <number>3456789</number>
      <pro>p3</pro>
      <lib>l3</lib>
      <nr>12345-3456789</nr>
    </trans>
  </trans>
  <detail>
    <data_detail>
      <trans_id>1</trans_id>
      <detail_id>1</detail_id>
      <detail_id>0</detail_id>
      <typ>5</typ>
      <class>10</class>
    </data_detail>
    <data_detail>
      <trans_id>1</trans_id>
      <detail_id>2</detail_id>
      <detail_id>1</detail_id>
      <typ>6</typ>
      <class>11</class>
    </data_detail>
    <data_detail>
      <trans_id>2</trans_id>
      <detail_id>1</detail_id>
      <detail_id>0</detail_id>
      <typ>7</typ>
      <class>20</class>
    </data_detail>
    <data_detail>
      <trans_id>2</trans_id>
      <detail_id>2</detail_id>
      <detail_id>1</detail_id>
      <typ>8</typ>
      <class>21</class>
    </data_detail>
    <data_detail>
      <trans_id>3</trans_id>
      <detail_id>1</detail_id>
      <detail_id>0</detail_id>
      <typ>9</typ>
      <class>30</class>
    </data_detail>
    <data_detail>
      <trans_id>3</trans_id>
      <detail_id>2</detail_id>
      <detail_id>1</detail_id>
      <typ>10</typ>
      <class>31</class>
    </data_detail>
  </detail>
</data>
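
Before loading a flat file like this, it can help to confirm that every foreign key points at an existing parent row. The sketch below does that in plain Python with the standard-library `xml.etree.ElementTree`; the trimmed `FLAT_XML` sample and the helper name `rows` are assumptions for illustration, and all values stay as strings.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the transformed layout: one group per table under <data>.
FLAT_XML = """
<data>
  <transactions><transaction_id>1</transaction_id><day>1000</day></transactions>
  <trans>
    <trans><transaction_id>1</transaction_id><trans_id>1</trans_id></trans>
    <trans><transaction_id>1</transaction_id><trans_id>2</trans_id></trans>
  </trans>
  <detail>
    <data_detail><trans_id>1</trans_id><typ>5</typ></data_detail>
    <data_detail><trans_id>2</trans_id><typ>7</typ></data_detail>
  </detail>
</data>
"""


def rows(root, path):
    """Turn each element matched by `path` into a {child tag: text} dict."""
    return [{child.tag: child.text for child in el} for el in root.findall(path)]


root = ET.fromstring(FLAT_XML)
transactions = rows(root, "transactions")
trans = rows(root, "trans/trans")  # rows sit inside a wrapper that is also named <trans>
details = rows(root, "detail/data_detail")

# Every foreign key should point at an existing parent row.
transaction_ids = {r["transaction_id"] for r in transactions}
trans_ids = {r["trans_id"] for r in trans}
orphans_trans = [r for r in trans if r["transaction_id"] not in transaction_ids]
orphans_detail = [r for r in details if r["trans_id"] not in trans_ids]
```

Both orphan lists are empty for this sample. Because the wrapper and its rows share the name `trans`, the path `trans/trans` is used to pick up only the rows.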

Python

import lxml.etree as lx
from pyspark.sql.functions import monotonically_increasing_id, col

# FILES
xml_file: str = "/path/to/input.xml"
xsl_file: str = "/path/to/style.xsl"
out_file: str = "/path/to/output.xml"

# TRANSFORM ORIGINAL XML
xml, xsl = lx.parse(xml_file), lx.parse(xsl_file)
transform = lx.XSLT(xsl)
result = transform(xml)
result.write_output(out_file)

# READ IN TRANSFORMED, FLAT XML INTO THREE PYSPARK DATA FRAMES
spark = ...

transactions_df = (
    spark.read
        .format("com.databricks.spark.xml")
        .option("rootTag", "data")
        .option("rowTag", "transactions")
        .load(out_file)
)

trans_df = (
    spark.read
        .format("com.databricks.spark.xml")
        .option("rootTag", "data")
        .option("rowTag", "trans")
        .load(out_file)
)

data_detail_df = (
    spark.read
        .format("com.databricks.spark.xml")
        .option("rootTag", "data")
        .option("rowTag", "data_detail")
        .load(out_file)
)

# ... DATA BRICKS MIGRATION

Or, for DRY-er code, use a dictionary comprehension:

dfs = {
    el: (
        spark.read
            .format("com.databricks.spark.xml")
            .option("rootTag", "data")
            .option("rowTag", el)
            .load(out_file)
        )
    for el in ["transactions", "trans", "data_detail"]
}

# ... DATA BRICKS MIGRATION
