Sdílet prostřednictvím


Čtení a zápis souborů XML

Důležité

Tato funkce je ve verzi Public Preview.

Tento článek popisuje, jak číst a zapisovat soubory XML.

Jazyk XML (Extensible Markup Language) je jazyk značek pro formátování, ukládání a sdílení dat v textovém formátu. Definuje set pravidel pro serializaci dat od dokumentů po libovolné datové struktury.

Nativní podpora formátu souboru XML umožňuje příjem dat, dotazování a analýzu dat XML pro dávkové zpracování nebo streamování. Může automaticky odvodit a vyvíjet schema a datové typy, podporuje výrazy SQL, jako je from_xml, a může generate dokumenty XML. Nevyžaduje externí soubory JAR a bezproblémově funguje s automatickým zavaděčem read_files a COPY INTO. Volitelně můžete ověřit každý záznam XML na úrovni řádků vůči definici XSD (XML Schema Definition).

Požadavky

Databricks Runtime 14.3 a novější

Analýza záznamů XML

Specifikace XML vyžaduje dobře formátovanou strukturu. Tato specifikace se ale okamžitě nemapuje na tabulkový formát. Je nutné zadat rowTag možnost označit XML element, který mapuje na DataFrameRow. Prvek rowTag se stane nejvyšší úrovní struct. Podřízené prvky rowTag se stanou poli nejvyšší úrovně struct.

Můžete zadat schema pro tento záznam nebo ho nechat automaticky odvodit. Vzhledem k tomu, že analyzátor zkoumá rowTag pouze prvky, odfiltrují se DTD a externí entity.

Následující příklady ilustrují schema odvozování a parsování souboru XML pomocí různých možností rowTag:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Přečtěte si soubor XML s rowTag možností "books":

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Výstup:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Soubor XML rowTag si můžete přečíst jako "knihu":

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Výstup:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Možnosti zdroje dat

Možnosti zdroje dat pro XML lze zadat následujícími způsoby:

Pro možnosti list si prohlédněte Možnosti automatického zavaděče.

Podpora XSD

Volitelně můžete ověřit každý záznam XML na úrovni řádků pomocí definice XSD (XML Schema Definition). V možnosti je zadaný rowValidationXSDPath soubor XSD. XSD jinak nemá vliv na poskytnuté nebo odvozené schema. Záznam, který selže, je označený jako poškozený a zpracován na základě možnosti režimu zpracování poškozených záznamů popsaných v části možnosti.

Pomocí XSDToSchema můžete extrahovat datový rámec Sparku schema ze souboru XSD. Podporuje pouze jednoduché, složité a sekvenční typy a podporuje pouze základní funkce XSD.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

Následující table ukazuje převod datových typů XSD na datové typy Spark:

Datové typy XSD Datové typy Sparku
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, , positiveIntegerunsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Parsování vnořeného XML

XML data v řetězcové hodnotě column v existujícím DataFrame může být zpracováno pomocí schema_of_xml a from_xml, které vracejí schema a zpracované výsledky ve formě nových structcolumns. Data XML předaná jako argument schema_of_xml a from_xml musí se jednat o jeden správně formátovaný záznam XML.

schema_of_xml

Syntaxe

schema_of_xml(xmlStr [, options] )

Argumenty

  • xmlStr: Výraz STRING určující jeden správně formátovaný záznam XML.
  • options: Volitelný MAP<STRING,STRING> literál určující direktivy.

Vrácení

ŘETĚZEC obsahující definici struktury s n poli řetězců where názvy column jsou odvozeny od elementu XML a názvů atributů. Pole values obsahovat odvozené formátované typy SQL.

from_xml

Syntaxe

from_xml(xmlStr, schema [, options])

Argumenty

  • xmlStr: Výraz STRING určující jeden správně formátovaný záznam XML.
  • schema: Výraz STRING nebo vyvolání schema_of_xml funkce.
  • options: Volitelný MAP<STRING,STRING> literál určující direktivy.

Vrácení

Struktura s názvy polí a typy odpovídající definici schema. Schema musí být definovány jako dvojice column oddělených čárkami a datového typu, které se používají například CREATE TABLE. Většina možností zobrazených v možnostech zdroje dat platí s následujícími výjimkami:

  • rowTag: Protože existuje pouze jeden záznam XML, rowTag možnost není použitelná.
  • mode (výchozí: PERMISSIVE): Umožňuje režim zpracování poškozených záznamů během analýzy.
    • PERMISSIVE: Když splňuje poškozený záznam, umístí poškozený řetězec do pole nakonfigurovaného columnNameOfCorruptRecordpomocí a nastaví poškozená pole na null. Abyste mohli uchovat poškozené záznamy, můžete do pole typu řetězec s názvem columnNameOfCorruptRecord v uživatelsky definovaném schemapřidat set. Pokud schema pole neobsahuje, během analýzy zahodí poškozené záznamy. Při odvození schemaimplicitně přidá pole columnNameOfCorruptRecord do výstupního schema.
    • FAILFAST: Vyvolá výjimku, když splňuje poškozené záznamy.

Převod struktury

Vzhledem k rozdílům ve struktuře mezi datovým rámcem DataFrame a XML existují určitá pravidla převodu z dat XML do DataFrame a z DataFrame dat XML. Všimněte si, že zpracování atributů lze zakázat pomocí možnosti excludeAttribute.

Převod z XML na datový rámec

Atributy: Atributy jsou převedeny jako pole s předponou nadpisu attributePrefix.

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

vytvoří schema níže:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Znaková data v elementu, který obsahuje atributy nebo podřízené elementy: Tyto prvky jsou analyzovány do valueTag pole. Pokud existuje více výskytů znakových dat, valueTag pole se převede na array typ.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

vytvoří hodnotu schema níže:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Převod z datového rámce na XML

Element jako pole v poli: Zápis souboru XML z DataFramehaving pole ArrayType s jeho elementem jako ArrayType by pro prvek měl další vnořené pole. K tomu nedojde při čtení a zápisu dat XML, ale při zápisu DataFrame čtení z jiných zdrojů. Proto zaokrouhlování při čtení a zápisu souborů XML má stejnou strukturu, ale zápis DataFrame čtení z jiných zdrojů je možné mít jinou strukturu.

DataFrame s schema níže:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

a s daty níže:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

vytvoří soubor XML níže:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

Název prvku nepojmenovaného pole v poli DataFrame je určen možností arrayElementName (Výchozí: item).

Zachráněná data column

Záchranná data column zajistí, že během ETL nikdy neztratíte nebo nezmeškáte data. Můžete povolit, aby zachráněná data column získala všechna data, která nebyla analyzována, protože jedno nebo více polí v záznamu má jeden z následujících problémů:

  • Chybí v poskytnutém schema
  • Neodpovídá datovému typu poskytnutého schema
  • Má neshodu s velikostí písmen v názvech polí v zadaném schema

Zachráněná data column se vrátí jako dokument JSON obsahující columns, které byly zachráněny, a cestu ke zdrojovému souboru záznamu. Chcete-li remove cestu ke zdrojovému souboru ze zachráněných dat column, můžete použít následující konfiguraci SQL set.

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

Můžete povolit záchranná data column tím, že při čtení dat nastavíte možnost rescuedDataColumn na název column, například _rescued_data s spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

Analyzátor XML podporuje při analýze záznamů tři režimy: PERMISSIVE, DROPMALFORMEDa FAILFAST. Při použití společně s datovým rescuedDataColumntypem neshody nezpůsobí vyřazení záznamů v DROPMALFORMED režimu nebo vyvolání chyby v FAILFAST režimu. Zahodí se pouze poškozené záznamy (neúplné nebo poškozené XML) nebo vyvolá chyby.

Schema inferenční proces a evoluce v Auto Loader

Podrobnou diskusi o tomto tématu a příslušných možnostech najdete v části Konfigurace schema odvozování a evoluce v Automatickém zavaděči. Automatický zavaděč můžete nakonfigurovat tak, aby automaticky detekoval schema načtených dat XML, což vám umožní inicializovat tables bez explicitního deklarování schema a rozvíjet tableschema, když jsou zaváděny nové columns. To eliminuje potřebu ručního sledování a aplikace změn schema v průběhu času.

Ve výchozím nastavení se automatické zavaděče schema snaží zabránit problémům s evolucí schema způsobeným neshodami typů. U formátů, které nekódují datové typy (JSON, CSV a XML), auto loader odvodí všechny columns jako řetězce, včetně vnořených polí v souborech XML. Apache Spark DataFrameReader používá jiné chování pro schema odvozování a výběr datových typů pro columns ve zdrojích XML na základě ukázkových dat. Chcete-li toto chování povolit s automatickým zavaděčem, zvolte možnost setcloudFiles.inferColumnTypestrue.

Auto Loader zjistí přidání nových columns při zpracování vašich dat. Když Auto Loader zjistí nový column, datový proud se zastaví s UnknownFieldException. Než datový proud vyvolá tuto chybu, Auto Loader provede odvozování schema u nejnovější mikrodávky dat a aktualizuje umístění schema s nejnovějšími schema pomocí sloučení nových columns na konec schema. Datové typy existujících columns zůstanou beze změny. Auto Loader podporuje různé režimy pro schema vývoj, které set v možnosti cloudFiles.schemaEvolutionMode.

Pomocí schema tipů můžete uplatnit schema informace, které znáte a očekáváte, na odvozené schema. Pokud víte, že column je konkrétní datový typ, nebo pokud chcete zvolit obecnější datový typ (například dvojité místo celého čísla), můžete zadat libovolný počet tipů pro column datových typů jako řetězec pomocí syntaxe specifikace SQL schema. Pokud jsou povolena obnovená data column, pole pojmenovaná v jiném tvaru než schema se načtou do _rescued_datacolumn. Toto chování můžete změnit tak, že nastavíte možnost readerCaseSensitivefalsena možnost , v takovém případě Auto Loader čte data bez rozlišování malých a malých písmen.

Příklady

Příklady v této části používají soubor XML dostupný ke stažení v úložišti Apache Spark GitHub.

Čtení a zápis XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Při čtení dat můžete ručně zadat schema:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Rozhraní API SQL

Zdroj dat XML může odvodit datové typy:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

V DDL můžete také zadat názvy a typy column. V tomto případě se schema neodvozuje automaticky.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Načtení XML pomocí COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Čtení XML s ověřením řádků

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Analýza vnořeného XML (from_xml a schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml a schema_of_xml s využitím rozhraní SQL API

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Načtení XML pomocí automatického zavaděče

Python

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow()
  .toTable("table_name")
  )

Další materiály

Čtení a zápis dat XML pomocí knihovny spark-xml