Udostępnij za pośrednictwem


Pozyskiwanie danych jako typu wariantu częściowo ustrukturyzowanego

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

W środowisku Databricks Runtime 15.3 lub nowszym można użyć VARIANT typu do pozyskiwania częściowo ustrukturyzowanych danych. W tym artykule opisano zachowanie i przedstawiono przykładowe wzorce pozyskiwania danych z magazynu obiektów w chmurze przy użyciu automatycznego modułu ładującego i COPY INTO, rekordów przesyłanych strumieniowo z platformy Kafka i poleceń SQL do tworzenia nowych tabel z danymi wariantu lub wstawiania nowych rekordów przy użyciu typu wariantu.

Zobacz Zapytanie dotyczące danych wariantów.

Tworzenie tabeli z kolumną wariantu

VARIANT jest standardowym typem SQL w środowisku Databricks Runtime 15.3 lub nowszym i obsługiwanym przez tabele wspierane przez usługę Delta Lake. Tabele zarządzane w usłudze Azure Databricks domyślnie używają usługi Delta Lake, więc można utworzyć pustą tabelę z jedną kolumną VARIANT przy użyciu następującej składni:

CREATE TABLE table_name (variant_column VARIANT)

Alternatywnie możesz użyć funkcji PARSE_JSON na łańcuchu JSON, aby za pomocą instrukcji CTAS utworzyć tabelę z kolumną typu wariant. Poniższy przykład tworzy tabelę z dwiema kolumnami:

  • Kolumna id wyodrębniona z ciągu JSON jako typ STRING.
  • Kolumna variant_column zawiera cały ciąg JSON zakodowany jako typ VARIANT.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Uwaga

Databricks zaleca wyodrębnianie i przechowywanie pól jako kolumn, które nie są wariantami, których planujesz używać do przyspieszania zapytań i optymalizowania układu przechowywania.

VARIANT kolumny nie mogą być używane w przypadku kluczy klastrowania, partycji lub kluczy kolejności Z. Nie można używać VARIANT typu danych na potrzeby porównań, grupowania, porządkowania i ustawiania operacji. Aby uzyskać pełną listę ograniczeń, zobacz Ograniczenia.

Wstawianie danych przy użyciu parse_json

Jeśli tabela docelowa zawiera już kolumnę zakodowaną jako VARIANT, możesz użyć parse_json, aby wstawić rekordy ciągów JSON jako VARIANT, jak w poniższym przykładzie:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Pozyskiwanie danych z magazynu obiektów w chmurze jako wariant

W środowisku Databricks Runtime 15.3 lub nowszym można użyć modułu automatycznego ładującego, aby załadować wszystkie dane ze źródeł JSON jako pojedynczą kolumnę VARIANT w tabeli docelowej. Ponieważ VARIANT jest elastyczny względem zmian schematu i typu oraz uwzględnia wielkość liter i wartości NULL obecne w źródle danych, ten wzorzec sprawdza się w większości scenariuszy przyjmowania danych z następującymi zastrzeżeniami:

  • Źle sformułowane rekordy JSON nie mogą być zakodowane przy użyciu VARIANT typu.
  • VARIANT typ może przechowywać tylko rekordy o rozmiarze do 16 mb.

Uwaga

Wariant traktuje rekordy nadmiernie duże rekordy podobne do uszkodzonych rekordów. W domyślnym trybie przetwarzania PERMISSIVE duże rekordy są przechwytywane w kolumnie _malformed_data obok nieprawidłowo sformułowanych rekordów JSON.

Ponieważ wszystkie dane ze źródła JSON są rejestrowane jako pojedyncza kolumna VARIANT, podczas pozyskiwania nie występuje ewolucja schematu i rescuedDataColumn nie jest obsługiwana. W poniższym przykładzie przyjęto założenie, że tabela docelowa już istnieje z jedną kolumną VARIANT.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Można również określić VARIANT podczas definiowania schematu lub przekazywania schemaHints. Dane w polu źródłowym, do których odwołuje się odwołanie, muszą zawierać prawidłowy ciąg JSON. W poniższych przykładach pokazano tę składnię:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Używanie COPY INTO z wariantem

Usługa Databricks zaleca używanie automatycznego modułu ładującego COPY INTO w przypadku dostępności.

COPY INTO obsługuje pozyskiwanie całej zawartości źródła danych JSON jako pojedynczej kolumny. Poniższy przykład tworzy nową tabelę z pojedynczą kolumną VARIANT, a następnie używa COPY INTO do pozyskiwania rekordów ze źródła plików JSON.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

Można również zdefiniować dowolne pole w tabeli docelowej jako VARIANT. Po uruchomieniu polecenia COPY INTOodpowiednie pola w źródle danych są pozyskiwane i rzutowane do VARIANT typu, jak w następujących przykładach:

-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

Przesyłanie strumieniowe danych platformy Kafka jako wariant

Wiele strumieni platformy Kafka koduje swoje ładunki przy użyciu kodu JSON. Podczas pozyskiwania strumieni Kafka z użyciem VARIANT, obciążenia te stają się odporne na zmiany w schemacie.

W poniższym przykładzie pokazano odczytywanie źródła przesyłania strumieniowego Kafka, rzutowanie key jako STRING i value jako VARIANT, oraz zapisywanie do tabeli docelowej.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)