Udostępnij za pośrednictwem


Przekształcanie danych za pomocą potoków

W tym artykule opisano, jak można użyć biblioteki DLT do deklarowania przekształceń w zestawach danych i określania sposobu przetwarzania rekordów za pomocą logiki zapytań. Zawiera również przykłady typowych wzorców przekształcania do budowania pipeline'ów DLT.

Zestaw danych można zdefiniować względem dowolnego zapytania, które zwraca ramkę danych. Możesz użyć wbudowanych operacji platformy Apache Spark, funkcji zdefiniowanych przez użytkownika, niestandardowej logiki oraz modeli MLflow do przekształceń w potoku DLT. Po zaimportowaniu danych do potoku DLT można zdefiniować nowe zestawy danych na podstawie źródeł nadrzędnych w celu utworzenia nowych tabel strumieniowych, widoków zmaterializowanych i widoków.

Aby dowiedzieć się, jak skutecznie wykonywać przetwarzanie stanowe z DLT, zobacz Optymalizowanie przetwarzania stanowego w DLT za pomocą znaków wodnych.

Kiedy używać widoków, zmaterializowanych widoków i tabel przesyłania strumieniowego

Podczas implementowania zapytań w ramach potoków wybierz najlepszy typ zestawu danych, aby upewnić się, że są wydajne i możliwe do utrzymania.

Rozważ użycie widoku, aby wykonać następujące czynności:

  • Podziel duże lub złożone zapytania na łatwiejsze do zarządzania części.
  • Zweryfikuj wyniki pośrednie przy użyciu oczekiwań.
  • Zmniejsz koszty magazynowania i zasobów obliczeniowych, aby uzyskać wyniki, których nie trzeba utrwalać. Ponieważ tabele są zmaterializowane, wymagają dodatkowych zasobów obliczeniowych i magazynu.

Rozważ użycie zmaterializowanego widoku, gdy:

  • Wiele zapytań podrzędnych korzysta z tabeli. Ponieważ widoki są obliczane na żądanie, widok jest obliczany ponownie za każdym razem, gdy jest wykonywane zapytanie dotyczące widoku.
  • Inne potoki, zadania lub zapytania używają tabeli. Ponieważ widoki nie są zmaterializowane, można ich używać tylko w tym samym przepływie.
  • Chcesz wyświetlić wyniki zapytania podczas opracowywania. Ponieważ tabele są zmaterializowane i mogą być wyświetlane i odpytywane poza potokiem, użycie tabel podczas programowania może pomóc zweryfikować poprawność obliczeń. Po zweryfikowaniu przekonwertuj zapytania, które nie wymagają materializacji w widoki.

Rozważ użycie tabeli przesyłania strumieniowego, gdy:

  • Zapytanie jest definiowane względem źródła danych, które stale lub przyrostowo rośnie.
  • Wyniki zapytania powinny być obliczane przyrostowo.
  • Potok wymaga wysokiej przepływności i małych opóźnień.

Notatka

Tabele przesyłania strumieniowego są zawsze definiowane względem źródeł przesyłania strumieniowego. Można również używać źródeł przesyłania strumieniowego z APPLY CHANGES INTO do stosowania aktualizacji z kanałów CDC. Zobacz API ZASTOSUJ ZMIANY: ułatwiają przechwytywanie zmian danych za pomocą DLT.

Wykluczanie tabel ze schematu docelowego

Jeśli musisz obliczyć tabele pośrednie, które nie są przeznaczone do użycia zewnętrznego, możesz uniemożliwić ich publikowanie w schemacie przy użyciu słowa kluczowego TEMPORARY. Tabele tymczasowe nadal przechowują i przetwarzają dane zgodnie z semantyką DLT, ale nie powinno się do nich uzyskiwać dostępu poza bieżącym potokiem. Tabela tymczasowa jest utrwalana przez okres istnienia potoku, który go tworzy. Użyj następującej składni, aby zadeklarować tabele tymczasowe:

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

Pyton

@dlt.table(
  temporary=True)
def temp_table():
  return ("...")

Łączenie tabel przesyłania strumieniowego i zmaterializowanych widoków w jednym potoku

Tabele przesyłania strumieniowego dziedziczą gwarancje przetwarzania przesyłania strumieniowego ze strukturą platformy Apache Spark i są skonfigurowane do przetwarzania zapytań ze źródeł danych tylko do dołączania, gdzie nowe wiersze są zawsze wstawiane do tabeli źródłowej, a nie modyfikowane.

Notatka

Mimo że domyślnie tabele przesyłania strumieniowego wymagają źródeł danych tylko do dołączania, gdy źródło przesyłania strumieniowego jest inną tabelą przesyłania strumieniowego, która wymaga aktualizacji lub usunięcia, można zastąpić to zachowanie za pomocą flagi skipChangeCommits .

Typowy wzorzec przesyłania strumieniowego obejmuje pozyskiwanie danych źródłowych w celu utworzenia początkowych zestawów danych w potoku. Te początkowe zestawy danych są często nazywane tabelami z brązu i często wykonują proste przekształcenia.

Natomiast ostateczne tabele w potoku, nazywane często złotymi tabelami, często wymagają złożonych agregacji lub odczytywania z celów operacji typu APPLY CHANGES INTO. Ponieważ te operacje z natury tworzą aktualizacje, a nie dołączają, nie są one obsługiwane jako dane wejściowe do tabel przesyłania strumieniowego. Te przekształcenia są bardziej odpowiednie dla zmaterializowanych widoków.

Łącząc tabele strumieniowe i widoki zmaterializowane w jedną linię przetwarzania, można uprościć przetwarzanie, uniknąć kosztownego ponownego pozyskiwania lub ponownego przetwarzania surowych danych oraz mieć pełną moc języka SQL do obliczania złożonych agregacji w wydajnie zakodowanym i filtrowanym zbiorze danych. W poniższym przykładzie przedstawiono ten typ przetwarzania mieszanego:

Notatka

W tych przykładach użyto modułu automatycznego ładującego do ładowania plików z magazynu w chmurze. Aby załadować pliki za pomocą automatycznego modułu ładującego w potoku z włączonym Unity Catalog, należy użyć lokalizacji zewnętrznych. Aby dowiedzieć się więcej na temat korzystania z Unity Catalog z DLT, zobacz Use Unity Catalog with your DLT pipelines.

Pyton

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return spark.readStream.table("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return spark.readStream.table("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
  "abfss://path/to/raw/data",
  format => "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id

Dowiedz się więcej na temat używania Auto Loader do przyrostowego pozyskiwania plików JSON z Azure Storage.

statyczne łączenia strumieniowe

Statyczne sprzężenia strumienia są dobrym wyborem w przypadku denormalizacji ciągłego strumienia danych tylko do dodawania z głównie statyczną tabelą wymiarów.

Po każdej aktualizacji potoku nowe rekordy ze strumienia są łączone z najbardziej bieżącą migawką tabeli statycznej. Jeśli rekordy są dodawane lub aktualizowane w tabeli statycznej po przetworzeniu odpowiednich danych z tabeli przesyłania strumieniowego, wynikowe rekordy nie zostaną ponownie obliczone, chyba że zostanie wykonane pełne odświeżenie.

W potokach skonfigurowanych do wyzwalanego wykonywania tabela statyczna zwraca wyniki z chwilą rozpoczęcia aktualizacji. W potokach skonfigurowanych do ciągłego wykonywania najbardziej aktualna wersja tabeli statycznej jest odpytywana za każdym razem, gdy tabela przetwarza aktualizację.

Poniżej przedstawiono przykład łączenia strumienia z danymi statycznymi:

Pyton

@dlt.table
def customer_sales():
  return spark.readStream.table("sales").join(spark.readStream.table("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
  INNER JOIN LEFT customers USING (customer_id)

Wydajne obliczanie agregacji

Tabele przesyłania strumieniowego umożliwiają przyrostowe obliczanie prostych agregacji dystrybucyjnych, takich jak liczba, minimalna, maksymalna lub suma oraz agregacje algebraiczne, takie jak średnia lub odchylenie standardowe. Usługa Databricks zaleca agregację przyrostową dla zapytań z ograniczoną liczbą grup, takich jak zapytanie z klauzulą GROUP BY country. Tylko nowe dane wejściowe są odczytywane przy każdej aktualizacji.

Aby dowiedzieć się więcej na temat pisania zapytań DLT wykonujących agregacje przyrostowe, zobacz Wykonywanie agregacji opartych na oknach czasowych z użyciem znaczników czasowych.

Użyj modeli MLflow w potoku DLT

Notatka

Aby używać modeli MLflow w potoku z włączonym katalogiem Unity, potok musi być skonfigurowany do korzystania z kanału preview. Aby użyć kanału current, należy skonfigurować swój potok danych do publikowania w magazynie metadanych Hive.

Można używać modeli wytrenowanych w MLflow w potokach DLT. Modele MLflow są traktowane jako przekształcenia w usłudze Azure Databricks, co oznacza, że działają na wejściowej ramce danych Apache Spark i zwracają wyniki jako ramkę danych Apache Spark. Ponieważ biblioteka DLT definiuje zestawy danych względem ramek danych, można przekonwertować obciążenia platformy Apache Spark, które używają biblioteki MLflow do biblioteki DLT z zaledwie kilkoma wierszami kodu. Aby uzyskać więcej informacji na temat platformy MLflow, zobacz MLflow for gen AI agent and ML model lifecycle.

Jeśli masz już notatnik języka Python wywołujący model MLflow, możesz dostosować ten kod do DLT, używając dekoratora @dlt.table i zapewniając, że funkcje są zdefiniowane, aby zwracały wyniki transformacji. Biblioteka DLT domyślnie nie instaluje bibliotek MLflow, dlatego upewnij się, że zainstalowano biblioteki MLFlow z %pip install mlflow i zaimportowano mlflow i dlt w górnej części notesu. Aby uzyskać wprowadzenie do składni DLT, zobacz Tworzenie kodu potoku za pomocą języka Python.

Aby użyć modeli MLflow w technologii DLT, wykonaj następujące kroki:

  1. Uzyskaj identyfikator przebiegu i nazwę modelu MLflow. Identyfikator przebiegu i nazwa modelu służą do tworzenia identyfikatora URI modelu MLflow.
  2. Użyj identyfikatora URI, aby zdefiniować funkcję UDF platformy Spark w celu załadowania modelu MLflow.
  3. Wywołaj funkcję UDF w definicjach tabeli, aby użyć modelu MLflow.

W poniższym przykładzie przedstawiono podstawową składnię dla tego wzorca:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return spark.read.table(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

Jako kompletny przykład poniższy kod definiuje funkcję UDF platformy Spark o nazwie loaded_model_udf, która ładuje model MLflow wyszkolony na podstawie danych o ryzyku kredytowym. Kolumny danych używane do przewidywania są przekazywane jako argument do funkcji zdefiniowanej przez użytkownika. Tabela loan_risk_predictions oblicza przewidywania dla każdego wiersza w loan_risk_input_data.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return spark.read.table("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

zachowywać ręczne usuwanie oraz aktualizacje

Biblioteka DLT umożliwia ręczne usuwanie lub aktualizowanie rekordów z tabeli i wykonywanie operacji odświeżania w celu ponownego skompilowania tabel podrzędnych.

Domyślnie funkcja DLT ponownie skompiluje wyniki tabeli na podstawie danych wejściowych za każdym razem, gdy potok zostanie zaktualizowany, dlatego należy się upewnić, że usunięty rekord nie zostanie ponownie załadowany z danych źródłowych. Ustawienie właściwości tabeli pipelines.reset.allowed na wartość false uniemożliwia odświeżanie tabeli, ale nie zapobiega przyrostowym zapisom w tabelach lub nowym danych przepływanych do tabeli.

Na poniższym diagramie przedstawiono przykład użycia dwóch tabel przesyłania strumieniowego:

  • raw_user_table pozyskuje nieprzetworzone dane użytkownika ze źródła.
  • bmi_table przyrostowo oblicza wyniki BMI przy użyciu wagi i wysokości z raw_user_table.

Chcesz ręcznie usunąć lub zaktualizować rekordy użytkowników z raw_user_table i ponownie skompilować bmi_table.

Zachowaj diagram danych

Poniższy kod demonstruje ustawienie właściwości tabeli pipelines.reset.allowed na false, aby wyłączyć pełne odświeżanie dla raw_user_table tak, aby zamierzone zmiany zostały zachowane w czasie, ale tabele podrzędne są ponownie obliczane po uruchomieniu aktualizacji potoku:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);