Aracılığıyla paylaş


Yarı yapılandırılmış değişken türü olarak veri alma

Önemli

Bu özellik Genel Önizlemededir.

Databricks Runtime 15.3 ve üzerindeki sürümlerde, VARIANT türünü kullanarak yarı yapılandırılmış verileri alabilirsiniz. Bu makalede davranış açıklanır ve Otomatik Yükleyici ve COPY INTOkullanarak bulut nesne depolama alanından veri almak, Kafka'dan kayıt akışı yapmak ve değişken verilerle yeni tablolar oluşturmak veya değişken türünü kullanarak yeni kayıtlar eklemek için SQL komutları için örnek desenler sağlanır.

Bkz. Değişken verileri sorgulama.

Değişken sütunlu tablo oluşturma

VARIANT, Databricks Runtime 15.3 ve üzeri sürümlerin standart bir SQL türüdür ve Delta Lake tarafından desteklenen tablolar tarafından desteklenir. Azure Databricks'te yönetilen tablolar varsayılan olarak Delta Lake kullanır, böylece aşağıdaki söz dizimini kullanarak tek bir VARIANT sütunu olan boş bir tablo oluşturabilirsiniz:

CREATE TABLE table_name (variant_column VARIANT)

Alternatif olarak, değişken sütunlu bir tablo oluşturmak üzere CTAS deyimini kullanmak için JSON dizesinde PARSE_JSON işlevini kullanabilirsiniz. Aşağıdaki örnek, iki sütunlu bir tablo oluşturur:

  • JSON dizesinden id sütunu, STRING türü olarak ayıklanmıştır.
  • variant_column sütunu, VARIANT türü olarak kodlanmış JSON dizesinin tamamını içerir.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Not Düşün

Databricks, sorguları hızlandırmak ve depolama düzenini iyileştirmek için kullanmayı planladığınız alanların değişken olmayan sütunlar olarak ayıklanıp depolanmasını önerir.

VARIANT sütunlar kümeleme anahtarları, bölümler veya Z sırası anahtarları için kullanılamaz. VARIANT veri türü karşılaştırmalar, gruplandırma, sıralama ve ayarlama işlemleri için kullanılamaz. Sınırlamaların tam listesi için bkz. Sınırlamalar.

parse_json kullanarak veri ekleme

Hedef tablo zaten VARIANTolarak kodlanmış bir sütun içeriyorsa, aşağıdaki örnekte olduğu gibi JSON dize kayıtlarını parse_jsonolarak eklemek için VARIANT kullanabilirsiniz:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Bulut nesne depolama alanından değişken olarak veri alma

Databricks Runtime 15.3 ve üzeri sürümlerde, JSON kaynaklarından gelen tüm verileri hedef tabloda tek bir VARIANT sütunu olarak yüklemek için Otomatik Yükleyici'yi kullanabilirsiniz. VARIANT şemaya ve tür değişikliklerine esnek olduğundan ve veri kaynağında bulunan büyük/küçük harf duyarlılığını ve NULL değerlerini koruduğundan, bu düzen aşağıdaki istisnalarla çoğu yükleme senaryosu için sağlamdır:

  • Hatalı biçimlendirilmiş JSON kayıtları tür kullanılarak VARIANT kodlanamaz.
  • VARIANT türü yalnızca 16 mb boyutuna kadar olan kayıtları tutabilir.

Not

Değişken, aşırı büyük kayıtları bozuk kayıtlara benzer şekilde ele alır. Varsayılan PERMISSIVE işleme modunda, çok büyük kayıtlar _malformed_data sütununda hatalı biçimlendirilmiş JSON kayıtlarıyla birlikte yakalanır.

JSON kaynağındaki tüm veriler tek bir VARIANT sütunu olarak kaydedildiğinden, alma sırasında şema evrimi gerçekleşmez ve rescuedDataColumn desteklenmez. Aşağıdaki örnekte, hedef tablonun tek bir VARIANT sütunuyla zaten var olduğu varsayılır.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Ayrıca bir şema tanımlarken veya VARIANTgeçirirken schemaHints belirtebilirsiniz. Başvuruda bulunılan kaynak alandaki veriler geçerli bir JSON dizesi içermelidir. Aşağıdaki örneklerde bu söz dizimi gösterilmektedir:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Varyant ile COPY INTO kullan

Databricks, mevcut olduğu durumda COPY INTO yerine Otomatik Yükleyici'nin kullanılmasını önerir.

COPY INTO bir JSON veri kaynağının tüm içeriğinin tek bir sütun olarak alımını destekler. Aşağıdaki örnek, tek bir VARIANT sütunuyla yeni bir tablo oluşturur ve ardından COPY INTO kullanarak bir JSON dosya kaynağından kayıt alır.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

Hedef tablodaki herhangi bir alanı VARIANTolarak da tanımlayabilirsiniz. komutunu çalıştırdığınızda COPY INTO, aşağıdaki örneklerde gösterildiği gibi veri kaynağındaki ilgili alanlar alınıp türe VARIANT dönüştürülür:

-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

Kafka verilerini değişken olarak akışla aktarma

Birçok Kafka akışı JSON kullanarak yüklerini kodlar. VARIANT kullanarak Kafka akışlarının alımı, bu iş yüklerinin şema değişikliklerine karşı sağlam olmasını sağlar.

Aşağıdaki örnekte Kafka akış kaynağını okuma, keySTRING ve valueVARIANTolarak atama ve hedef tabloya yazma işlemleri gösterilmektedir.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)