Sdílet prostřednictvím


Ingestování dat jako částečně strukturovaný typ varianty

Důležité

Tato funkce je ve verzi Public Preview.

V Databricks Runtime 15.3 a novějších můžete použít VARIANT typ k ingestování částečně strukturovaných dat. Tento článek popisuje chování a poskytuje ukázkové vzory pro příjem dat z cloudového úložiště objektů pomocí Auto Loaderu a COPY INTO, streamování záznamů ze systému Kafka a používání SQL příkazů pro vytváření nových tables s daty variant nebo pro vložení nových záznamů pomocí variantního typu.

Viz Dotaz na data variant.

Vytvořte table s variantou column

VARIANT je standardní typ SQL v Databricks Runtime 15.3 a novějších verzích a je podporován tables na platformě Delta Lake. Spravované tables v Azure Databricks standardně používají Delta Lake, takže můžete pomocí následující syntaxe vytvořit prázdný table s jedním VARIANTcolumn.

CREATE TABLE table_name (variant_column VARIANT)

Alternativně můžete pomocí funkce PARSE_JSON v řetězci JSON použít příkaz CTAS k vytvoření table s variantou column. Následující příklad vytvoří table se dvěma columns:

  • id column extrahován z řetězce JSON jako typ STRING.
  • variant_column column obsahuje celý řetězec JSON kódovaný jako typ VARIANT.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Poznámka:

Databricks doporučuje extrahovat a ukládat pole jako ne variantní columns, které chcete použít k urychlení dotazů a optimize rozložení úložiště.

VARIANT columns nelze použít pro klíče clusteringu, oddíly nebo klíče pořadí Z. Datový typ VARIANT nelze použít pro porovnání, seskupení, řazení a set operací. Úplný seznam list omezení viz Omezení.

Insert data pomocí parse_json

Pokud cílový table již obsahuje column kódovaný jako VARIANT, můžete použít parse_json pro insert záznamy řetězců JSON jako VARIANT, jak je uvedeno v následujícím příkladu:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Příjem dat z cloudového úložiště objektů jako varianty

Ve službě Databricks Runtime 15.3 a novějších můžete pomocí Auto Loaderu načíst všechna data ze zdrojů JSON jako jediný VARIANTcolumn v cílovém table. Vzhledem k tomu, že VARIANT je flexibilní vůči schema a změnám typů a zachovává citlivost na velikost písmen a přítomnost NULLvalues ve zdroji dat, je tento model robustní pro většinu scénářů příjmu dat s následujícími upozorněními:

  • Chybně formátované záznamy JSON nelze zakódovat pomocí VARIANT typu.
  • VARIANT typ může obsahovat pouze záznamy o velikosti až 16 mb.

Poznámka:

Variant zpracovává příliš velké záznamy podobné poškozeným záznamům. Ve výchozím režimu zpracování PERMISSIVE se v _malformed_datacolumn společně s poškozenými záznamy JSON zaznamenávají příliš velké záznamy.

Vzhledem k tomu, že všechna data ze zdroje JSON se zaznamenávají jako jedna VARIANTcolumn, během příjmu dat nedojde k žádnému schema vývoji a rescuedDataColumn se nepodporuje. Následující příklad předpokládá, že cíl table již existuje s jedním VARIANTcolumn.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Můžete také zadat VARIANT při definování schema nebo předání schemaHints. Data v odkazovaném zdrojovém poli musí obsahovat platný řetězec JSON. Následující příklady ukazují tuto syntaxi:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Použití COPY INTO s variantou

Databricks doporučuje používat automatický zavaděč, COPY INTO pokud je k dispozici.

COPY INTO podporuje ingestování celého obsahu zdroje dat JSON jako jednoho column. Následující příklad vytvoří novou table s jedním VARIANTcolumn a pak použije COPY INTO k ingestování záznamů ze zdroje souboru JSON.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

Můžete také definovat libovolné pole v cílovém table jako VARIANT. Při spuštění COPY INTOse odpovídající pole ve zdroji dat ingestují a přetypují na VARIANT typ, jak je znázorněno v následujících příkladech:

-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

Streamování dat Kafka jako varianty

Mnoho datových proudů Kafka kóduje jejich datové části pomocí JSON. Příjem datových proudů Kafka pomocí VARIANT činí tyto úlohy odolnými vůči schema změnám.

Následující příklad ukazuje čtení zdroje streamování Kafka, přetypování key jako STRING a value jako VARIANTa zápis do cílového table.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)