Merge işlemi kullanılarak Delta Lake tablosuna güncelleme ve ekleme yapma
SQL işlemini kullanarak bir kaynak tablo, görünüm veya DataFrame'den hedef Delta tablosuna MERGE
veri ekleyebilirsiniz. Delta Lake, eklemeleri, güncellemeleri ve silmeleri MERGE
ile destekler ve ileri düzeydeki kullanım senaryolarını kolaylaştırmak için SQL standartlarının ötesinde genişletilmiş bir söz dizimini destekler.
Diyelim ki, people10mupdates
adlı bir kaynak tablonuz veya /tmp/delta/people-10m-updates
konumunda bir kaynak yolunuz var ve bunlar, people10m
adlı bir hedef tablo veya /tmp/delta/people-10m
konumunda bir hedef yol için yeni veriler içeriyor. Bu yeni kayıtlardan bazıları hedef verilerde zaten mevcut olabilir. Yeni verileri birleştirmek için, kişinin id
zaten bulunduğu satırları güncellemek ve uygun id
bulunmayan yeni satırları eklemek istiyorsunuz. Aşağıdaki sorguyu çalıştırabilirsiniz:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Önemli
Kaynak tablodan yalnızca tek bir satır hedef tablodaki belirli bir satırla eşleşebilir. Databricks Runtime 16.0 ve üzeri sürümlerde, MERGE
, ON
ve WHEN MATCHED
maddelerinde belirtilen koşulları, yinelenen eşleşmeleri belirlemek için değerlendirir. Databricks Runtime 15.4 LTS ve altında, MERGE
işlemler belirtilen koşulları içeren ON
yan tümcesini yalnızca dikkate alır.
Scala ve Python söz dizimi ayrıntıları için Delta Lake API belgelerine bakın. SQL söz dizimi ayrıntıları için bkz. MERGE INTO
Birleştirmeyi kullanarak eşleşmeyen tüm satırları değiştirme
Databricks SQL ve Databricks Runtime 12.2 LTS ve üzeri sürümlerde, kaynak tabloda karşılık gelen kayıtları olmayan kayıtları hedef tablodan WHEN NOT MATCHED BY SOURCE
tümcesini kullanarak UPDATE
veya DELETE
yapabilirsiniz. Databricks, hedef tablonun tam olarak yeniden yazılmasını önlemek için isteğe bağlı bir koşullu yan tümce eklenmesini önerir.
Aşağıdaki kod örneği, silme işlemleri için bunu kullanmanın temel söz dizimini, hedef tablonun üzerine kaynak tablonun içeriğiyle birlikte yazılmasını ve hedef tablodaki eşleşmeyen kayıtların silinmesini gösterir. Kaynak güncelleştirmeleri ve silmelerinin zamana bağlı olduğu tablolar için daha ölçeklenebilir bir desen hakkında bilgi almak için Delta tablosunu kaynakla artımlı olarak eşitleyin.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Aşağıdaki örnek yan tümcesine WHEN NOT MATCHED BY SOURCE
koşullar ekler ve eşleşmeyen hedef satırlarda güncelleştirilecek değerleri belirtir.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Birleştirme işlemi semantiği
Programlı işlem semantiğinin merge
ayrıntılı bir açıklaması aşağıdadır.
herhangi bir sayıda
whenMatched
vewhenNotMatched
yan tümcesi olabilir.whenMatched
yan tümceleri, bir kaynak satırı eşleşme koşuluna göre bir hedef tablo satırıyla eşleştiğinde yürütülür. Bu yan tümceler aşağıdaki semantiklere sahiptir.whenMatched
yan tümcelerinin en fazla birupdate
ve birdelete
eylemi olabilir.update
içindekimerge
eylemi, eşleşen hedef satırın belirtilen sütunlarını yalnızca (update
işleme benzer şekilde) güncelleştirir. Eylem,delete
eşleşen satırı siler.Her
whenMatched
yan tümce isteğe bağlı bir koşula sahip olabilir. Bu yan tümce koşulu varsa,update
veyadelete
eylemi yalnızca yan tümce koşulu true olduğunda eşleşen herhangi bir kaynak hedef satır çifti için yürütülür.Birden çok
whenMatched
yan tümce varsa, bunlar belirtildikleri sırayla değerlendirilir. Sonuncusu dışındaki tümwhenMatched
yan tümceler koşul içermelidir.Birleştirme koşuluyla
whenMatched
eşleşen bir kaynak ve hedef satır çifti için koşulların hiçbiri true olarak değerlendirilmezse, hedef satır değişmeden bırakılır.Hedef Delta tablosunun tüm sütunlarını kaynak veri kümesinin ilgili sütunlarıyla güncelleştirmek için kullanın
whenMatched(...).updateAll()
. Bu, şuna eşdeğerdir:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
hedef Delta tablosunun tüm sütunları için. Bu nedenle, bu eylem kaynak tablonun hedef tablodakilerle aynı sütunlara sahip olduğunu varsayar, aksi takdirde sorgu bir çözümleme hatası oluşturur.
Not
Otomatik şema evrimi etkinleştirildiğinde bu davranış değişir. Ayrıntılar için bkz . otomatik şema evrimi .
whenNotMatched
yan tümceleri, bir kaynak satırı eşleşme koşuluna göre herhangi bir hedef satırla uyuşmadığında yürütülür. Bu yan tümceler aşağıdaki semantiklere sahiptir.whenNotMatched
yan tümceleri yalnızcainsert
eylemini içerebilir. Yeni satır, belirtilen sütuna ve karşılık gelen ifadelere göre oluşturulur. Hedef tablodaki tüm sütunları belirtmeniz gerekmez. Belirtilmemiş hedef sütunlar içinNULL
eklenir.Her
whenNotMatched
yan tümce isteğe bağlı bir koşula sahip olabilir. Şarta bağlı tümce varsa, yalnızca bu şart satır için doğruysa bir kaynak satır eklenir. Aksi takdirde, kaynak sütun yoksayılır.Birden çok
whenNotMatched
yan tümce varsa, bunlar belirtildikleri sırayla değerlendirilir. Sonuncusu dışındaki tümwhenNotMatched
cümlelerin koşulları olmalıdır.Hedef Delta tablosunun tüm sütunlarını kaynak veri kümesinin ilgili sütunlarıyla eklemek için kullanın
whenNotMatched(...).insertAll()
. Bu, şuna eşdeğerdir:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
hedef Delta tablosunun tüm sütunları için. Bu nedenle, bu eylem kaynak tablonun hedef tablodakilerle aynı sütunlara sahip olduğunu varsayar, aksi takdirde sorgu bir çözümleme hatası oluşturur.
Not
Otomatik şema evrimi etkinleştirildiğinde bu davranış değişir. Ayrıntılar için bkz . otomatik şema evrimi .
whenNotMatchedBySource
yan tümceleri, bir hedef satır birleştirme koşuluna göre hiçbir kaynak satırla eşleşmediğinde yürütülür. Bu yan tümceler aşağıdaki semantiklere sahiptir.-
whenNotMatchedBySource
tümcelerdelete
belirtebilir veupdate
eylemleri. - Her
whenNotMatchedBySource
yan tümce isteğe bağlı bir koşula sahip olabilir. Yan tümce koşulu varsa, hedef satır, yalnızca bu koşul söz konusu satır için doğru olduğunda değiştirilir. Aksi takdirde, hedef satır değişmeden bırakılır. - Birden çok
whenNotMatchedBySource
yan tümce varsa, bunlar belirtildikleri sırayla değerlendirilir. Sonuncusu dışındaki tümwhenNotMatchedBySource
yan tümcelerin şartları olmalıdır. - Tanım gereği,
whenNotMatchedBySource
yan tümcelerde sütun değerlerinin çekileceği bir kaynak satır yoktur ve bu nedenle kaynak sütunlara başvurulamaz. Değiştirilecek her sütun için ya bir sabit belirtebilir ya daSET target.deleted_count = target.deleted_count + 1
gibi bir eylemi hedef sütunda gerçekleştirebilirsiniz.
-
Önemli
-
merge
Kaynak veri kümesinin birden çok satırı eşleşirse ve birleştirme hedef Delta tablosunun aynı satırlarını güncelleştirmeye çalışırsa işlem başarısız olabilir. Birleştirmenin SQL semantiğine göre, eşleşen hedef satırı güncelleştirmek için hangi kaynak satırın kullanılacağı belirsiz olduğundan bu tür bir güncelleştirme işlemi belirsizdir. Birden çok eşleşme olasılığını ortadan kaldırmak için kaynak tabloyu önceden işleyebilirsiniz. - Bir SQL GÖRÜNÜMÜne SQL
MERGE
işlemi uygulayabilmeniz için görünümünCREATE VIEW viewName AS SELECT * FROM deltaTable
olarak tanımlanmış olması gerekir.
Delta tablolarına yazarken yinelenen verileri kaldırma
Yaygın bir ETL kullanım örneği, günlükleri bir tabloya ekleyerek Delta tablosuna toplamaktır. Ancak, genellikle kaynaklar yinelenen günlük kayıtları oluşturabilir ve bunları yönetmek için sonraki adımlarda yinelenenlerin kaldırılması gerekir. ile merge
, yinelenen kayıtları eklemekten kaçınabilirsiniz.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Not
Yeni günlükleri içeren veri kümesinin içindeki yinelemeleri gidermesi gerekir. Birleştirmenin SQL semantiğiyle, yeni verileri tablodaki mevcut verilerle eşleştirir ve yinelenenleri kaldırır, ancak yeni veri kümesinde yinelenen veriler varsa eklenir. Bu nedenle, tabloya birleştirmeden önce yeni verilerdeki tekrar edenleri kaldırın.
Yalnızca birkaç gün boyunca yinelenen kayıtlar alabileceğinizi biliyorsanız, tabloyu tarihe göre bölümleyerek ve ardından eşleşecek hedef tablonun tarih aralığını belirterek sorgunuzu daha da iyileştirebilirsiniz.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Bu, tablonun tamamında değil yalnızca son 7 günlükte yinelenenleri ararken önceki komuttan daha verimlidir. Ayrıca, kayıtların sürekli tekrarlananlarını kaldırmak için bu salt eklemeli birleştirmeyi Structured Streaming ile kullanabilirsiniz.
- Akış sorgusunda, yeniden duplaka önleme ile bir Delta tablosuna sürekli olarak herhangi bir akış verisi yazmak için
foreachBatch
içindeki birleştirme işlemini kullanabilirsiniz. Daha fazla bilgi için aşağıdaki akış örneğine bakın. - Başka bir akış sorgusunda, Delta tablosundan tekrarlanan verileri kaldırılmış halde devamlı olarak okuyabilirsiniz. Yalnızca ekleme olarak yapılan birleştirme, Delta tablosuna yalnızca yeni veriler eklediği için bu mümkündür.
Delta Lake ile yavaş değişen veriler (SCD) ve veri yakalamayı (CDC) değiştirme
DLT, SCD Tür 1 ve Tür 2'yi izlemek ve uygulamak için yerel desteğe sahiptir. CDC beslemeleri işlenirken sırasız kayıtların doğru ele alındığından emin olmak için DLT ile APPLY CHANGES INTO
kullanın. Bkz. DEĞIŞIKLIKLERI UYGULA API'leri: DLTile değişiklik verilerini yakalamayı basitleştirme.
Delta tablosunu kaynakla artımlı olarak eşitleme
WHEN NOT MATCHED BY SOURCE
kullanarak Databricks SQL ve Databricks Runtime 12.2 LTS ve sonrasında, bir tablonun bir bölümünü atomik olarak silmek ve değiştirmek için keyfi koşullar oluşturabilirsiniz. Bu, özellikle kayıtların ilk veri girdisi sonrasında birkaç gün boyunca değişebileceği veya silinebileceği, ancak sonunda son duruma geçebileceği bir kaynak tablonuz olduğunda yararlı olabilir.
Aşağıdaki sorguda, kaynaktan 5 günlük kayıtları seçmek, hedefteki eşleşen kayıtları güncelleştirmek, kaynaktan hedefe yeni kayıtlar eklemek ve hedefte son 5 günün eşleşmeyen tüm kayıtlarını silmek için bu desenin kullanılması gösterilmektedir.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Kaynak ve hedef tablolarda aynı boole filtresini sağlayarak, silme işlemleri de dahil olmak üzere değişiklikleri kaynağınızdan hedef tablolara dinamik olarak yayabilirsiniz.
Not
Bu desen herhangi bir koşullu yan tümce olmadan kullanılabilse de, bu durum hedef tablonun tamamen yeniden yazılmasıyla sonuçlanabilir ve bu da pahalı olabilir.