Membaca dan menulis file XML
Penting
Fitur ini ada di Pratinjau Publik.
Artikel ini menjelaskan cara membaca dan menulis file XML.
Extensible Markup Language (XML) adalah bahasa markup untuk memformat, menyimpan, dan berbagi data dalam format tekstual. Ini mendefinisikan sekumpulan aturan untuk menserialisasikan data mulai dari dokumen hingga struktur data sewenang-wenang.
Dukungan format file XML asli memungkinkan penyerapan, kueri, dan penguraian data XML untuk pemrosesan atau streaming batch. Ini dapat secara otomatis menyimpulkan dan mengembangkan skema dan jenis data, mendukung ekspresi SQL seperti from_xml
, dan dapat menghasilkan dokumen XML. Ini tidak memerlukan jar eksternal dan bekerja dengan mulus dengan Auto Loader, read_files
dan COPY INTO
. Anda dapat secara opsional memvalidasi setiap catatan XML tingkat baris terhadap Definisi Skema XML (XSD).
Persyaratan
Databricks Runtime 14.3 ke atas
Mengurai rekaman XML
Spesifikasi XML mengamanatkan struktur yang terbentuk dengan baik. Namun, spesifikasi ini tidak segera dipetakan ke format tabular. Anda harus menentukan rowTag
opsi untuk menunjukkan elemen XML yang memetakan ke DataFrame
Row
. Elemen rowTag
menjadi tingkat struct
atas . Elemen turunan dari rowTag
menjadi bidang tingkat struct
atas .
Anda dapat menentukan skema untuk rekaman ini atau membiarkannya disimpulkan secara otomatis. Karena pengurai hanya memeriksa rowTag
elemen, DTD dan entitas eksternal difilter.
Contoh berikut mengilustrasikan inferensi skema dan penguraian file XML menggunakan opsi rowTag
yang berbeda:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
Baca file XML dengan rowTag
opsi sebagai "buku":
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Output:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Baca file XML dengan rowTag
sebagai "buku":
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Output:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Opsi sumber data
Opsi sumber data untuk XML dapat ditentukan dengan cara berikut:
- Metode
.option/.options
berikut:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- Fungsi bawaan berikut:
- Klausul
OPTIONS
CREATE TABLE MENGGUNAKAN SUMBER_DATA
Untuk daftar opsi, lihat opsi Auto Loader.
Dukungan XSD
Anda dapat memvalidasi setiap rekaman XML tingkat baris secara opsional dengan Definisi Skema XML (XSD). File XSD ditentukan dalam rowValidationXSDPath
opsi . XSD tidak memengaruhi skema baik yang disediakan maupun yang disimpulkan. Rekaman yang gagal validasi ditandai sebagai "rusak" dan ditangani berdasarkan opsi mode penanganan rekaman yang rusak yang dijelaskan di bagian opsi.
Anda dapat menggunakan XSDToSchema
untuk mengekstrak skema Spark DataFrame dari file XSD. Ini hanya mendukung jenis sederhana, kompleks, dan urutan, dan hanya mendukung fungsionalitas XSD dasar.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
Tabel berikut ini memperlihatkan konversi jenis data XSD ke jenis data Spark:
Jenis Data XSD | Tipe Data Spark |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , , negativeInteger nonNegativeInteger , nonPositiveInteger , , positiveInteger ,unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
Mengurai XML bertumpuk
Data XML dalam kolom bernilai string di DataFrame yang ada dapat diurai dengan schema_of_xml
dan from_xml
yang mengembalikan skema dan hasil yang diurai sebagai kolom struct
baru. Data XML diteruskan sebagai argumen ke schema_of_xml
dan from_xml
harus menjadi satu catatan XML yang terbentuk dengan baik.
schema_of_xml
Sintaksis
schema_of_xml(xmlStr [, options] )
Argumen
-
xmlStr
: Ekspresi STRING yang menentukan satu rekaman XML yang terbentuk dengan baik. -
options
: Arahan penentuan harfiah opsionalMAP<STRING,STRING>
.
Kembali
STRING yang memegang sebuah definisi dari sebuah struktur dengan n bidang berupa string di mana nama kolom berasal dari elemen XML dan nama atribut. Nilai bidang menyimpan jenis SQL yang diformat turunan.
from_xml
Sintaksis
from_xml(xmlStr, schema [, options])
Argumen
-
xmlStr
: Ekspresi STRING yang menentukan satu rekaman XML yang terbentuk dengan baik. -
schema
: Ekspresi STRING atau pemanggilanschema_of_xml
fungsi. -
options
: Arahan penentuan harfiah opsionalMAP<STRING,STRING>
.
Kembali
Struktur dengan nama bidang dan jenis yang cocok dengan definisi skema. Skema harus didefinisikan sebagai nama kolom dan pasangan jenis data yang dipisahkan koma seperti yang digunakan, misalnya, CREATE TABLE
. Sebagian besar opsi yang diperlihatkan dalam opsi sumber data berlaku dengan pengecualian berikut:
-
rowTag
: Karena hanya ada satu catatan XML,rowTag
opsi tidak berlaku. -
mode
(default:PERMISSIVE
): Memungkinkan mode untuk menangani rekaman yang rusak selama penguraian.-
PERMISSIVE
: Ketika memenuhi rekaman yang rusak, menempatkan string cacat ke dalam bidang yang dikonfigurasi olehcolumnNameOfCorruptRecord
, dan mengatur bidang cacat kenull
. Untuk menyimpan rekaman yang rusak, Anda dapat mengatur bidang jenis string bernamacolumnNameOfCorruptRecord
dalam skema yang ditentukan pengguna. Jika skema tidak memiliki bidang, maka skema itu akan mengabaikan rekaman yang rusak ketika penguraian. Saat menyimpulkan skema, secara implisit menambahkan bidangcolumnNameOfCorruptRecord
dalam skema output. -
FAILFAST
: Melemparkan pengecualian ketika memenuhi rekaman yang rusak.
-
Konversi struktur
Karena perbedaan struktur antara DataFrame dan XML, ada beberapa aturan konversi dari data XML ke DataFrame
dan dari DataFrame
ke data XML. Perhatikan bahwa menangani atribut dapat dinonaktifkan dengan opsi excludeAttribute
.
Konversi dari XML ke DataFrame
Atribut: Atribut dikonversi sebagai bidang dengan awalan attributePrefix
judul .
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
menghasilkan skema di bawah ini:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Data karakter dalam elemen yang berisi atribut atau elemen turunan: Ini diurai ke valueTag
dalam bidang . Jika ada beberapa kemunculan data karakter, bidang dikonversi valueTag
menjadi jenis array
.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
menghasilkan skema di bawah ini:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Konversi dari DataFrame ke XML
Elemen sebagai array dalam array: Menulis file XML dari DataFrame
yang memiliki bidang ArrayType
dengan elemennya sebagai ArrayType
akan memiliki bidang tertanam tambahan untuk elemen tersebut. Ini tidak akan terjadi dalam membaca dan menulis data XML tetapi menulis DataFrame
bacaan dari sumber lain. Oleh karena itu, pulang-pergi dalam membaca dan menulis file XML memiliki struktur yang sama tetapi menulis DataFrame
baca dari sumber lain dimungkinkan untuk memiliki struktur yang berbeda.
DataFrame dengan skema di bawah ini:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
dan dengan data di bawah ini:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
menghasilkan file XML di bawah ini:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
Nama elemen array yang tidak disebutkan namanya dalam DataFrame
ditentukan oleh opsi arrayElementName
(Default: item
).
Kolom data yang diselamatkan
Kolom data yang diselamatkan memastikan bahwa Anda tidak pernah kehilangan atau melewatkan data selama ETL. Anda bisa mengaktifkan kolom data yang diselamatkan untuk mengambil data apa pun yang tidak diurai karena satu atau beberapa bidang dalam rekaman memiliki salah satu masalah berikut:
- Tidak ada dalam skema yang disediakan
- Tidak cocok dengan tipe data dari skema yang disediakan
- Memiliki ketidakcocokan kasus dengan nama bidang dalam skema yang disediakan
Kolom data yang diselamatkan dikembalikan sebagai dokumen JSON yang berisi kolom yang diselamatkan, dan jalur file sumber rekaman. Untuk menghapus jalur file sumber dari kolom data yang diselamatkan, Anda dapat mengatur konfigurasi SQL berikut:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
Anda dapat mengaktifkan kolom data yang diselamatkan dengan mengatur opsi rescuedDataColumn
ke nama kolom saat membaca data, seperti _rescued_data
dengan spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
.
Pengurai XML mendukung tiga mode saat mengurai rekaman: PERMISSIVE
, , DROPMALFORMED
dan FAILFAST
. Ketika digunakan bersama dengan rescuedDataColumn
, ketidakcocokan tipe data tidak menyebabkan catatan dijatuhkan dalam mode DROPMALFORMED
atau melemparkan kesalahan dalam mode FAILFAST
. Hanya rekaman rusak (XML yang tidak lengkap atau cacat) yang dihilangkan atau melempar kesalahan.
Inferensi dan evolusi skema di Auto Loader
Untuk diskusi terperinci tentang topik ini dan opsi yang berlaku, lihat Mengonfigurasi inferensi dan evolusi skema di Auto Loader. Anda dapat mengonfigurasi Auto Loader untuk secara otomatis mendeteksi skema data XML yang dimuat, memungkinkan Anda menginisialisasi tabel tanpa secara eksplisit mendeklarasikan skema data dan mengembangkan skema tabel saat kolom baru diperkenalkan. Ini menghilangkan kebutuhan untuk melacak dan menerapkan perubahan skema secara manual dari waktu ke waktu.
Secara default, inferensi skema Auto Loader berusaha menghindari masalah evolusi skema karena ketidakcocokan jenis. Untuk format yang tidak mengodekan jenis data (JSON, CSV, dan XML), Auto Loader menyimpulkan semua kolom sebagai string, termasuk bidang berlapis dalam file XML. Apache Spark DataFrameReader
menggunakan perilaku yang berbeda untuk inferensi skema, memilih jenis data untuk kolom di sumber XML berdasarkan data sampel. Untuk mengaktifkan perilaku ini dengan Auto Loader, atur opsi cloudFiles.inferColumnTypes
ke true
.
Auto Loader mendeteksi penambahan kolom baru saat memproses data Anda. Saat Auto Loader mendeteksi kolom baru, aliran berhenti dengan UnknownFieldException
. Sebelum aliran Anda mengalami kesalahan ini, Auto Loader melakukan inferensi skema pada batch mikro data terbaru dan memperbarui lokasi skema dengan skema terbaru dengan menambahkan kolom baru ke akhir skema. Tipe data kolom yang ada tetap tidak berubah. Auto Loader mendukung mode yang berbeda untuk evolusi skema, yang Anda tetapkan dalam opsi cloudFiles.schemaEvolutionMode
.
Anda dapat menggunakan petunjuk skema untuk memberlakukan informasi skema yang Anda ketahui dan harapkan pada skema yang disimpulkan. Ketika Anda tahu bahwa kolom adalah jenis data tertentu, atau jika Anda ingin memilih jenis data yang lebih umum (misalnya, double alih-alih bilangan bulat), Anda dapat memberikan beberapa petunjuk untuk jenis data kolom dalam bentuk string menggunakan sintaks spesifikasi skema SQL. Saat kolom data yang dipulihkan diaktifkan, bidang yang penamaannya berbeda dari skema akan dimuat ke kolom _rescued_data
. Anda dapat mengubah perilaku ini dengan mengatur opsi readerCaseSensitive
ke false
, dalam hal ini Auto Loader membaca data dengan cara yang tidak peka huruf besar/kecil.
Contoh
Contoh di bagian ini menggunakan file XML yang tersedia untuk diunduh di repositori Apache Spark GitHub.
Baca dan tulis XML
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Anda dapat menentukan skema secara manual saat membaca data:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
SQL API
Sumber data XML dapat menyimpulkan jenis data:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
Anda juga dapat menentukan nama dan jenis kolom di DDL. Dalam hal ini, skema tidak disimpulkan secara otomatis.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
Memuat XML menggunakan COPY INTO
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Membaca XML dengan validasi baris
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
Mengurai XML berlapis (from_xml dan schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
from_xml dan schema_of_xml dengan SQL API
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Muat XML dengan Auto Loader
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)