Ingestování dat jako částečně strukturovaný typ varianty
Důležité
Tato funkce je ve verzi Public Preview.
V Databricks Runtime 15.3 a novějších můžete použít VARIANT
typ k ingestování částečně strukturovaných dat. Tento článek popisuje chování a poskytuje ukázkové vzory pro příjem dat z cloudového úložiště objektů pomocí Auto Loaderu a COPY INTO
, streamování záznamů ze systému Kafka a používání SQL příkazů pro vytváření nových tables s daty variant nebo pro vložení nových záznamů pomocí variantního typu.
Viz Dotaz na data variant.
Vytvořte table s variantou column
VARIANT
je standardní typ SQL v Databricks Runtime 15.3 a novějších verzích a je podporován tables na platformě Delta Lake. Spravované tables v Azure Databricks standardně používají Delta Lake, takže můžete pomocí následující syntaxe vytvořit prázdný table s jedním VARIANT
column.
CREATE TABLE table_name (variant_column VARIANT)
Alternativně můžete pomocí funkce PARSE_JSON
v řetězci JSON použít příkaz CTAS k vytvoření table s variantou column. Následující příklad vytvoří table se dvěma columns:
-
id
column extrahován z řetězce JSON jako typSTRING
. -
variant_column
column obsahuje celý řetězec JSON kódovaný jako typVARIANT
.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Poznámka:
Databricks doporučuje extrahovat a ukládat pole jako ne variantní columns, které chcete použít k urychlení dotazů a optimize rozložení úložiště.
VARIANT
columns nelze použít pro klíče clusteringu, oddíly nebo klíče pořadí Z. Datový typ VARIANT
nelze použít pro porovnání, seskupení, řazení a set operací. Úplný seznam list omezení viz Omezení.
Insert data pomocí parse_json
Pokud cílový table již obsahuje column kódovaný jako VARIANT
, můžete použít parse_json
pro insert záznamy řetězců JSON jako VARIANT
, jak je uvedeno v následujícím příkladu:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Příjem dat z cloudového úložiště objektů jako varianty
Ve službě Databricks Runtime 15.3 a novějších můžete pomocí Auto Loaderu načíst všechna data ze zdrojů JSON jako jediný VARIANT
column v cílovém table. Vzhledem k tomu, že VARIANT
je flexibilní vůči schema a změnám typů a zachovává citlivost na velikost písmen a přítomnost NULL
values ve zdroji dat, je tento model robustní pro většinu scénářů příjmu dat s následujícími upozorněními:
- Chybně formátované záznamy JSON nelze zakódovat pomocí
VARIANT
typu. -
VARIANT
typ může obsahovat pouze záznamy o velikosti až 16 mb.
Poznámka:
Variant zpracovává příliš velké záznamy podobné poškozeným záznamům. Ve výchozím režimu zpracování PERMISSIVE
se v _malformed_data
column společně s poškozenými záznamy JSON zaznamenávají příliš velké záznamy.
Vzhledem k tomu, že všechna data ze zdroje JSON se zaznamenávají jako jedna VARIANT
column, během příjmu dat nedojde k žádnému schema vývoji a rescuedDataColumn
se nepodporuje. Následující příklad předpokládá, že cíl table již existuje s jedním VARIANT
column.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Můžete také zadat VARIANT
při definování schema nebo předání schemaHints
. Data v odkazovaném zdrojovém poli musí obsahovat platný řetězec JSON. Následující příklady ukazují tuto syntaxi:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Použití COPY INTO
s variantou
Databricks doporučuje používat automatický zavaděč, COPY INTO
pokud je k dispozici.
COPY INTO
podporuje ingestování celého obsahu zdroje dat JSON jako jednoho column. Následující příklad vytvoří novou table s jedním VARIANT
column a pak použije COPY INTO
k ingestování záznamů ze zdroje souboru JSON.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Můžete také definovat libovolné pole v cílovém table jako VARIANT
. Při spuštění COPY INTO
se odpovídající pole ve zdroji dat ingestují a přetypují na VARIANT
typ, jak je znázorněno v následujících příkladech:
-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
Streamování dat Kafka jako varianty
Mnoho datových proudů Kafka kóduje jejich datové části pomocí JSON. Příjem datových proudů Kafka pomocí VARIANT
činí tyto úlohy odolnými vůči schema změnám.
Následující příklad ukazuje čtení zdroje streamování Kafka, přetypování key
jako STRING
a value
jako VARIANT
a zápis do cílového table.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)