Aracılığıyla paylaş


İşlem hatları ile verileri dönüştürme

Bu makalede, veri kümelerinde dönüşümleri bildirmek ve kayıtların sorgu mantığı aracılığıyla nasıl işleneceğini belirtmek için DLT'yi nasıl kullanabileceğiniz açıklanmaktadır. Ayrıca DLT işlem hatları oluşturmaya yönelik yaygın dönüştürme desenlerine örnekler içerir.

DataFrame döndüren herhangi bir sorguda bir veri kümesi tanımlayabilirsiniz. DLT işlem hattınızda dönüşüm olarak Apache Spark yerleşik işlemlerini, UDF'leri, özel mantığı ve MLflow modellerini kullanabilirsiniz. Veriler DLT işlem hattınıza alındıktan sonra, kaynaklardan gelen veriler üzerinden yeni akış tabloları, maddileştirilmiş görünümler ve görünümler oluşturmak için yeni veri kümeleri tanımlayabilirsiniz.

DLT ile durum bilgisi olan işlemeyi etkili bir şekilde gerçekleştirmeyi öğrenmek için bkz. Filigranlarla DLT'de durum bilgisi işlemeyi optimize etme.

Görünümler, gerçekleştirilmiş görünümler ve akış tabloları ne zaman kullanılır?

İşlem hattı sorgularınızı uygularken verimli ve sürdürülebilir olduklarından emin olmak için en iyi veri kümesi türünü seçin.

Aşağıdakileri yapmak için bir görünüm kullanmayı düşünün:

  • İstediğiniz büyük veya karmaşık bir sorguyu daha kolay yönetilebilir sorgulara bölün.
  • Beklentileri kullanarak ara sonuçları doğrulayın.
  • Kalıcı olması gerekmeyen sonuçlar için depolama ve işlem maliyetlerini azaltın. Tablolar materyalize edildiğinden, ek hesaplama ve depolama kaynakları gerektirir.

Aşağıdaki durumlarda materyalize edilmiş bir görünüm kullanmayı göz önünde bulundurun.

  • Birden çok alt sorgu tabloyu tüketir. Görünümler isteğe bağlı olarak hesaplandığından, görünüm her sorgulandığında yeniden hesaplanır.
  • Diğer işlem hatları, işler veya sorgular tabloyu tüketir. Görünümler gerçekleştirilmediğinden, bunları yalnızca aynı işlem hattında kullanabilirsiniz.
  • Geliştirme sırasında sorgunun sonuçlarını görüntülemek istiyorsunuz. Tablolar gerçekleştirilmiş olduğundan ve işlem hattı dışında görüntülenebildiği ve sorgulanabildiği için, geliştirme sırasında tabloların kullanılması hesaplamaların doğruluğunu doğrulamaya yardımcı olabilir. Doğruladıktan sonra, gerçekleştirme gerektirmeyen sorguları görünümlere dönüştürün.

Aşağıdaki durumlarda akış tablosu kullanmayı göz önünde bulundurun:

  • Sürekli veya artımlı olarak büyüyen bir veri kaynağında sorgu tanımlanır.
  • Sorgu sonuçları artımlı olarak hesaplanmalıdır.
  • İşlem hattı için yüksek aktarım hızı ve düşük gecikme süresi gerekir.

Not

Akış tabloları her zaman akış kaynaklarına göre tanımlanır. Güncellemeleri CDC akışlarından uygulamak için APPLY CHANGES INTO ile yayın kaynaklarını da kullanabilirsiniz. Bkz. DEĞIŞIKLIKLERI UYGULA API'leri: DLTile değişiklik verilerini yakalamayı basitleştirme.

Tabloları hedef şemanın dışında tutma

Dış tüketime yönelik olmayan ara tabloları hesaplamanız gerekiyorsa, TEMPORARY anahtar sözcüğünü kullanarak bunların bir şemada yayımlanmasını engelleyebilirsiniz. Geçici tablolar verileri DLT semantiğine göre depolamaya ve işlemeye devam eder ancak geçerli işlem hattı dışından erişilmemelidir. Geçici bir tablo, bunu oluşturan işlem hattının ömrü boyunca kalır. Geçici tabloları bildirmek için aşağıdaki söz dizimini kullanın:

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

Piton

@dlt.table(
  temporary=True)
def temp_table():
  return ("...")

Akış tablolarını ve gerçekleştirilmiş görünümleri tek bir işlem hattında birleştirme

Akış tabloları, Apache Spark Yapılandırılmış Akış'ın işleme garantilerini devralır ve yeni satırların her zaman değiştirilmek yerine kaynak tabloya eklendiği, yalnızca ekleme veri kaynaklarından gelen sorguları işlemek üzere yapılandırılır.

Not

Varsayılan olarak, akış tabloları yalnızca ek veri kaynakları gerektirir, ancak akış kaynağı güncelleştirme veya silme gerektiren başka bir akış tablosuysa, skipChangeCommits bayrağıyla bu davranışı geçersiz kılabilirsiniz.

Yaygın bir akış düzeni, bir işlem hattında ilk veri kümelerini oluşturmak için kaynak verilerin alımını içerir. Bu ilk veri kümeleri genellikle bronz tablolar olarak adlandırılır ve genellikle basit dönüştürmeler gerçekleştirir.

Buna karşılık, genellikle altın tablolar olarak adlandırılan işlem hattındaki son tablolar genellikle karmaşık toplamalar veya APPLY CHANGES INTO işleminin hedeflerinden okuma gerektirir. Bu işlemler doğal olarak eklemeler yerine güncelleştirmeler oluşturduğundan, akış tablolarına giriş olarak desteklenmez. Bu dönüşümler materyalize edilmiş görünümler için daha uygundur.

Akış tablolarını ve gerçekleştirilmiş görünümleri tek bir işlem hattında birleştirerek işlem hattınızı basitleştirebilir, ham verilerin yüksek maliyetli bir şekilde yeniden alımını veya yeniden işlenmesini önleyebilir ve verimli bir şekilde kodlanmış ve filtrelenmiş bir veri kümesi üzerinde karmaşık toplamaları hesaplamak için SQL'in tüm gücüne sahip olabilirsiniz. Aşağıdaki örnekte bu tür bir karma işleme gösterilmektedir:

Not

Bu örneklerde, bulut depolamadan dosya yüklemek için Otomatik Yükleyici kullanılır. Unity Kataloğu etkin bir işlem hattında Otomatik Yükleyici ile dosya yüklemek için dış konumları kullanmanız gerekir. Unity Kataloğu'nu DLT ile kullanma hakkında daha fazla bilgi edinmek için bkz. Unity Kataloğu'nu DLT işlem hatlarınızla kullanma.

Piton

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return spark.readStream.table("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return spark.readStream.table("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
  "abfss://path/to/raw/data",
  format => "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id

JSON dosyalarını Azure depolama alanından artımlı olarak almak için Otomatik Yükleyici kullanma hakkında daha fazla bilgi edinin.

Akış-statik birleşimler

Akış-statik birleşimler, öncelikle statik bir boyut tablosu içeren yalnızca eklenebilir verilerin sürekli akışını denormalize ettiğinizde iyi bir seçimdir.

Her işlem hattı güncelleştirmesi ile akıştan yeni kayıtlar statik tablonun en güncel anlık görüntüsüyle birleştirilir. Akış tablosundan karşılık gelen veriler işlendikten sonra statik tabloya kayıtlar eklenir veya güncelleştirilirse, tam yenileme yapılmadığı sürece sonuç kayıtları yeniden hesaplanmaz.

Tetiklenen yürütme için yapılandırılmış işlem hatlarında, statik tablo güncelleştirmenin başlatıldığı anda sonuçları döndürür. Sürekli yürütme için yapılandırılmış işlem hatlarında, tablo bir güncelleştirmeyi her işlediğinde statik tablonun en son sürümü sorgulanır.

Aşağıda bir akış-statistik birleştirme örneği verilmiştir.

Piton

@dlt.table
def customer_sales():
  return spark.readStream.table("sales").join(spark.readStream.table("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
  INNER JOIN LEFT customers USING (customer_id)

Toplamaları verimli bir şekilde hesaplama

Akış tablolarını kullanarak count, min, max veya sum gibi basit dağıtım toplamalarını ve ortalama veya standart sapma gibi cebirsel toplamaları artımlı olarak hesaplayabilirsiniz. Databricks, GROUP BY country yan tümcesine sahip bir sorgu gibi sınırlı sayıda gruba sahip sorgular için artımlı toplama önerir. Her güncelleştirmede yalnızca yeni giriş verileri okunur.

DLT sorguları yazarken artımlı toplamalar hakkında daha fazla bilgi edinmek için bkz. Filigranlarla pencereli toplamalar gerçekleştirme.

DLT işlem hattında MLflow modellerini kullanma

Not

Unity Kataloğu özellikli bir işlem hattında MLflow modellerini kullanmak için işlem hattınızın preview kanalını kullanacak şekilde yapılandırılması gerekir. current kanalını kullanmak için işlem hattınızı Hive meta veri deposunda yayımlayacak şekilde yapılandırmanız gerekir.

DLT işlem hatlarında MLflow tarafından eğitilen modelleri kullanabilirsiniz. MLflow modelleri, Azure Databricks'te dönüşüm olarak değerlendirilir; bu da spark DataFrame girişi üzerine hareket ettikleri ve sonuçları Spark DataFrame olarak döndürdikleri anlamına gelir. DLT, DataFrame'ler için veri kümelerini tanımladığından, MLflow kullanan Apache Spark iş yüklerini yalnızca birkaç satır kodla DLT'ye dönüştürebilirsiniz. MLflow hakkında daha fazla bilgi için bkz. MLflow for gen AI agent and ML model lifecycle.

MLflow modelini çağıran bir Python not defteriniz varsa, @dlt.table dekoratörünü kullanarak ve dönüştürme sonuçlarını döndürmek için işlevlerin tanımlandığından emin olarak bu kodu DLT'ye uyarlayabilirsiniz. DLT varsayılan olarak MLflow'u yüklemez, bu nedenle MLFlow kitaplıklarını %pip install mlflow ile yüklediğinizi ve not defterinizin en üstünde mlflow ve dlt içeri aktardığınızdan emin olun. DLT söz dizimine giriş için bkz. Python ile işlem hattı kodu geliştirme.

DLT'de MLflow modellerini kullanmak için aşağıdaki adımları tamamlayın:

  1. MLflow modelinin çalıştırma kimliğini ve model adını alın. Çalıştırma kimliği ve model adı, MLflow modelinin URI'sini oluşturmak için kullanılır.
  2. MLflow modelini yüklemek üzere Spark UDF tanımlamak için URI'yi kullanın.
  3. MLflow modelini kullanmak için tablo tanımlarınızdaki UDF'yi çağırın.

Aşağıdaki örnekte bu desen için temel söz dizimi gösterilmektedir:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return spark.read.table(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

Tam bir örnek olarak aşağıdaki kod, kredi riski verileri üzerinde eğitilmiş bir MLflow modelini yükleyen loaded_model_udf adlı bir Spark UDF tanımlar. Tahmin yapmak için kullanılan veri sütunları, bağımsız değişken olarak UDF'ye iletilir. tablo loan_risk_predictionsloan_risk_input_dataiçindeki her satır için tahminleri hesaplar.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return spark.read.table("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

El ile silmeleri veya güncelleştirmeleri saklama

DLT, bir tablodan kayıtları el ile silmenize veya güncelleştirmenize ve aşağı akış tablolarını yeniden derlemek için yenileme işlemi yapmanıza olanak tanır.

Varsayılan olarak, DLT bir işlem hattı her güncelleştirildiğinde giriş verilerini temel alarak tablo sonuçlarını yeniden derler, bu nedenle silinen kaydın kaynak verilerden yeniden yüklenmediğinden emin olmanız gerekir. pipelines.reset.allowed tablo özelliğinin false olarak ayarlanması, tablodaki yenilemeleri engeller, ancak tablolara artımlı yazmaların veya yeni verilerin tabloya akmasını engellemez.

Aşağıdaki diyagramda iki akış tablosunun kullanıldığı bir örnek gösterilmektedir:

  • raw_user_table bir kaynaktan ham kullanıcı verilerini alır.
  • bmi_table, raw_user_tableağırlık ve boyunu kullanarak BMI puanlarını artımlı olarak hesaplar.

raw_user_table kullanıcı kayıtlarını el ile silmek veya güncellemek ve bmi_table'i yeniden hesaplamak istiyorsunuz.

Veri diyagramını tutma

Aşağıdaki kodda, pipelines.reset.allowed tablo özelliğinin false olarak ayarlandığı ve raw_user_table için tam yenilemeyi devre dışı bırakılacak şekilde ayarlandığı gösterilmektedir; böylece hedeflenen değişiklikler zaman içinde korunur, ancak işlem hattı güncelleştirmesi çalıştırıldığında aşağı akış tabloları yeniden derlenir:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);