Čtení a zápis souborů XML
Důležité
Tato funkce je ve verzi Public Preview.
Tento článek popisuje, jak číst a zapisovat soubory XML.
Jazyk XML (Extensible Markup Language) je jazyk značek pro formátování, ukládání a sdílení dat v textovém formátu. Definuje set pravidel pro serializaci dat od dokumentů po libovolné datové struktury.
Nativní podpora formátu souboru XML umožňuje příjem dat, dotazování a analýzu dat XML pro dávkové zpracování nebo streamování. Může automaticky odvodit a vyvíjet schema a datové typy, podporuje výrazy SQL, jako je from_xml
, a může generate dokumenty XML. Nevyžaduje externí soubory JAR a bezproblémově funguje s automatickým zavaděčem read_files
a COPY INTO
. Volitelně můžete ověřit každý záznam XML na úrovni řádků vůči definici XSD (XML Schema Definition).
Požadavky
Databricks Runtime 14.3 a novější
Analýza záznamů XML
Specifikace XML vyžaduje dobře formátovanou strukturu. Tato specifikace se ale okamžitě nemapuje na tabulkový formát. Je nutné zadat rowTag
možnost označit XML element, který mapuje na DataFrame
Row
. Prvek rowTag
se stane nejvyšší úrovní struct
. Podřízené prvky rowTag
se stanou poli nejvyšší úrovně struct
.
Můžete zadat schema pro tento záznam nebo ho nechat automaticky odvodit. Vzhledem k tomu, že analyzátor zkoumá rowTag
pouze prvky, odfiltrují se DTD a externí entity.
Následující příklady ilustrují schema odvozování a parsování souboru XML pomocí různých možností rowTag
:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
Přečtěte si soubor XML s rowTag
možností "books":
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Výstup:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Soubor XML rowTag
si můžete přečíst jako "knihu":
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Výstup:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Možnosti zdroje dat
Možnosti zdroje dat pro XML lze zadat následujícími způsoby:
- Metody
.option/.options
následujících:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- Následující předdefinované funkce:
- Klauzule
OPTIONS
CREATE TABLE POUŽITÍM DATA_SOURCE
Pro možnosti list si prohlédněte Možnosti automatického zavaděče.
Podpora XSD
Volitelně můžete ověřit každý záznam XML na úrovni řádků pomocí definice XSD (XML Schema Definition). V možnosti je zadaný rowValidationXSDPath
soubor XSD. XSD jinak nemá vliv na poskytnuté nebo odvozené schema. Záznam, který selže, je označený jako poškozený a zpracován na základě možnosti režimu zpracování poškozených záznamů popsaných v části možnosti.
Pomocí XSDToSchema
můžete extrahovat datový rámec Sparku schema ze souboru XSD. Podporuje pouze jednoduché, složité a sekvenční typy a podporuje pouze základní funkce XSD.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
Následující table ukazuje převod datových typů XSD na datové typy Spark:
Datové typy XSD | Datové typy Sparku |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , negativeInteger , nonNegativeInteger , nonPositiveInteger , , positiveInteger unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
Parsování vnořeného XML
XML data v řetězcové hodnotě column v existujícím DataFrame může být zpracováno pomocí schema_of_xml
a from_xml
, které vracejí schema a zpracované výsledky ve formě nových struct
columns. Data XML předaná jako argument schema_of_xml
a from_xml
musí se jednat o jeden správně formátovaný záznam XML.
schema_of_xml
Syntaxe
schema_of_xml(xmlStr [, options] )
Argumenty
-
xmlStr
: Výraz STRING určující jeden správně formátovaný záznam XML. -
options
: VolitelnýMAP<STRING,STRING>
literál určující direktivy.
Vrácení
ŘETĚZEC obsahující definici struktury s n poli řetězců where názvy column jsou odvozeny od elementu XML a názvů atributů. Pole values obsahovat odvozené formátované typy SQL.
from_xml
Syntaxe
from_xml(xmlStr, schema [, options])
Argumenty
-
xmlStr
: Výraz STRING určující jeden správně formátovaný záznam XML. -
schema
: Výraz STRING nebo vyvoláníschema_of_xml
funkce. -
options
: VolitelnýMAP<STRING,STRING>
literál určující direktivy.
Vrácení
Struktura s názvy polí a typy odpovídající definici schema.
Schema musí být definovány jako dvojice column oddělených čárkami a datového typu, které se používají například CREATE TABLE
. Většina možností zobrazených v možnostech zdroje dat platí s následujícími výjimkami:
-
rowTag
: Protože existuje pouze jeden záznam XML,rowTag
možnost není použitelná. -
mode
(výchozí:PERMISSIVE
): Umožňuje režim zpracování poškozených záznamů během analýzy.-
PERMISSIVE
: Když splňuje poškozený záznam, umístí poškozený řetězec do pole nakonfigurovanéhocolumnNameOfCorruptRecord
pomocí a nastaví poškozená pole nanull
. Abyste mohli uchovat poškozené záznamy, můžete do pole typu řetězec s názvemcolumnNameOfCorruptRecord
v uživatelsky definovaném schemapřidat set. Pokud schema pole neobsahuje, během analýzy zahodí poškozené záznamy. Při odvození schemaimplicitně přidá polecolumnNameOfCorruptRecord
do výstupního schema. -
FAILFAST
: Vyvolá výjimku, když splňuje poškozené záznamy.
-
Převod struktury
Vzhledem k rozdílům ve struktuře mezi datovým rámcem DataFrame a XML existují určitá pravidla převodu z dat XML do DataFrame
a z DataFrame
dat XML. Všimněte si, že zpracování atributů lze zakázat pomocí možnosti excludeAttribute
.
Převod z XML na datový rámec
Atributy: Atributy jsou převedeny jako pole s předponou nadpisu attributePrefix
.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
vytvoří schema níže:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Znaková data v elementu, který obsahuje atributy nebo podřízené elementy: Tyto prvky jsou analyzovány do valueTag
pole. Pokud existuje více výskytů znakových dat, valueTag
pole se převede na array
typ.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
vytvoří hodnotu schema níže:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Převod z datového rámce na XML
Element jako pole v poli: Zápis souboru XML z DataFrame
having pole ArrayType
s jeho elementem jako ArrayType
by pro prvek měl další vnořené pole. K tomu nedojde při čtení a zápisu dat XML, ale při zápisu DataFrame
čtení z jiných zdrojů. Proto zaokrouhlování při čtení a zápisu souborů XML má stejnou strukturu, ale zápis DataFrame
čtení z jiných zdrojů je možné mít jinou strukturu.
DataFrame s schema níže:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
a s daty níže:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
vytvoří soubor XML níže:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
Název prvku nepojmenovaného pole v poli DataFrame
je určen možností arrayElementName
(Výchozí: item
).
Zachráněná data column
Záchranná data column zajistí, že během ETL nikdy neztratíte nebo nezmeškáte data. Můžete povolit, aby zachráněná data column získala všechna data, která nebyla analyzována, protože jedno nebo více polí v záznamu má jeden z následujících problémů:
- Chybí v poskytnutém schema
- Neodpovídá datovému typu poskytnutého schema
- Má neshodu s velikostí písmen v názvech polí v zadaném schema
Zachráněná data column se vrátí jako dokument JSON obsahující columns, které byly zachráněny, a cestu ke zdrojovému souboru záznamu. Chcete-li remove cestu ke zdrojovému souboru ze zachráněných dat column, můžete použít následující konfiguraci SQL set.
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
Můžete povolit záchranná data column tím, že při čtení dat nastavíte možnost rescuedDataColumn
na název column, například _rescued_data
s spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
.
Analyzátor XML podporuje při analýze záznamů tři režimy: PERMISSIVE
, DROPMALFORMED
a FAILFAST
. Při použití společně s datovým rescuedDataColumn
typem neshody nezpůsobí vyřazení záznamů v DROPMALFORMED
režimu nebo vyvolání chyby v FAILFAST
režimu. Zahodí se pouze poškozené záznamy (neúplné nebo poškozené XML) nebo vyvolá chyby.
Schema inferenční proces a evoluce v Auto Loader
Podrobnou diskusi o tomto tématu a příslušných možnostech najdete v části Konfigurace schema odvozování a evoluce v Automatickém zavaděči. Automatický zavaděč můžete nakonfigurovat tak, aby automaticky detekoval schema načtených dat XML, což vám umožní inicializovat tables bez explicitního deklarování schema a rozvíjet tableschema, když jsou zaváděny nové columns. To eliminuje potřebu ručního sledování a aplikace změn schema v průběhu času.
Ve výchozím nastavení se automatické zavaděče schema snaží zabránit problémům s evolucí schema způsobeným neshodami typů. U formátů, které nekódují datové typy (JSON, CSV a XML), auto loader odvodí všechny columns jako řetězce, včetně vnořených polí v souborech XML. Apache Spark DataFrameReader
používá jiné chování pro schema odvozování a výběr datových typů pro columns ve zdrojích XML na základě ukázkových dat. Chcete-li toto chování povolit s automatickým zavaděčem, zvolte možnost setcloudFiles.inferColumnTypes
true
.
Auto Loader zjistí přidání nových columns při zpracování vašich dat. Když Auto Loader zjistí nový column, datový proud se zastaví s UnknownFieldException
. Než datový proud vyvolá tuto chybu, Auto Loader provede odvozování schema u nejnovější mikrodávky dat a aktualizuje umístění schema s nejnovějšími schema pomocí sloučení nových columns na konec schema. Datové typy existujících columns zůstanou beze změny. Auto Loader podporuje různé režimy pro schema vývoj, které set v možnosti cloudFiles.schemaEvolutionMode
.
Pomocí schema tipů můžete uplatnit schema informace, které znáte a očekáváte, na odvozené schema. Pokud víte, že column je konkrétní datový typ, nebo pokud chcete zvolit obecnější datový typ (například dvojité místo celého čísla), můžete zadat libovolný počet tipů pro column datových typů jako řetězec pomocí syntaxe specifikace SQL schema. Pokud jsou povolena obnovená data column, pole pojmenovaná v jiném tvaru než schema se načtou do _rescued_data
column. Toto chování můžete změnit tak, že nastavíte možnost readerCaseSensitive
false
na možnost , v takovém případě Auto Loader čte data bez rozlišování malých a malých písmen.
Příklady
Příklady v této části používají soubor XML dostupný ke stažení v úložišti Apache Spark GitHub.
Čtení a zápis XML
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Při čtení dat můžete ručně zadat schema:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Rozhraní API SQL
Zdroj dat XML může odvodit datové typy:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
V DDL můžete také zadat názvy a typy column. V tomto případě se schema neodvozuje automaticky.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
Načtení XML pomocí COPY INTO
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Čtení XML s ověřením řádků
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
Analýza vnořeného XML (from_xml a schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
from_xml a schema_of_xml s využitím rozhraní SQL API
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Načtení XML pomocí automatického zavaděče
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)