Ingérer des données en tant que type de variante semi-structurée
Important
Cette fonctionnalité est disponible en préversion publique.
Dans Databricks Runtime 15.3 et les versions ultérieures, vous pouvez utiliser le VARIANT
type pour ingérer des données semi-structurées. Cet article décrit le comportement et fournit des exemples de modèles pour l’ingestion de données à partir du stockage d’objets dans le cloud à l’aide d’Auto Loader COPY INTO
, d’enregistrements de diffusion en continu à partir de Kafka, et de commandes SQL pour la création de tables avec des données variantes ou l’insertion de nouveaux enregistrements en utilisant le type de variante.
Consultez les données de variante de requête.
Créer une table avec une colonne de variante
VARIANT
est un type SQL standard dans Databricks Runtime 15.3 et les versions ultérieures. Ce type est pris en charge par les tables sauvegardées par Delta Lake. Les tables managées sur Azure Databricks utilisent Delta Lake par défaut. Vous pouvez donc créer une table vide avec une seule VARIANT
colonne à l’aide de la syntaxe suivante :
CREATE TABLE table_name (variant_column VARIANT)
Vous pouvez également utiliser la PARSE_JSON
fonction sur une chaîne JSON pour utiliser une instruction CTAS pour créer une table avec une colonne de variante. L’exemple suivant crée une table normale contenant deux colonnes :
- la
id
colonne extraite de la chaîne JSON en tant queSTRING
type ; - la
variant_column
colonne contient l’intégralité de la chaîne JSON encodée en tant queVARIANT
type.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Remarque
Databricks recommande d’extraire et de stocker les champs comme des colonnes non variables que vous prévoyez d’utiliser pour accélérer les requêtes et optimiser la disposition du stockage.
VARIANT
les colonnes ne peuvent pas être utilisées pour les clés de clustering, les partitions ou les clés de classement Z. Le type de données VARIANT
ne peut pas être utilisé pour les comparaisons, le regroupement, l’ordre et les opérations de définition. Pour obtenir la liste complète des limitations, consultez Limitations.
Insérer des données à l’aide de parse_json
Si la table cible contient déjà une colonne encodée en tant que VARIANT
, vous pouvez utiliser parse_json
pour insérer des enregistrements de chaîne JSON comme VARIANT
, comme dans l’exemple suivant :
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Ingérer des données à partir du stockage d’objets cloud en tant que variante
Dans Databricks Runtime 15.3 et les versions ultérieures, vous pouvez utiliser le chargeur automatique pour charger toutes les données à partir de sources JSON en tant que colonne unique VARIANT
dans une table cible. VARIANT
est flexible pour les modifications de schéma et de type et conserve la sensibilité de la casse et NULL
les valeurs présentes dans la source de données, par conséquent, ce modèle est robuste pour la plupart des scénarios d’ingestion avec les avertissements suivants :
- Les enregistrements JSON mal formés ne peuvent pas être encodés à l’aide du
VARIANT
type. - Le
VARIANT
type ne peut contenir que des enregistrements d’une taille de 16 Mo.
Remarque
La variante traite les enregistrements trop volumineux similaires aux enregistrements endommagés. En mode de traitement par défaut PERMISSIVE
, les enregistrements trop volumineux sont capturés dans la _malformed_data
colonne en même temps que les enregistrements JSON mal formés.
Toutes les données de la source JSON sont enregistrées sous la forme d’une seule colonne VARIANT
, par conséquent, aucune évolution de schéma ne se produit pendant l’ingestion, et rescuedDataColumn
n’est pas pris en charge. L’exemple suivant suppose que la table cible existe déjà avec une seule VARIANT
colonne.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Vous pouvez également spécifier VARIANT
lors de la définition d’un schéma ou d’un passage schemaHints
. Les données du champ source référencé doivent contenir une chaîne JSON valide. Les exemples ci-dessous illustrent cette syntaxe :
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Utiliser COPY INTO
avec une variante
Databricks recommande d’utiliser le chargeur automatique pour COPY INTO
lorsqu’il est disponible.
COPY INTO
prend en charge l’ingestion de l’intégralité du contenu d’une source de données JSON comme colonne unique. L’exemple suivant crée une table avec une seule VARIANT
colonne, puis utilise COPY INTO
pour ingérer des enregistrements à partir d’une source de fichier JSON.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Vous pouvez également définir n’importe quel champ dans une table cible en tant que VARIANT
. Lorsque vous exécutez COPY INTO
, les champs correspondants de la source de données sont ingérés et diffusés sur VARIANT
le type, comme dans les exemples suivants :
-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
Diffuser des données Kafka comme variante
De nombreux flux Kafka encodent leurs charges utiles à l’aide de JSON. L’ingestion de flux Kafka à l’aide de VARIANT
rend ces charges de travail robustes aux modifications de schéma.
L’exemple suivant illustre la lecture d’une source de diffusion en continu Kafka, la diffusion de key
en tant que STRING
et value
en tant que VARIANT
, et l’écriture sur une table cible.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)