Yarı yapılandırılmış değişken türü olarak veri alma
Önemli
Bu özellik Genel Önizlemededir.
Databricks Runtime 15.3 ve üzerindeki sürümlerde, VARIANT
türünü kullanarak yarı yapılandırılmış verileri alabilirsiniz. Bu makalede davranış açıklanır ve Otomatik Yükleyici ve COPY INTO
kullanarak bulut nesne depolama alanından veri almak, Kafka'dan kayıt akışı yapmak ve değişken verilerle yeni tablolar oluşturmak veya değişken türünü kullanarak yeni kayıtlar eklemek için SQL komutları için örnek desenler sağlanır.
Bkz. Değişken verileri sorgulama.
Değişken sütunlu tablo oluşturma
VARIANT
, Databricks Runtime 15.3 ve üzeri sürümlerin standart bir SQL türüdür ve Delta Lake tarafından desteklenen tablolar tarafından desteklenir. Azure Databricks'te yönetilen tablolar varsayılan olarak Delta Lake kullanır, böylece aşağıdaki söz dizimini kullanarak tek bir VARIANT
sütunu olan boş bir tablo oluşturabilirsiniz:
CREATE TABLE table_name (variant_column VARIANT)
Alternatif olarak, değişken sütunlu bir tablo oluşturmak üzere CTAS deyimini kullanmak için JSON dizesinde PARSE_JSON
işlevini kullanabilirsiniz. Aşağıdaki örnek, iki sütunlu bir tablo oluşturur:
- JSON dizesinden
id
sütunu,STRING
türü olarak ayıklanmıştır. -
variant_column
sütunu,VARIANT
türü olarak kodlanmış JSON dizesinin tamamını içerir.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Not Düşün
Databricks, sorguları hızlandırmak ve depolama düzenini iyileştirmek için kullanmayı planladığınız alanların değişken olmayan sütunlar olarak ayıklanıp depolanmasını önerir.
VARIANT
sütunlar kümeleme anahtarları, bölümler veya Z sırası anahtarları için kullanılamaz.
VARIANT
veri türü karşılaştırmalar, gruplandırma, sıralama ve ayarlama işlemleri için kullanılamaz. Sınırlamaların tam listesi için bkz. Sınırlamalar.
parse_json
kullanarak veri ekleme
Hedef tablo zaten VARIANT
olarak kodlanmış bir sütun içeriyorsa, aşağıdaki örnekte olduğu gibi JSON dize kayıtlarını parse_json
olarak eklemek için VARIANT
kullanabilirsiniz:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Bulut nesne depolama alanından değişken olarak veri alma
Databricks Runtime 15.3 ve üzeri sürümlerde, JSON kaynaklarından gelen tüm verileri hedef tabloda tek bir VARIANT
sütunu olarak yüklemek için Otomatik Yükleyici'yi kullanabilirsiniz.
VARIANT
şemaya ve tür değişikliklerine esnek olduğundan ve veri kaynağında bulunan büyük/küçük harf duyarlılığını ve NULL
değerlerini koruduğundan, bu düzen aşağıdaki istisnalarla çoğu yükleme senaryosu için sağlamdır:
- Hatalı biçimlendirilmiş JSON kayıtları tür kullanılarak
VARIANT
kodlanamaz. -
VARIANT
türü yalnızca 16 mb boyutuna kadar olan kayıtları tutabilir.
Not
Değişken, aşırı büyük kayıtları bozuk kayıtlara benzer şekilde ele alır. Varsayılan PERMISSIVE
işleme modunda, çok büyük kayıtlar _malformed_data
sütununda hatalı biçimlendirilmiş JSON kayıtlarıyla birlikte yakalanır.
JSON kaynağındaki tüm veriler tek bir VARIANT
sütunu olarak kaydedildiğinden, alma sırasında şema evrimi gerçekleşmez ve rescuedDataColumn
desteklenmez. Aşağıdaki örnekte, hedef tablonun tek bir VARIANT
sütunuyla zaten var olduğu varsayılır.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Ayrıca bir şema tanımlarken veya VARIANT
geçirirken schemaHints
belirtebilirsiniz. Başvuruda bulunılan kaynak alandaki veriler geçerli bir JSON dizesi içermelidir. Aşağıdaki örneklerde bu söz dizimi gösterilmektedir:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Varyant ile COPY INTO
kullan
Databricks, mevcut olduğu durumda COPY INTO
yerine Otomatik Yükleyici'nin kullanılmasını önerir.
COPY INTO
bir JSON veri kaynağının tüm içeriğinin tek bir sütun olarak alımını destekler. Aşağıdaki örnek, tek bir VARIANT
sütunuyla yeni bir tablo oluşturur ve ardından COPY INTO
kullanarak bir JSON dosya kaynağından kayıt alır.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Hedef tablodaki herhangi bir alanı VARIANT
olarak da tanımlayabilirsiniz. komutunu çalıştırdığınızda COPY INTO
, aşağıdaki örneklerde gösterildiği gibi veri kaynağındaki ilgili alanlar alınıp türe VARIANT
dönüştürülür:
-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
Kafka verilerini değişken olarak akışla aktarma
Birçok Kafka akışı JSON kullanarak yüklerini kodlar.
VARIANT
kullanarak Kafka akışlarının alımı, bu iş yüklerinin şema değişikliklerine karşı sağlam olmasını sağlar.
Aşağıdaki örnekte Kafka akış kaynağını okuma, key
STRING
ve value
VARIANT
olarak atama ve hedef tabloya yazma işlemleri gösterilmektedir.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)