SQL ile işlem hattı kodu geliştirme
DLT, işlem hatlarında gerçekleştirilmiş görünümleri ve akış tablolarını tanımlamaya yönelik birkaç yeni SQL anahtar sözcüğü ve işlevi sunar. İşlem hattı geliştirmeye yönelik SQL desteği, Spark SQL'in temellerini temel alır ve Yapılandırılmış Akış işlevselliği için destek ekler.
PySpark DataFrames hakkında bilgi sahibi olan kullanıcılar Python ile işlem hattı kodu geliştirmeyi tercih edebilir. Python, meta programlama işlemleri gibi SQL ile uygulanması zor olan daha kapsamlı test ve işlemleri destekler. Bkz. Pythonile işlem hattı kodu geliştirme.
DLT SQL söz diziminin tam başvurusu için bkz. DLT SQL dil başvurusu.
İşlem hattı geliştirme için SQL'in temelleri
DLT veri kümeleri oluşturan SQL kodu, sorgu sonuçlarına karşı malzemeleşmiş görünümler ve akış tabloları tanımlamak için CREATE OR REFRESH
söz dizimini kullanır.
STREAM
anahtar sözcüğü, bir SELECT
yan tümcesinde başvuruda bulunılan veri kaynağının akış semantiğiyle okunması gerekip gerekmediğini belirtir.
İşlem hattı yapılandırması sırasında belirtilen kataloğa ve şemaya varsayılan olarak okur ve yazar. Bakınız Hedef kataloğu ve şemayı ayarla.
DLT kaynak kodu, SQL betiklerinden kritik ölçüde farklıdır: DLT, bir işlem hattında yapılandırılan tüm kaynak kodu dosyalarındaki tüm veri kümesi tanımlarını değerlendirir ve sorgular çalıştırilmeden önce bir veri akışı grafiği oluşturur. Not defterinde veya betikte görünen sorguların sırası, kod değerlendirme sırasını tanımlar, ancak sorgu yürütme sırasını tanımlamaz.
SQL ile maddileştirilmiş görünüm oluşturma
Aşağıdaki kod örneği, SQL ile gerçekleştirilmiş görünüm oluşturmaya yönelik temel söz dizimini gösterir:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
SQL ile akış tablosu oluşturma
Aşağıdaki kod örneği, SQL ile akış tablosu oluşturmaya yönelik temel söz dizimini gösterir:
Not
Tüm veri kaynakları akış okumalarını desteklemez ve bazı veri kaynakları her zaman akış semantiği ile işlenmelidir.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Verileri nesne deposundan yükleme
DLT, Azure Databricks tarafından desteklenen tüm biçimlerden veri yüklemeyi destekler. bkz. Veri biçimi seçenekleri.
Not
Bu örneklerde, çalışma alanınıza otomatik olarak bağlanan /databricks-datasets
'da yer alan veriler kullanılır. Databricks, bulut nesne depolama alanında depolanan verilere başvurmak için birim yollarının veya bulut URI'lerinin kullanılmasını önerir. Bkz. Unity Kataloğu birimleri nelerdir?.
Databricks, artımlı alım iş yüklerini bulut nesne depolamasında depolanan verilere göre yapılandırırken Otomatik Yükleyici ve akış tablolarının kullanılmasını önerir. Bkz. Otomatik Yükleyici nedir?.
SQL, Otomatik Yükleyici işlevselliğini çağırmak için read_files
işlevini kullanır.
read_files
ile bir akış okuması yapılandırmak için STREAM
anahtar sözcüğünü de kullanmanız gerekir.
Aşağıdaki örnek, Otomatik Yükleyici kullanarak JSON dosyalarından bir akış tablosu oluşturur:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
read_files
işlevi, materyalize edilmiş görünümler oluşturmak için toplu semantiği de destekler. Aşağıdaki örnek, JSON dizinini okumak ve gerçekleştirilmiş bir görünüm oluşturmak için toplu semantiği kullanır:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Verileri beklentilerle doğrulama
Veri kalitesi kısıtlamalarını ayarlamak ve uygulamak için beklentileri kullanabilirsiniz. bkz. İşlem hattı beklentileriyle veri kalitesini yönetme.
Aşağıdaki kod, veri alımı sırasında null olan kayıtları düşüren valid_data
adlı bir beklenti tanımlar:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
İşlem hattınızda tanımlanan gerçekleştirilmiş görünümleri ve akış tablolarını sorgulayın
Aşağıdaki örnek dört veri kümesini tanımlar:
- JSON verilerini yükleyen
orders
adlı bir akış tablosu. -
customers
adlı, CSV verilerini yükleyen maddi görünüm. -
orders
vecustomers
veri kümelerindeki kayıtları birleştiren, sipariş zaman damgasını bir tarih formatına dönüştüren vecustomer_id
,order_number
,state
veorder_date
alanlarını seçencustomer_orders
adlı oluşturulmuş bir görünüm. - Her eyalet için günlük sipariş sayısını toplayan
daily_orders_by_state
adlı bir materialize edilmiş görünüm.
Not
İşlem hattınızdaki görünümleri veya tabloları sorgularken, kataloğu ve şemayı doğrudan belirtebilir veya işlem hattınızda yapılandırılan varsayılanları kullanabilirsiniz. Bu örnekte, orders
, customers
ve customer_orders
tabloları, işlem hattınız için yapılandırılan varsayılan katalogdan ve şemadan yazılır ve okunur.
Eski yayımlama modu, işlem hattınızda tanımlanan diğer gerçekleştirilmiş görünümleri ve akış tablolarını sorgulamak için LIVE
şemasını kullanır. Yeni işlem hatlarında, LIVE
şema söz dizimi görmezden gelinir. Bkz. LIVE şeması (eski).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;