Partager via


Ingérer des données en tant que type de variante semi-structurée

Important

Cette fonctionnalité est disponible en préversion publique.

Dans Databricks Runtime 15.3 et les versions ultérieures, vous pouvez utiliser le VARIANT type pour ingérer des données semi-structurées. Cet article décrit le comportement et fournit des exemples de modèles pour l’ingestion de données à partir du stockage d’objets dans le cloud à l’aide d’Auto Loader COPY INTO, d’enregistrements de diffusion en continu à partir de Kafka, et de commandes SQL pour la création de tables avec des données variantes ou l’insertion de nouveaux enregistrements en utilisant le type de variante.

Consultez les données de variante de requête.

Créer une table avec une colonne de variante

VARIANT est un type SQL standard dans Databricks Runtime 15.3 et les versions ultérieures. Ce type est pris en charge par les tables sauvegardées par Delta Lake. Les tables managées sur Azure Databricks utilisent Delta Lake par défaut. Vous pouvez donc créer une table vide avec une seule VARIANT colonne à l’aide de la syntaxe suivante :

CREATE TABLE table_name (variant_column VARIANT)

Vous pouvez également utiliser la PARSE_JSON fonction sur une chaîne JSON pour utiliser une instruction CTAS pour créer une table avec une colonne de variante. L’exemple suivant crée une table normale contenant deux colonnes :

  • la id colonne extraite de la chaîne JSON en tant que STRING type ;
  • la variant_column colonne contient l’intégralité de la chaîne JSON encodée en tant que VARIANT type.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Remarque

Databricks recommande d’extraire et de stocker les champs comme des colonnes non variables que vous prévoyez d’utiliser pour accélérer les requêtes et optimiser la disposition du stockage.

VARIANT les colonnes ne peuvent pas être utilisées pour les clés de clustering, les partitions ou les clés de classement Z. Le type de données VARIANT ne peut pas être utilisé pour les comparaisons, le regroupement, l’ordre et les opérations de définition. Pour obtenir la liste complète des limitations, consultez Limitations.

Insérer des données à l’aide de parse_json

Si la table cible contient déjà une colonne encodée en tant que VARIANT, vous pouvez utiliser parse_json pour insérer des enregistrements de chaîne JSON comme VARIANT, comme dans l’exemple suivant :

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Ingérer des données à partir du stockage d’objets cloud en tant que variante

Dans Databricks Runtime 15.3 et les versions ultérieures, vous pouvez utiliser le chargeur automatique pour charger toutes les données à partir de sources JSON en tant que colonne unique VARIANT dans une table cible. VARIANT est flexible pour les modifications de schéma et de type et conserve la sensibilité de la casse et NULL les valeurs présentes dans la source de données, par conséquent, ce modèle est robuste pour la plupart des scénarios d’ingestion avec les avertissements suivants :

  • Les enregistrements JSON mal formés ne peuvent pas être encodés à l’aide du VARIANT type.
  • Le VARIANT type ne peut contenir que des enregistrements d’une taille de 16 Mo.

Remarque

La variante traite les enregistrements trop volumineux similaires aux enregistrements endommagés. En mode de traitement par défaut PERMISSIVE, les enregistrements trop volumineux sont capturés dans la _malformed_data colonne en même temps que les enregistrements JSON mal formés.

Toutes les données de la source JSON sont enregistrées sous la forme d’une seule colonne VARIANT, par conséquent, aucune évolution de schéma ne se produit pendant l’ingestion, et rescuedDataColumn n’est pas pris en charge. L’exemple suivant suppose que la table cible existe déjà avec une seule VARIANT colonne.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Vous pouvez également spécifier VARIANT lors de la définition d’un schéma ou d’un passage schemaHints. Les données du champ source référencé doivent contenir une chaîne JSON valide. Les exemples ci-dessous illustrent cette syntaxe :

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Utiliser COPY INTO avec une variante

Databricks recommande d’utiliser le chargeur automatique pour COPY INTO lorsqu’il est disponible.

COPY INTO prend en charge l’ingestion de l’intégralité du contenu d’une source de données JSON comme colonne unique. L’exemple suivant crée une table avec une seule VARIANT colonne, puis utilise COPY INTO pour ingérer des enregistrements à partir d’une source de fichier JSON.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

Vous pouvez également définir n’importe quel champ dans une table cible en tant que VARIANT. Lorsque vous exécutez COPY INTO, les champs correspondants de la source de données sont ingérés et diffusés sur VARIANT le type, comme dans les exemples suivants :

-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

Diffuser des données Kafka comme variante

De nombreux flux Kafka encodent leurs charges utiles à l’aide de JSON. L’ingestion de flux Kafka à l’aide de VARIANT rend ces charges de travail robustes aux modifications de schéma.

L’exemple suivant illustre la lecture d’une source de diffusion en continu Kafka, la diffusion de key en tant que STRING et value en tant que VARIANT, et l’écriture sur une table cible.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)