Udostępnij za pośrednictwem


Upsert w tabeli Delta Lake przy użyciu operacji scalania

Możesz za pomocą operacji SQL MERGE upsertować dane z tabeli źródłowej, widoku lub ramy danych do docelowej tabeli Delta. Usługa Delta Lake obsługuje wstawianie, aktualizacje i usuwanie w MERGE oraz obsługuje rozszerzoną składnię wykraczającą poza standardy SQL, aby ułatwić zaawansowane przypadki użycia.

Mając tabelę źródłową o nazwie people10mupdates lub ścieżkę źródłową /tmp/delta/people-10m-updates, które zawierają nowe dane dla docelowej tabeli o nazwie people10m lub ścieżki docelowej pod adresem /tmp/delta/people-10m. Niektóre z tych nowych rekordów mogą już znajdować się w danych docelowych. Aby scalić nowe dane, chcesz zaktualizować wiersze, w których id danej osoby już istnieje, i wstawić nowe wiersze, gdzie nie ma pasujących id. Możesz uruchomić następujące zapytanie:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Ważne

Tylko jeden wiersz z tabeli źródłowej może być zgodny z danym wierszem w tabeli docelowej. W środowisku Databricks Runtime 16.0 lub nowszym, system ocenia warunki określone w klauzulach MERGE oraz WHEN MATCHED, aby określić zduplikowane dopasowania. W Databricks Runtime 15.4 LTS i wcześniejszych, MERGE operacje uwzględniają tylko warunki określone w klauzuli ON.

Zapoznaj się z dokumentacją interfejsu API usługi Delta Lake, aby uzyskać szczegółowe informacje o składni języka Scala i Python. Aby uzyskać szczegółowe informacje o składni SQL, zobacz MERGE INTO

Modyfikowanie wszystkich niedopasowanych wierszy przy użyciu scalania

W Databricks SQL i Databricks Runtime 12.2 LTS i nowszych można użyć WHEN NOT MATCHED BY SOURCE klauzuli do UPDATE lub DELETE rekordów w tabeli docelowej, które nie mają odpowiednich rekordów w tabeli źródłowej. Usługa Databricks zaleca dodanie opcjonalnej klauzuli warunkowej, aby uniknąć całkowitego ponownego zapisywania tabeli docelowej.

Poniższy przykład kodu przedstawia podstawową składnię używania tej metody do usuwania, zastępowanie tabeli docelowej zawartością tabeli źródłowej i usuwanie niezgodnych rekordów w tabeli docelowej. Aby uzyskać bardziej skalowalny wzorzec dla tabel, w których aktualizacje źródłowe i usunięcia są powiązane czasowo, zobacz Incrementally sync Delta table with source (Przyrostowa synchronizacja tabeli różnicowej ze źródłem).

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Poniższy przykład dodaje warunki do klauzuli WHEN NOT MATCHED BY SOURCE i określa wartości, które mają być aktualizowane w niedopasowanych wierszach docelowych.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Semantyka operacji scalania

Poniżej przedstawiono szczegółowy opis merge semantyki operacji programowych.

  • Może istnieć dowolna liczba klauzul whenMatched i whenNotMatched.

  • whenMatched Klauzule są wykonywane, gdy wiersz źródłowy pasuje do wiersza tabeli docelowej na podstawie warunku dopasowania. Te klauzule mają następującą semantyka.

    • whenMatched Klauzule mogą zawierać co najwyżej jedną update akcję i jedną delete . Akcja update w programie merge aktualizuje tylko określone kolumny (podobne do updateoperacji) dopasowanego wiersza docelowego. Akcja delete usuwa dopasowany wiersz.

    • Każda klauzula whenMatched może mieć opcjonalny warunek. Jeśli ten warunek klauzuli istnieje, akcja update lub delete jest wykonywana dla dowolnej pasującej pary wierszy źródłowych docelowych tylko wtedy, gdy warunek klauzuli jest spełniony.

    • Jeśli istnieje wiele whenMatched klauzul, są one oceniane w kolejności, w której są określone. Wszystkie whenMatched klauzule, z wyjątkiem ostatniego, muszą mieć warunki.

    • Jeśli żadna z whenMatched warunków nie zwróci wartości true dla pary wierszy źródłowych i docelowych pasujących do warunku scalania, wiersz docelowy pozostanie niezmieniony.

    • Aby zaktualizować wszystkie kolumny docelowej tabeli delty z odpowiednimi kolumnami źródłowego zestawu danych, użyj polecenia whenMatched(...).updateAll(). Jest to odpowiednik:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      dla wszystkich kolumn docelowej tabeli Delta. W związku z tym ta akcja zakłada, że tabela źródłowa ma te same kolumny co w tabeli docelowej. W przeciwnym razie zapytanie zgłasza błąd analizy.

      Uwaga

      To zachowanie zmienia się, gdy włączono automatyczną ewolucję schematu. Aby uzyskać szczegółowe informacje, zobacz automatyczną ewolucję schematu.

  • whenNotMatched Klauzule są wykonywane, gdy wiersz źródłowy nie pasuje do żadnego wiersza docelowego na podstawie warunku dopasowania. Te klauzule mają następującą semantyka.

    • whenNotMatched Klauzule mogą mieć tylko insert działanie. Nowy wiersz jest generowany na podstawie określonej kolumny i odpowiednich wyrażeń. Nie trzeba określać wszystkich kolumn w tabeli docelowej. W przypadku kolumn docelowych, które nie są określone, wstawiany jest NULL.

    • Każda klauzula whenNotMatched może mieć opcjonalny warunek. Jeśli warunek klauzuli jest obecny, wiersz źródłowy jest wstawiany tylko wtedy, gdy ten warunek jest spełniony dla tego wiersza. W przeciwnym razie kolumna źródłowa jest ignorowana.

    • Jeśli istnieje wiele whenNotMatched klauzul, są one oceniane w kolejności, w której są określone. Wszystkie whenNotMatched klauzule, z wyjątkiem ostatniego, muszą mieć warunki.

    • Aby wstawić wszystkie kolumny docelowej tabeli delty z odpowiednimi kolumnami źródłowego zestawu danych, użyj polecenia whenNotMatched(...).insertAll(). Jest to odpowiednik:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      dla wszystkich kolumn docelowej tabeli Delta. W związku z tym ta akcja zakłada, że tabela źródłowa ma te same kolumny co w tabeli docelowej. W przeciwnym razie zapytanie zgłasza błąd analizy.

      Uwaga

      To zachowanie zmienia się, gdy włączono automatyczną ewolucję schematu. Aby uzyskać szczegółowe informacje, zobacz automatyczną ewolucję schematu.

  • whenNotMatchedBySource Klauzule są wykonywane, gdy wiersz docelowy nie pasuje do żadnego wiersza źródłowego na podstawie warunku scalania. Te klauzule mają następującą semantyka.

    • whenNotMatchedBySource klauzule mogą określać delete i update akcje.
    • Każda klauzula whenNotMatchedBySource może mieć opcjonalny warunek. Jeśli warunek klauzuli jest obecny, wiersz docelowy jest modyfikowany tylko wtedy, gdy ten warunek jest spełniony dla tego wiersza. W przeciwnym razie wiersz docelowy pozostanie niezmieniony.
    • Jeśli istnieje wiele whenNotMatchedBySource klauzul, są one oceniane w kolejności, w której są określone. Wszystkie whenNotMatchedBySource klauzule, z wyjątkiem ostatniego, muszą mieć warunki.
    • Z definicji whenNotMatchedBySource klauzule nie mają wiersza źródłowego do ściągania wartości kolumn z, więc nie można odwoływać się do kolumn źródłowych. Dla każdej kolumny do zmodyfikowania można określić literał lub wykonać akcję w kolumnie docelowej, na przykład SET target.deleted_count = target.deleted_count + 1.

Ważne

  • Operacja merge może zakończyć się niepowodzeniem, jeśli wiele wierszy źródłowego zestawu danych pasuje, a scalanie próbuje zaktualizować te same wiersze w docelowej Tabeli Delta. Według semantyki SQL scalania taka operacja aktualizacji jest niejednoznaczna, ponieważ nie jest jasne, który wiersz źródłowy powinien być używany do aktualizowania dopasowanego wiersza docelowego. Możesz wstępnie przetworzyć tabelę źródłową, aby wyeliminować możliwość wielu dopasowań.
  • Operację SQL można zastosować w widoku SQL MERGE tylko wtedy, gdy widok został zdefiniowany jako CREATE VIEW viewName AS SELECT * FROM deltaTable.

Deduplikacja danych podczas zapisywania w tabelach Delta

Typowy przypadek użycia ETL polega na zbieraniu dzienników do tabeli Delta poprzez ich dodawanie. Jednak często źródła mogą generować zduplikowane rekordy dziennika, a kroki deduplikacji podrzędnej są potrzebne do ich obsługi. W programie mergemożna uniknąć wstawiania zduplikowanych rekordów.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Uwaga

Zestaw danych zawierający nowe dzienniki musi być deduplikowany wewnętrznie. Semantyka scalania w SQL dopasowuje nowe dane do istniejących i usuwa duplikaty, ale jeśli w nowym zestawie danych występują duplikaty, zostaną one wstawione. Dlatego usuń duplikaty nowych danych przed scaleniem z tabelą.

Jeśli wiesz, że możesz uzyskać zduplikowane rekordy tylko przez kilka dni, możesz zoptymalizować zapytanie jeszcze bardziej, poprzez partycjonowanie tabeli według daty, a następnie określając zakres dat tabeli docelowej, aby się z nim zgadzał.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Jest to bardziej wydajne niż poprzednie polecenie, ponieważ wyszukuje duplikaty tylko w ciągu ostatnich 7 dni dzienników, a nie w całej tabeli. Ponadto można użyć scalania tylko dodającego w połączeniu ze Strukturami Strumieniowymi do ciągłej deduplikacji dzienników.

  • W zapytaniu strumieniowym można użyć operacji scalania w foreachBatch, aby stale zapisywać dane strumieniowe do tabeli Delty z deduplikacją. Aby uzyskać więcej informacji, zobacz poniższy przykład przesyłania strumieniowego dotyczący foreachBatch.
  • W innym zapytaniu przesyłanym strumieniowo można stale odczytywać deduplikowane dane z tej tabeli delty. Jest to możliwe, ponieważ scalanie z tylko dodawaniem dołącza nowe dane do Tabeli Delta.

Powolne zmienianie danych (SCD) i przechwytywanie zmian w danych (CDC) z Delta Lake

DLT ma wbudowaną obsługę śledzenia i stosowania SCD Typ 1 i Typ 2. Użyj APPLY CHANGES INTO z DLT, aby upewnić się, że nieuporządkowane rekordy są prawidłowo obsługiwane podczas przetwarzania strumieni CDC. Zobacz API "APPLY CHANGES": Dla uproszczenia przechwytywania danych o zmianach za pomocą DLT.

Przyrostowa synchronizacja tabeli delty ze źródłem

W Databricks SQL i Databricks Runtime 12.2 LTS i nowszych można użyć WHEN NOT MATCHED BY SOURCE do tworzenia dowolnych warunków w celu atomowego usunięcia i zastąpienia części tabeli. Może to być szczególnie przydatne, gdy masz tabelę źródłową, w której rekordy mogą ulec zmianie lub zostaną usunięte przez kilka dni po początkowym wpisie danych, ale ostatecznie osiedlą się w stanie końcowym.

Poniższe zapytanie pokazuje użycie tego wzorca do wybrania 5 dni rekordów ze źródła, zaktualizowanie pasujących rekordów w obiekcie docelowym, wstawienie nowych rekordów ze źródła do miejsca docelowego i usunięcie wszystkich niezgodnych rekordów z ostatnich 5 dni w obiekcie docelowym.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Udostępniając ten sam filtr logiczny w tabelach źródłowych i docelowych, można dynamicznie propagować zmiany ze źródła do tabel docelowych, w tym usuwania.

Uwaga

Chociaż ten wzorzec może być używany bez żadnych klauzul warunkowych, może to prowadzić do pełnego ponownego zapisania tabeli docelowej, która może być kosztowna.