Upsert w tabeli Delta Lake przy użyciu operacji scalania
Możesz za pomocą operacji SQL MERGE
upsertować dane z tabeli źródłowej, widoku lub ramy danych do docelowej tabeli Delta. Usługa Delta Lake obsługuje wstawianie, aktualizacje i usuwanie w MERGE
oraz obsługuje rozszerzoną składnię wykraczającą poza standardy SQL, aby ułatwić zaawansowane przypadki użycia.
Mając tabelę źródłową o nazwie people10mupdates
lub ścieżkę źródłową /tmp/delta/people-10m-updates
, które zawierają nowe dane dla docelowej tabeli o nazwie people10m
lub ścieżki docelowej pod adresem /tmp/delta/people-10m
. Niektóre z tych nowych rekordów mogą już znajdować się w danych docelowych. Aby scalić nowe dane, chcesz zaktualizować wiersze, w których id
danej osoby już istnieje, i wstawić nowe wiersze, gdzie nie ma pasujących id
. Możesz uruchomić następujące zapytanie:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Ważne
Tylko jeden wiersz z tabeli źródłowej może być zgodny z danym wierszem w tabeli docelowej. W środowisku Databricks Runtime 16.0 lub nowszym, system ocenia warunki określone w klauzulach MERGE
oraz WHEN MATCHED
, aby określić zduplikowane dopasowania. W Databricks Runtime 15.4 LTS i wcześniejszych, MERGE
operacje uwzględniają tylko warunki określone w klauzuli ON
.
Zapoznaj się z dokumentacją interfejsu API usługi Delta Lake, aby uzyskać szczegółowe informacje o składni języka Scala i Python. Aby uzyskać szczegółowe informacje o składni SQL, zobacz MERGE INTO
Modyfikowanie wszystkich niedopasowanych wierszy przy użyciu scalania
W Databricks SQL i Databricks Runtime 12.2 LTS i nowszych można użyć WHEN NOT MATCHED BY SOURCE
klauzuli do UPDATE
lub DELETE
rekordów w tabeli docelowej, które nie mają odpowiednich rekordów w tabeli źródłowej. Usługa Databricks zaleca dodanie opcjonalnej klauzuli warunkowej, aby uniknąć całkowitego ponownego zapisywania tabeli docelowej.
Poniższy przykład kodu przedstawia podstawową składnię używania tej metody do usuwania, zastępowanie tabeli docelowej zawartością tabeli źródłowej i usuwanie niezgodnych rekordów w tabeli docelowej. Aby uzyskać bardziej skalowalny wzorzec dla tabel, w których aktualizacje źródłowe i usunięcia są powiązane czasowo, zobacz Incrementally sync Delta table with source (Przyrostowa synchronizacja tabeli różnicowej ze źródłem).
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Poniższy przykład dodaje warunki do klauzuli WHEN NOT MATCHED BY SOURCE
i określa wartości, które mają być aktualizowane w niedopasowanych wierszach docelowych.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Semantyka operacji scalania
Poniżej przedstawiono szczegółowy opis merge
semantyki operacji programowych.
Może istnieć dowolna liczba klauzul
whenMatched
iwhenNotMatched
.whenMatched
Klauzule są wykonywane, gdy wiersz źródłowy pasuje do wiersza tabeli docelowej na podstawie warunku dopasowania. Te klauzule mają następującą semantyka.whenMatched
Klauzule mogą zawierać co najwyżej jednąupdate
akcję i jednądelete
. Akcjaupdate
w programiemerge
aktualizuje tylko określone kolumny (podobne doupdate
operacji) dopasowanego wiersza docelowego. Akcjadelete
usuwa dopasowany wiersz.Każda klauzula
whenMatched
może mieć opcjonalny warunek. Jeśli ten warunek klauzuli istnieje, akcjaupdate
lubdelete
jest wykonywana dla dowolnej pasującej pary wierszy źródłowych docelowych tylko wtedy, gdy warunek klauzuli jest spełniony.Jeśli istnieje wiele
whenMatched
klauzul, są one oceniane w kolejności, w której są określone. WszystkiewhenMatched
klauzule, z wyjątkiem ostatniego, muszą mieć warunki.Jeśli żadna z
whenMatched
warunków nie zwróci wartości true dla pary wierszy źródłowych i docelowych pasujących do warunku scalania, wiersz docelowy pozostanie niezmieniony.Aby zaktualizować wszystkie kolumny docelowej tabeli delty z odpowiednimi kolumnami źródłowego zestawu danych, użyj polecenia
whenMatched(...).updateAll()
. Jest to odpowiednik:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
dla wszystkich kolumn docelowej tabeli Delta. W związku z tym ta akcja zakłada, że tabela źródłowa ma te same kolumny co w tabeli docelowej. W przeciwnym razie zapytanie zgłasza błąd analizy.
Uwaga
To zachowanie zmienia się, gdy włączono automatyczną ewolucję schematu. Aby uzyskać szczegółowe informacje, zobacz automatyczną ewolucję schematu.
whenNotMatched
Klauzule są wykonywane, gdy wiersz źródłowy nie pasuje do żadnego wiersza docelowego na podstawie warunku dopasowania. Te klauzule mają następującą semantyka.whenNotMatched
Klauzule mogą mieć tylkoinsert
działanie. Nowy wiersz jest generowany na podstawie określonej kolumny i odpowiednich wyrażeń. Nie trzeba określać wszystkich kolumn w tabeli docelowej. W przypadku kolumn docelowych, które nie są określone, wstawiany jestNULL
.Każda klauzula
whenNotMatched
może mieć opcjonalny warunek. Jeśli warunek klauzuli jest obecny, wiersz źródłowy jest wstawiany tylko wtedy, gdy ten warunek jest spełniony dla tego wiersza. W przeciwnym razie kolumna źródłowa jest ignorowana.Jeśli istnieje wiele
whenNotMatched
klauzul, są one oceniane w kolejności, w której są określone. WszystkiewhenNotMatched
klauzule, z wyjątkiem ostatniego, muszą mieć warunki.Aby wstawić wszystkie kolumny docelowej tabeli delty z odpowiednimi kolumnami źródłowego zestawu danych, użyj polecenia
whenNotMatched(...).insertAll()
. Jest to odpowiednik:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
dla wszystkich kolumn docelowej tabeli Delta. W związku z tym ta akcja zakłada, że tabela źródłowa ma te same kolumny co w tabeli docelowej. W przeciwnym razie zapytanie zgłasza błąd analizy.
Uwaga
To zachowanie zmienia się, gdy włączono automatyczną ewolucję schematu. Aby uzyskać szczegółowe informacje, zobacz automatyczną ewolucję schematu.
whenNotMatchedBySource
Klauzule są wykonywane, gdy wiersz docelowy nie pasuje do żadnego wiersza źródłowego na podstawie warunku scalania. Te klauzule mają następującą semantyka.-
whenNotMatchedBySource
klauzule mogą określaćdelete
iupdate
akcje. - Każda klauzula
whenNotMatchedBySource
może mieć opcjonalny warunek. Jeśli warunek klauzuli jest obecny, wiersz docelowy jest modyfikowany tylko wtedy, gdy ten warunek jest spełniony dla tego wiersza. W przeciwnym razie wiersz docelowy pozostanie niezmieniony. - Jeśli istnieje wiele
whenNotMatchedBySource
klauzul, są one oceniane w kolejności, w której są określone. WszystkiewhenNotMatchedBySource
klauzule, z wyjątkiem ostatniego, muszą mieć warunki. - Z definicji
whenNotMatchedBySource
klauzule nie mają wiersza źródłowego do ściągania wartości kolumn z, więc nie można odwoływać się do kolumn źródłowych. Dla każdej kolumny do zmodyfikowania można określić literał lub wykonać akcję w kolumnie docelowej, na przykładSET target.deleted_count = target.deleted_count + 1
.
-
Ważne
- Operacja
merge
może zakończyć się niepowodzeniem, jeśli wiele wierszy źródłowego zestawu danych pasuje, a scalanie próbuje zaktualizować te same wiersze w docelowej Tabeli Delta. Według semantyki SQL scalania taka operacja aktualizacji jest niejednoznaczna, ponieważ nie jest jasne, który wiersz źródłowy powinien być używany do aktualizowania dopasowanego wiersza docelowego. Możesz wstępnie przetworzyć tabelę źródłową, aby wyeliminować możliwość wielu dopasowań. - Operację SQL można zastosować w widoku SQL
MERGE
tylko wtedy, gdy widok został zdefiniowany jakoCREATE VIEW viewName AS SELECT * FROM deltaTable
.
Deduplikacja danych podczas zapisywania w tabelach Delta
Typowy przypadek użycia ETL polega na zbieraniu dzienników do tabeli Delta poprzez ich dodawanie. Jednak często źródła mogą generować zduplikowane rekordy dziennika, a kroki deduplikacji podrzędnej są potrzebne do ich obsługi. W programie merge
można uniknąć wstawiania zduplikowanych rekordów.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Uwaga
Zestaw danych zawierający nowe dzienniki musi być deduplikowany wewnętrznie. Semantyka scalania w SQL dopasowuje nowe dane do istniejących i usuwa duplikaty, ale jeśli w nowym zestawie danych występują duplikaty, zostaną one wstawione. Dlatego usuń duplikaty nowych danych przed scaleniem z tabelą.
Jeśli wiesz, że możesz uzyskać zduplikowane rekordy tylko przez kilka dni, możesz zoptymalizować zapytanie jeszcze bardziej, poprzez partycjonowanie tabeli według daty, a następnie określając zakres dat tabeli docelowej, aby się z nim zgadzał.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Jest to bardziej wydajne niż poprzednie polecenie, ponieważ wyszukuje duplikaty tylko w ciągu ostatnich 7 dni dzienników, a nie w całej tabeli. Ponadto można użyć scalania tylko dodającego w połączeniu ze Strukturami Strumieniowymi do ciągłej deduplikacji dzienników.
- W zapytaniu strumieniowym można użyć operacji scalania w
foreachBatch
, aby stale zapisywać dane strumieniowe do tabeli Delty z deduplikacją. Aby uzyskać więcej informacji, zobacz poniższy przykład przesyłania strumieniowego dotyczącyforeachBatch
. - W innym zapytaniu przesyłanym strumieniowo można stale odczytywać deduplikowane dane z tej tabeli delty. Jest to możliwe, ponieważ scalanie z tylko dodawaniem dołącza nowe dane do Tabeli Delta.
Powolne zmienianie danych (SCD) i przechwytywanie zmian w danych (CDC) z Delta Lake
DLT ma wbudowaną obsługę śledzenia i stosowania SCD Typ 1 i Typ 2. Użyj APPLY CHANGES INTO
z DLT, aby upewnić się, że nieuporządkowane rekordy są prawidłowo obsługiwane podczas przetwarzania strumieni CDC. Zobacz API "APPLY CHANGES": Dla uproszczenia przechwytywania danych o zmianach za pomocą DLT.
Przyrostowa synchronizacja tabeli delty ze źródłem
W Databricks SQL i Databricks Runtime 12.2 LTS i nowszych można użyć WHEN NOT MATCHED BY SOURCE
do tworzenia dowolnych warunków w celu atomowego usunięcia i zastąpienia części tabeli. Może to być szczególnie przydatne, gdy masz tabelę źródłową, w której rekordy mogą ulec zmianie lub zostaną usunięte przez kilka dni po początkowym wpisie danych, ale ostatecznie osiedlą się w stanie końcowym.
Poniższe zapytanie pokazuje użycie tego wzorca do wybrania 5 dni rekordów ze źródła, zaktualizowanie pasujących rekordów w obiekcie docelowym, wstawienie nowych rekordów ze źródła do miejsca docelowego i usunięcie wszystkich niezgodnych rekordów z ostatnich 5 dni w obiekcie docelowym.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Udostępniając ten sam filtr logiczny w tabelach źródłowych i docelowych, można dynamicznie propagować zmiany ze źródła do tabel docelowych, w tym usuwania.
Uwaga
Chociaż ten wzorzec może być używany bez żadnych klauzul warunkowych, może to prowadzić do pełnego ponownego zapisania tabeli docelowej, która może być kosztowna.