XML dosyalarını okuma ve yazma
Önemli
Bu özellik Genel Önizlemededir.
Bu makalede XML dosyalarının nasıl okunduğu ve yazıldığı açıklanır.
Genişletilebilir biçimlendirme dili (XML), verileri metin biçiminde biçimlendirmeye, depolamaya ve paylaşmaya yönelik bir işaretleme dilidir. Belgelerden rastgele veri yapılarına kadar değişen verileri seri hale getirmek için bir dizi kural tanımlar.
Yerel XML dosya biçimi desteği, xml verilerini toplu işleme veya akış için alma, sorgulama ve ayrıştırma işlemlerini etkinleştirir. Şema ve veri türlerini otomatik olarak çıkarıp geliştirebilir, from_xml
gibi SQL ifadelerini destekler ve XML belgeleri oluşturabilir. Dış jar gerektirmez ve Otomatik Yükleyici read_files
ve COPY INTO
ile sorunsuz çalışır. İsteğe bağlı olarak her satır düzeyi XML kaydını bir XML Şema Tanımına (XSD) göre doğrulayabilirsiniz.
Gereksinimler
Databricks Runtime 14.3 ve üzeri
XML kayıtlarını ayrıştırma
XML belirtimi iyi biçimlendirilmiş bir yapıyı zorunlu kullanır. Ancak, bu belirtim hemen tablosal bir biçimle eşlenemez. bir ile rowTag
eşleyen XML öğesini belirtmek için DataFrame
Row
seçeneğini belirtmeniz gerekir.
rowTag
öğesi en üst düzey struct
olur. öğesinin rowTag
alt öğeleri, en üst düzey struct
alanının alanları haline gelir.
Bu kaydın şemasını belirtebilir veya otomatik olarak çıkarılmalarını sağlayabilirsiniz. Ayrıştırıcı yalnızca öğeleri incelediğinden rowTag
, DTD ve dış varlıklar filtrelenir.
Aşağıdaki örneklerde farklı rowTag
seçenekleri kullanılarak xml dosyasının şema çıkarımı ve ayrıştırılması gösterilmektedir:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
XML dosyasını "kitaplar" seçeneğiyle rowTag
okuyun:
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Çıktı:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
XML dosyasını rowTag
"book" olarak okuyun:
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Çıktı:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Veri kaynağı seçenekleri
XML için veri kaynağı seçenekleri aşağıdaki yollarla belirtilebilir:
- Aşağıdaki
.option/.options
yöntemleri:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- Aşağıdaki yerleşik işlevler:
-
OPTIONS
yan tümcesiCREATE TABLE, VERİ_KAYNAĞI KULLANARAK
Seçeneklerin listesi için bkz. Otomatik Yükleyici seçenekleri.
XSD desteği
İsteğe bağlı olarak her satır düzeyi XML kaydını bir XML Şema Tanımı (XSD) ile doğrulayabilirsiniz. Seçeneğinde rowValidationXSDPath
XSD dosyası belirtilir. XSD, sağlanan veya çıkarılmış şemayı başka şekilde etkilemez. Doğrulamada başarısız olan bir kayıt "bozuk" olarak işaretlenir ve seçenek bölümünde açıklanan bozuk kayıt işleme modu seçeneğine göre işlenir.
XSDToSchema
kullanarak bir XSD dosyasından Spark DataFrame şeması ayıklayabilirsiniz. Yalnızca basit, karmaşık ve sıra türlerini destekler ve yalnızca temel XSD işlevlerini destekler.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
Aşağıdaki tabloda XSD veri türlerinin Spark veri türlerine dönüştürülmesi gösterilmektedir:
XSD Veri Türleri | Spark Veri Türleri |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , negativeInteger , nonNegativeInteger , , nonPositiveInteger , positiveInteger , unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
İç içe XML ayrıştırma
Var olan bir DataFrame'deki dize değerine sahip sütunda yer alan XML verileri, şemayı ve ayrıştırılmış sonuçları yeni schema_of_xml
sütunları olarak döndüren from_xml
ve struct
tarafından ayrıştırılabilir. ve bağımsız schema_of_xml
from_xml
değişkeni olarak geçirilen XML verileri tek bir iyi biçimlendirilmiş XML kaydı olmalıdır.
schema_of_xml
Söz dizimi
schema_of_xml(xmlStr [, options] )
Bağımsız Değişkenler
-
xmlStr
: İyi biçimlendirilmiş tek bir XML kaydı belirten STRING ifadesi. -
options
: İsteğe bağlıMAP<STRING,STRING>
değişmez değer belirtme yönergeleri.
İadeler
Sütun adlarının XML öğesinden ve öznitelik adlarından türetildiği n dize alanı içeren bir yapının tanımını tutan STRING. Alan değerleri türetilmiş biçimlendirilmiş SQL türlerini barındırır.
from_xml
Söz dizimi
from_xml(xmlStr, schema [, options])
Bağımsız Değişkenler
-
xmlStr
: İyi biçimlendirilmiş tek bir XML kaydı belirten STRING ifadesi. -
schema
: BIR STRING ifadesi veya işlevi çağırmaschema_of_xml
. -
options
: İsteğe bağlıMAP<STRING,STRING>
değişmez değer belirtme yönergeleri.
İadeler
Şema tanımıyla eşleşen alan adlarını ve türlerini içeren bir yapı. Şema, içinde kullanıldığı gibi virgülle ayrılmış sütun adı ve veri türü çiftleri olarak tanımlanmalıdır, örneğin CREATE TABLE
. Veri kaynağı seçeneklerinde gösterilen çoğu seçenek aşağıdaki özel durumlar için geçerlidir:
-
rowTag
: Yalnızca bir XML kaydı olduğundan seçeneğirowTag
geçerli değildir. -
mode
(varsayılan:PERMISSIVE
): Ayrıştırma sırasında bozuk kayıtlarla ilgilenmek için bir moda izin verir.-
PERMISSIVE
: Bozuk bir kaydı karşıladığında, hatalı biçimlendirilmiş dizeyi tarafındancolumnNameOfCorruptRecord
yapılandırılan bir alana yerleştirir ve hatalı biçimlendirilmiş alanları olaraknull
ayarlar. Bozuk kayıtları tutmak için, kullanıcı tanımlı şemadacolumnNameOfCorruptRecord
adlı bir dize türü alanı ayarlayabilirsiniz. Bir şemanın alanı yoksa, ayrıştırma sırasında bozuk kayıtları bırakır. Bir şema çıkarıldığında, bir çıktı şemasına örtük olarak bircolumnNameOfCorruptRecord
alanı ekler. -
FAILFAST
: Bozuk kayıtları karşıladığında bir özel durum oluşturur.
-
Yapı dönüştürme
DataFrame ile XML arasındaki yapı farklılıkları nedeniyle, XML verilerinden XML verilerine ve DataFrame
XML verilerine DataFrame
bazı dönüştürme kuralları vardır. Öznitelikleri işlemenin seçeneğiyle excludeAttribute
devre dışı bırakılabildiğini unutmayın.
XML'den DataFrame'e dönüştürme
Öznitelikler: Öznitelikler, başlık ön ekine attributePrefix
sahip alanlar olarak dönüştürülür.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
aşağıdaki şemayı oluşturur:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Öznitelik veya alt öğe içeren bir öğedeki karakter verileri: Bunlar alana ayrıştırılır valueTag
. Karakter verilerinin birden çok tekrarı varsa, valueTag
alan bir array
türe dönüştürülür.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
aşağıdaki şemayı oluşturur:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
DataFrame'den XML'ye dönüştürme
öğesinin,içinde bir dizi olarak bulunduğu durumda: DataFrame
XML dosyasının ArrayType
alanı, ArrayType
öğesiyle yazıldığında, bu öğe için ek olarak iç içe bir alan olur. Bu durum XML verilerini okurken ve yazarken değil, diğer kaynaklardan okuma yazarken DataFrame
meydana gelir. Bu nedenle, XML dosyalarını okuma ve yazmada gidiş dönüş aynı yapıya sahiptir, ancak başka kaynaklardan okuma yazmak DataFrame
farklı bir yapıya sahip olabilir.
Aşağıdaki şemaya sahip DataFrame:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
ve aşağıdaki verilerle:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
aşağıda bir XML dosyası oluşturur:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
içindeki DataFrame
adlandırılmamış dizinin öğe adı seçeneğiyle arrayElementName
belirtilir (Varsayılan: item
).
Kurtarılan veri sütunu
Kurtarılan veri sütunu, ETL sırasında verileri asla kaybetmenizi veya kaçırmamanızı sağlar. Bir kayıttaki bir veya daha fazla alanda aşağıdaki sorunlardan biri olduğundan, kurtarılan veri sütununu ayrıştırılmayan verileri yakalamak için etkinleştirebilirsiniz:
- Sağlanan şemada yok
- Sağlanan şemanın veri türüyle eşleşmiyor
- Sağlanan şemada, alan adlarının büyük/küçük harf kullanımında bir uyuşmazlık var.
Kurtarılan veri sütunu, kurtarılan sütunları ve kaydın kaynak dosya yolunu içeren bir JSON belgesi olarak döndürülür. Kurtarılan veri sütunundan kaynak dosya yolunu kaldırmak için aşağıdaki SQL yapılandırmasını ayarlayabilirsiniz:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
verileri okurken rescuedDataColumn
seçeneğini _rescued_data
ile spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
gibi bir sütun adına ayarlayarak kurtarılan veri sütununu etkinleştirebilirsiniz.
XML ayrıştırıcısı kayıtları ayrıştırırken üç modu destekler: PERMISSIVE
, DROPMALFORMED
ve FAILFAST
. ile rescuedDataColumn
birlikte kullanıldığında, veri türü uyuşmazlıkları kayıtların modda DROPMALFORMED
bırakılmasına veya modda hata FAILFAST
oluşturmasına neden olmaz. Yalnızca bozuk kayıtlar (eksik veya yanlış biçimlendirilmiş XML) bırakılır veya hata oluşturur.
Otomatik Yükleyici'de şema çıkarımı ve evrimi
Bu konu başlığı ve ilgili seçenekler hakkında ayrıntılı bilgi için bkz. Otomatik Yükleyicişema çıkarımı ve evrimini yapılandırma . Otomatik Yükleyici'yi, yüklenen XML verilerinin şemasını otomatik olarak algılayarak veri şemasını açıkça bildirmeden tabloları başlatmanıza ve yeni sütunlar kullanıma sunulduğunda tablo şemasını geliştirmenize olanak tanıyacak şekilde yapılandırabilirsiniz. Bu, şema değişikliklerini zaman içinde el ile izleme ve uygulama gereksinimini ortadan kaldırır.
Varsayılan olarak, Otomatik Yükleyici şema çıkarımı, tür uyuşmazlıkları nedeniyle şema evrimi sorunlarından kaçınmayı arar. Veri türlerini (JSON, CSV ve XML) kodlamamış biçimler için Otomatik Yükleyici, XML dosyalarındaki iç içe yerleştirilmiş alanlar da dahil olmak üzere tüm sütunları dize olarak çıkarsar. Apache Spark DataFrameReader
şema çıkarımı için farklı bir davranış kullanır ve örnek verilere göre XML kaynaklarındaki sütunlar için veri türlerini seçer. Otomatik Yükleyici ile bu davranışı etkinleştirmek için cloudFiles.inferColumnTypes
seçeneğini true
olarak ayarlayın.
Otomatik Yükleyici, verilerinizi işlerken yeni sütunların eklenmesini algılar. Otomatik Yükleyici yeni bir sütun algıladığında, akış UnknownFieldException
ile durur. Akışınız bu hatayı oluşturmadan önce, Otomatik Yükleyici en son mikro veri toplu işleminde şema çıkarımını gerçekleştirir ve şemanın sonuna yeni sütunları birleştirerek şema konumunu en son şemayla güncelleştirir. Mevcut sütunların veri türleri değişmeden kalır. Otomatik Yükleyici,
şema ipuçlarını kullanarak, bildiğiniz ve çıkarım yapılan bir şemada beklediğiniz şema bilgilerini zorunlu kılabilirsiniz. Sütunun belirli bir veri türünde olduğunu biliyorsanız veya daha genel bir veri türü (örneğin, tamsayı yerine çift) seçmek istiyorsanız, SQL şema belirtimi söz dizimini kullanarak sütun veri türleri için dize olarak rastgele sayıda ipucu sağlayabilirsiniz. Kurtarılan veri sütunu etkinleştirildiğinde, şemanın dışında bir durumda adlandırılan alanlar _rescued_data
sütununa yüklenir. Seçeneğini olarak ayarlayarak readerCaseSensitive
false
bu davranışı değiştirebilirsiniz. Bu durumda Otomatik Yükleyici verileri büyük/küçük harfe duyarlı olmayan bir şekilde okur.
Örnekler
Bu bölümdeki örneklerde Apache Spark GitHub deposunda indirilebilen bir XML dosyası kullanılmaktadır.
XML okuma ve yazma
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Verileri okurken şemayı el ile belirtebilirsiniz:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
SQL API'si
XML veri kaynağı veri türlerini çıkarsayabilir:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
DDL'de sütun adlarını ve türlerini de belirtebilirsiniz. Bu durumda şema otomatik olarak çıkarılmaz.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
COPY INTO kullanarak XML yükleme
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Satır doğrulama ile XML okuma
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
İç içe XML ayrıştırma (from_xml ve schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
SQL API ile from_xml ve schema_of_xml
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Otomatik Yükleyici ile XML Yükleme
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)
Ek kaynaklar
Spark-xml kitaplığını kullanarak XML verilerini okuma ve yazma