Aracılığıyla paylaş


SQL ile işlem hattı kodu geliştirme

DLT, işlem hatlarında gerçekleştirilmiş görünümleri ve akış tablolarını tanımlamaya yönelik birkaç yeni SQL anahtar sözcüğü ve işlevi sunar. İşlem hattı geliştirmeye yönelik SQL desteği, Spark SQL'in temellerini temel alır ve Yapılandırılmış Akış işlevselliği için destek ekler.

PySpark DataFrames hakkında bilgi sahibi olan kullanıcılar Python ile işlem hattı kodu geliştirmeyi tercih edebilir. Python, meta programlama işlemleri gibi SQL ile uygulanması zor olan daha kapsamlı test ve işlemleri destekler. Bkz. Pythonile işlem hattı kodu geliştirme.

DLT SQL söz diziminin tam başvurusu için bkz. DLT SQL dil başvurusu.

İşlem hattı geliştirme için SQL'in temelleri

DLT veri kümeleri oluşturan SQL kodu, sorgu sonuçlarına karşı malzemeleşmiş görünümler ve akış tabloları tanımlamak için CREATE OR REFRESH söz dizimini kullanır.

STREAM anahtar sözcüğü, bir SELECT yan tümcesinde başvuruda bulunılan veri kaynağının akış semantiğiyle okunması gerekip gerekmediğini belirtir.

İşlem hattı yapılandırması sırasında belirtilen kataloğa ve şemaya varsayılan olarak okur ve yazar. Bakınız Hedef kataloğu ve şemayı ayarla.

DLT kaynak kodu, SQL betiklerinden kritik ölçüde farklıdır: DLT, bir işlem hattında yapılandırılan tüm kaynak kodu dosyalarındaki tüm veri kümesi tanımlarını değerlendirir ve sorgular çalıştırilmeden önce bir veri akışı grafiği oluşturur. Not defterinde veya betikte görünen sorguların sırası, kod değerlendirme sırasını tanımlar, ancak sorgu yürütme sırasını tanımlamaz.

SQL ile maddileştirilmiş görünüm oluşturma

Aşağıdaki kod örneği, SQL ile gerçekleştirilmiş görünüm oluşturmaya yönelik temel söz dizimini gösterir:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

SQL ile akış tablosu oluşturma

Aşağıdaki kod örneği, SQL ile akış tablosu oluşturmaya yönelik temel söz dizimini gösterir:

Not

Tüm veri kaynakları akış okumalarını desteklemez ve bazı veri kaynakları her zaman akış semantiği ile işlenmelidir.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Verileri nesne deposundan yükleme

DLT, Azure Databricks tarafından desteklenen tüm biçimlerden veri yüklemeyi destekler. bkz. Veri biçimi seçenekleri.

Not

Bu örneklerde, çalışma alanınıza otomatik olarak bağlanan /databricks-datasets'da yer alan veriler kullanılır. Databricks, bulut nesne depolama alanında depolanan verilere başvurmak için birim yollarının veya bulut URI'lerinin kullanılmasını önerir. Bkz. Unity Kataloğu birimleri nelerdir?.

Databricks, artımlı alım iş yüklerini bulut nesne depolamasında depolanan verilere göre yapılandırırken Otomatik Yükleyici ve akış tablolarının kullanılmasını önerir. Bkz. Otomatik Yükleyici nedir?.

SQL, Otomatik Yükleyici işlevselliğini çağırmak için read_files işlevini kullanır. read_filesile bir akış okuması yapılandırmak için STREAM anahtar sözcüğünü de kullanmanız gerekir.

Aşağıdaki örnek, Otomatik Yükleyici kullanarak JSON dosyalarından bir akış tablosu oluşturur:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

read_files işlevi, materyalize edilmiş görünümler oluşturmak için toplu semantiği de destekler. Aşağıdaki örnek, JSON dizinini okumak ve gerçekleştirilmiş bir görünüm oluşturmak için toplu semantiği kullanır:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Verileri beklentilerle doğrulama

Veri kalitesi kısıtlamalarını ayarlamak ve uygulamak için beklentileri kullanabilirsiniz. bkz. İşlem hattı beklentileriyle veri kalitesini yönetme.

Aşağıdaki kod, veri alımı sırasında null olan kayıtları düşüren valid_data adlı bir beklenti tanımlar:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

İşlem hattınızda tanımlanan gerçekleştirilmiş görünümleri ve akış tablolarını sorgulayın

Aşağıdaki örnek dört veri kümesini tanımlar:

  • JSON verilerini yükleyen orders adlı bir akış tablosu.
  • customers adlı, CSV verilerini yükleyen maddi görünüm.
  • orders ve customers veri kümelerindeki kayıtları birleştiren, sipariş zaman damgasını bir tarih formatına dönüştüren ve customer_id, order_number, stateve order_date alanlarını seçen customer_orders adlı oluşturulmuş bir görünüm.
  • Her eyalet için günlük sipariş sayısını toplayan daily_orders_by_state adlı bir materialize edilmiş görünüm.

Not

İşlem hattınızdaki görünümleri veya tabloları sorgularken, kataloğu ve şemayı doğrudan belirtebilir veya işlem hattınızda yapılandırılan varsayılanları kullanabilirsiniz. Bu örnekte, orders, customersve customer_orders tabloları, işlem hattınız için yapılandırılan varsayılan katalogdan ve şemadan yazılır ve okunur.

Eski yayımlama modu, işlem hattınızda tanımlanan diğer gerçekleştirilmiş görünümleri ve akış tablolarını sorgulamak için LIVE şemasını kullanır. Yeni işlem hatlarında, LIVE şema söz dizimi görmezden gelinir. Bkz. LIVE şeması (eski).

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;