Beszúrás vagy frissítés egy Delta Lake táblába összevonás használatával
Az SQL-művelettel MERGE
adatokat állíthat be egy forrástáblából, nézetből vagy DataFrame-ből egy cél Delta-táblába. A Delta Lake támogatja a beszúrásokat, frissítéseket és törléseket MERGE
, és támogatja az SQL-szabványokon túli kiterjesztett szintaxist a speciális használati esetek megkönnyítése érdekében.
Feltételezzük, hogy van egy forrástáblája people10mupdates
, vagy egy forrásútvonala /tmp/delta/people-10m-updates
, amely új adatokat tartalmaz egy adott people10m
nevű céltáblához vagy egy célútvonalhoz /tmp/delta/people-10m
. Előfordulhat, hogy az új rekordok némelyike már szerepel a céladatokban. Az új adatok egyesítéséhez frissítenie kell azokat a sorokat, ahol a személy id
már jelen van, és be kell szúrni az új sorokat, ahol nincs egyező id
. A következő lekérdezést futtathatja:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Fontos
A céltábla egy adott sorához csak a forrástábla egyetlen sora felelhet meg. A Databricks Runtime 16.0-s és újabb verzióiban a MERGE
kiértékeli a WHEN MATCHED
és ON
záradékokban megadott feltételeket az ismétlődő egyezések meghatározásához. A Databricks Runtime 15.4 LTS és újabb MERGE
verziókban a műveletek csak a ON
záradékban megadott feltételeket veszik figyelembe.
A Scala és a Python szintaxisának részleteiért tekintse meg a Delta Lake API dokumentációját . Az SQL-szintaxis részleteiért lásd: MERGE INTO
Az összes nem egyező sor módosítása egyesítéssel
A Databricks SQL-ben és a Databricks Runtime 12.2 LTS-ben és újabb verziókban használhatja a WHEN NOT MATCHED BY SOURCE
záradékot a céltábla azon rekordjaihoz vagy UPDATE
rekordjaihozDELETE
, amelyek nem rendelkeznek megfelelő rekordokkal a forrástáblában. A Databricks egy opcionális feltételes záradék hozzáadását javasolja a céltábla teljes újraírásának elkerülése érdekében.
Az alábbi példakód azt mutatja be, hogy milyen alapszintaxissal használható ez a törléshez, felülírja a céltáblát a forrástábla tartalmával, és törli a céltáblában a nem egyező rekordokat. A forrásfrissítéseket és törléseket időkorláttal rendelkező táblák méretezhetőbb mintáját lásd : Delta-tábla növekményes szinkronizálása a forrással.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Az alábbi példa feltételeket ad a WHEN NOT MATCHED BY SOURCE
záradékhoz, és a nem egyező célsorokban frissíteni kívánt értékeket adja meg.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Egyesítési művelet szemantikája
A programozott művelet szemantikájának részletes leírása az merge
alábbiakban található.
Az
whenMatched
éswhenNotMatched
záradékok száma tetszőleges lehet.whenMatched
záradékok akkor lesznek végrehajtva, ha egy forrássor megfelel egy céltáblasornak az egyeztetési feltétel alapján. Ezek a záradékok a következő szemantikával rendelkeznek.whenMatched
a záradékok legfeljebb egyupdate
és egydelete
műveletet tartalmazhatnak. Aupdate
műveletmerge
csak a megfeleltetett célsor megadott oszlopait frissíti (aupdate
művelethez hasonlóan). Adelete
művelet törli a egyeztetett sort.Minden
whenMatched
záradék rendelkezhet opcionális feltétellel. Ha ez a záradékfeltétel létezik, a program csak akkor hajtja végre aupdate
delete
műveletet az egyező forrás-cél sorpár esetében, ha a záradékfeltétel igaz.Ha több
whenMatched
záradék is létezik, a rendszer a megadott sorrendben értékeli ki őket. Az utolsó kivételével mindenwhenMatched
záradéknak feltételekkel kell rendelkeznie.Ha az
whenMatched
egyesítési feltételnek megfelelő forrás- és célsorpár egyik feltétele sem lesz igaz, a célsor változatlan marad.Ha frissíteni szeretné a cél Delta-tábla összes oszlopát a forrásadatkészlet megfelelő oszlopaival, használja a következőt
whenMatched(...).updateAll()
: . Ez egyenértékű a következő értékekkel:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
a cél Delta-tábla összes oszlopához. Ezért ez a művelet feltételezi, hogy a forrástábla oszlopai megegyeznek a céltáblában szereplő oszlopokkal, ellenkező esetben a lekérdezés elemzési hibát jelez.
Megjegyzés
Ez a viselkedés megváltozik, ha engedélyezve van az automatikus sémafejlődés. Részletekért tekintse meg az automatikus sémafejlődést .
whenNotMatched
záradékok akkor lesznek végrehajtva, ha egy forrássor nem egyezik meg egyetlen célsorsal sem az egyeztetési feltétel alapján. Ezek a záradékok a következő szemantikával rendelkeznek.whenNotMatched
záradékok csak ainsert
műveletet tartalmazhatják. Az új sor a megadott oszlop és a hozzájuk tartozó kifejezések alapján jön létre. Nem kell megadnia a céltábla összes oszlopát. A nem meghatározott céloszlopok esetébenNULL
beillesztésre kerül.Minden
whenNotMatched
záradék rendelkezhet opcionális feltétellel. Ha a záradékfeltétel jelen van, a forrássor csak akkor lesz beszúrva, ha ez a feltétel igaz az adott sorra. Ellenkező esetben a forrásoszlop figyelmen kívül lesz hagyva.Ha több
whenNotMatched
záradék is létezik, a rendszer a megadott sorrendben értékeli ki őket. Az utolsó kivételével mindenwhenNotMatched
záradéknak feltételekkel kell rendelkeznie.Ha a cél Delta-tábla összes oszlopát a forrásadatkészlet megfelelő oszlopaival szeretné összekapcsolni, használja a
whenNotMatched(...).insertAll()
. Ez egyenértékű a következővel:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
a cél Delta-tábla összes oszlopához. Ezért ez a művelet feltételezi, hogy a forrástábla oszlopai megegyeznek a céltáblában szereplő oszlopokkal, ellenkező esetben a lekérdezés elemzési hibát jelez.
Feljegyzés
Ez a viselkedés megváltozik, ha engedélyezve van az automatikus sémafejlődés. Részletekért tekintse meg az automatikus sémafejlődést .
whenNotMatchedBySource
záradékok akkor lesznek végrehajtva, ha a célsor nem egyezik meg egyetlen forrássorsal sem az egyesítési feltétel alapján. Ezek a záradékok a következő szemantikával rendelkeznek.-
whenNotMatchedBySource
záradékok megadhatják adelete
feltételeket ésupdate
cselekvéseket határozhatnak meg. - Minden
whenNotMatchedBySource
záradék rendelkezhet opcionális feltétellel. Ha a záradékfeltétel jelen van, a célsor csak akkor módosul, ha ez a feltétel igaz az adott sorra. Ellenkező esetben a célsor változatlan marad. - Ha több
whenNotMatchedBySource
záradék is létezik, a rendszer a megadott sorrendben értékeli ki őket. Az utolsó kivételével mindenwhenNotMatchedBySource
záradéknak feltételekkel kell rendelkeznie. - A záradékok definíció szerint
whenNotMatchedBySource
nem rendelkeznek forrássorsal az oszlopértékek lekéréséhez, ezért a forrásoszlopokra nem lehet hivatkozni. Az egyes oszlopok módosításához megadhat egy literális értéket, vagy végrehajthat egy műveletet a céloszlopon, mint példáulSET target.deleted_count = target.deleted_count + 1
.
-
Fontos
- A
merge
művelet meghiúsulhat, ha a forrásadatkészlet több sora egyezik, és az egyesítés megkísérli frissíteni a cél deltatábla ugyanazon sorait. Az egyesítés SQL-szemantikája szerint az ilyen frissítési művelet nem egyértelmű, mivel nem világos, hogy melyik forrássort kell használni a megfeleltetett célsor frissítéséhez. A forrástábla előfeldolgozásával kiküszöbölheti a több egyezés lehetőségét. - SQL-műveletet
MERGE
csak akkor lehet alkalmazni egy SQL nézetre, ha a nézetCREATE VIEW viewName AS SELECT * FROM deltaTable
lett definiálva.
Adatdeduplikáció Delta-táblákba való íráskor
Az ETL gyakori használati esete, hogy naplókat gyűjt a Delta táblába úgy, hogy hozzáfűzi őket egy táblához. A források azonban gyakran duplikált naplórekordokat hozhatnak létre, és az alsóbb rétegbeli deduplikációs lépésekre van szükség a kezelésükhöz. Ezzel merge
elkerülheti az ismétlődő rekordok beszúrását.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Megjegyzés
Az új naplókat tartalmazó adatkészletet önmagában kell deduplikálni. SQL szemantika szerinti egyesítés összepárosítja és deduplikálja az új adatokat a táblában lévő meglévő adatokkal, de ha az új adatkészletben vannak duplikált adatok, akkor ezek be lesznek szúrva. Ezért deduplikálja az új adatokat a táblába való egyesítés előtt.
Ha tudja, hogy csak néhány napig kaphat duplikált rekordokat, a lekérdezést tovább optimalizálhatja a tábla dátum szerinti particionálásával, majd a céltábla dátumtartományának megadásával.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Ez hatékonyabb, mint az előző parancs, mivel csak a naplók utolsó 7 napjában keresi az ismétlődéseket, nem pedig a teljes táblát. Emellett ezt a csak beszúrásos egyesítést a strukturált streameléssel is használhatja a naplók folyamatos deduplikációjának végrehajtásához.
- Egy streamelési lekérdezésben az egyesítési művelet segítségével folyamatosan írhat bármilyen streamelési adatot egy Delta-táblába deduplikációval. További információt a következő streamelési példában
foreachBatch
talál. - Egy másik streamelési lekérdezésben folyamatosan olvashat deduplikált adatokat ebből a Delta-táblából. Ez azért lehetséges, mert egy csak beszúrásos egyesítés csak új adatokat fűz a Delta táblához.
Lassan változó adatok (SCD) és változáskövetés (CDC) a Delta Lake használatával
A DLT natív támogatást nyújt az SCD 1. és 2. típusának nyomon követéséhez és alkalmazásához. A APPLY CHANGES INTO
-t használja a DLT-vel, hogy biztosítsa a CDC-hírcsatornák feldolgozásakor a sorrendben nem lévő rekordok megfelelő kezelését. Lásd: A MÓDOSÍTÁSOK ALKALMAZÁSA API-k: A változásadatok rögzítésének egyszerűsítése A DLThasználatával.
Delta-tábla növekményes szinkronizálása a forrással
A Databricks SQL és a Databricks Runtime 12.2 LTS és újabb verzióiban tetszőleges feltételeket hozhat WHEN NOT MATCHED BY SOURCE
létre a tábla egy részének atomi törléséhez és cseréjéhez. Ez különösen akkor lehet hasznos, ha olyan forrástáblával rendelkezik, amelyben a rekordok a kezdeti adatbevitel után néhány napig változhatnak vagy törölhetők, de végül végleges állapotba kerülnek.
Az alábbi lekérdezés azt mutatja be, hogy ezzel a mintával 5 napnyi rekordot választhat ki a forrásból, frissítheti a cél egyező rekordjait, új rekordokat szúrhat be a forrásból a célhelyre, és törölheti az összes nem egyező rekordot a célban az elmúlt 5 napból.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Ha ugyanazt a logikai szűrőt adja meg a forrás- és céltáblákon, dinamikusan propagálja a módosításokat a forrásból a céltáblákba, beleértve a törléseket is.
Feljegyzés
Bár ez a minta feltételes záradékok nélkül is használható, ez a céltábla teljes újraírásához vezet, ami költséges lehet.