Aracılığıyla paylaş


Merge işlemi kullanılarak Delta Lake tablosuna güncelleme ve ekleme yapma

SQL işlemini kullanarak bir kaynak tablo, görünüm veya DataFrame'den hedef Delta tablosuna MERGE veri ekleyebilirsiniz. Delta Lake, eklemeleri, güncellemeleri ve silmeleri MERGE ile destekler ve ileri düzeydeki kullanım senaryolarını kolaylaştırmak için SQL standartlarının ötesinde genişletilmiş bir söz dizimini destekler.

Diyelim ki, people10mupdates adlı bir kaynak tablonuz veya /tmp/delta/people-10m-updates konumunda bir kaynak yolunuz var ve bunlar, people10m adlı bir hedef tablo veya /tmp/delta/people-10m konumunda bir hedef yol için yeni veriler içeriyor. Bu yeni kayıtlardan bazıları hedef verilerde zaten mevcut olabilir. Yeni verileri birleştirmek için, kişinin id zaten bulunduğu satırları güncellemek ve uygun id bulunmayan yeni satırları eklemek istiyorsunuz. Aşağıdaki sorguyu çalıştırabilirsiniz:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Önemli

Kaynak tablodan yalnızca tek bir satır hedef tablodaki belirli bir satırla eşleşebilir. Databricks Runtime 16.0 ve üzeri sürümlerde, MERGE, ON ve WHEN MATCHED maddelerinde belirtilen koşulları, yinelenen eşleşmeleri belirlemek için değerlendirir. Databricks Runtime 15.4 LTS ve altında, MERGE işlemler belirtilen koşulları içeren ON yan tümcesini yalnızca dikkate alır.

Scala ve Python söz dizimi ayrıntıları için Delta Lake API belgelerine bakın. SQL söz dizimi ayrıntıları için bkz. MERGE INTO

Birleştirmeyi kullanarak eşleşmeyen tüm satırları değiştirme

Databricks SQL ve Databricks Runtime 12.2 LTS ve üzeri sürümlerde, kaynak tabloda karşılık gelen kayıtları olmayan kayıtları hedef tablodan WHEN NOT MATCHED BY SOURCE tümcesini kullanarak UPDATE veya DELETE yapabilirsiniz. Databricks, hedef tablonun tam olarak yeniden yazılmasını önlemek için isteğe bağlı bir koşullu yan tümce eklenmesini önerir.

Aşağıdaki kod örneği, silme işlemleri için bunu kullanmanın temel söz dizimini, hedef tablonun üzerine kaynak tablonun içeriğiyle birlikte yazılmasını ve hedef tablodaki eşleşmeyen kayıtların silinmesini gösterir. Kaynak güncelleştirmeleri ve silmelerinin zamana bağlı olduğu tablolar için daha ölçeklenebilir bir desen hakkında bilgi almak için Delta tablosunu kaynakla artımlı olarak eşitleyin.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Aşağıdaki örnek yan tümcesine WHEN NOT MATCHED BY SOURCE koşullar ekler ve eşleşmeyen hedef satırlarda güncelleştirilecek değerleri belirtir.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Birleştirme işlemi semantiği

Programlı işlem semantiğinin merge ayrıntılı bir açıklaması aşağıdadır.

  • herhangi bir sayıda whenMatched ve whenNotMatched yan tümcesi olabilir.

  • whenMatched yan tümceleri, bir kaynak satırı eşleşme koşuluna göre bir hedef tablo satırıyla eşleştiğinde yürütülür. Bu yan tümceler aşağıdaki semantiklere sahiptir.

    • whenMatched yan tümcelerinin en fazla bir update ve bir delete eylemi olabilir. update içindeki merge eylemi, eşleşen hedef satırın belirtilen sütunlarını yalnızca (updateişleme benzer şekilde) güncelleştirir. Eylem, delete eşleşen satırı siler.

    • Her whenMatched yan tümce isteğe bağlı bir koşula sahip olabilir. Bu yan tümce koşulu varsa, update veya delete eylemi yalnızca yan tümce koşulu true olduğunda eşleşen herhangi bir kaynak hedef satır çifti için yürütülür.

    • Birden çok whenMatched yan tümce varsa, bunlar belirtildikleri sırayla değerlendirilir. Sonuncusu dışındaki tüm whenMatched yan tümceler koşul içermelidir.

    • Birleştirme koşuluyla whenMatched eşleşen bir kaynak ve hedef satır çifti için koşulların hiçbiri true olarak değerlendirilmezse, hedef satır değişmeden bırakılır.

    • Hedef Delta tablosunun tüm sütunlarını kaynak veri kümesinin ilgili sütunlarıyla güncelleştirmek için kullanın whenMatched(...).updateAll(). Bu, şuna eşdeğerdir:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      hedef Delta tablosunun tüm sütunları için. Bu nedenle, bu eylem kaynak tablonun hedef tablodakilerle aynı sütunlara sahip olduğunu varsayar, aksi takdirde sorgu bir çözümleme hatası oluşturur.

      Not

      Otomatik şema evrimi etkinleştirildiğinde bu davranış değişir. Ayrıntılar için bkz . otomatik şema evrimi .

  • whenNotMatched yan tümceleri, bir kaynak satırı eşleşme koşuluna göre herhangi bir hedef satırla uyuşmadığında yürütülür. Bu yan tümceler aşağıdaki semantiklere sahiptir.

    • whenNotMatched yan tümceleri yalnızca insert eylemini içerebilir. Yeni satır, belirtilen sütuna ve karşılık gelen ifadelere göre oluşturulur. Hedef tablodaki tüm sütunları belirtmeniz gerekmez. Belirtilmemiş hedef sütunlar için NULL eklenir.

    • Her whenNotMatched yan tümce isteğe bağlı bir koşula sahip olabilir. Şarta bağlı tümce varsa, yalnızca bu şart satır için doğruysa bir kaynak satır eklenir. Aksi takdirde, kaynak sütun yoksayılır.

    • Birden çok whenNotMatched yan tümce varsa, bunlar belirtildikleri sırayla değerlendirilir. Sonuncusu dışındaki tüm whenNotMatched cümlelerin koşulları olmalıdır.

    • Hedef Delta tablosunun tüm sütunlarını kaynak veri kümesinin ilgili sütunlarıyla eklemek için kullanın whenNotMatched(...).insertAll(). Bu, şuna eşdeğerdir:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      hedef Delta tablosunun tüm sütunları için. Bu nedenle, bu eylem kaynak tablonun hedef tablodakilerle aynı sütunlara sahip olduğunu varsayar, aksi takdirde sorgu bir çözümleme hatası oluşturur.

      Not

      Otomatik şema evrimi etkinleştirildiğinde bu davranış değişir. Ayrıntılar için bkz . otomatik şema evrimi .

  • whenNotMatchedBySource yan tümceleri, bir hedef satır birleştirme koşuluna göre hiçbir kaynak satırla eşleşmediğinde yürütülür. Bu yan tümceler aşağıdaki semantiklere sahiptir.

    • whenNotMatchedBySourcetümceler delete belirtebilir ve update eylemleri.
    • Her whenNotMatchedBySource yan tümce isteğe bağlı bir koşula sahip olabilir. Yan tümce koşulu varsa, hedef satır, yalnızca bu koşul söz konusu satır için doğru olduğunda değiştirilir. Aksi takdirde, hedef satır değişmeden bırakılır.
    • Birden çok whenNotMatchedBySource yan tümce varsa, bunlar belirtildikleri sırayla değerlendirilir. Sonuncusu dışındaki tüm whenNotMatchedBySource yan tümcelerin şartları olmalıdır.
    • Tanım gereği, whenNotMatchedBySource yan tümcelerde sütun değerlerinin çekileceği bir kaynak satır yoktur ve bu nedenle kaynak sütunlara başvurulamaz. Değiştirilecek her sütun için ya bir sabit belirtebilir ya da SET target.deleted_count = target.deleted_count + 1gibi bir eylemi hedef sütunda gerçekleştirebilirsiniz.

Önemli

  • merge Kaynak veri kümesinin birden çok satırı eşleşirse ve birleştirme hedef Delta tablosunun aynı satırlarını güncelleştirmeye çalışırsa işlem başarısız olabilir. Birleştirmenin SQL semantiğine göre, eşleşen hedef satırı güncelleştirmek için hangi kaynak satırın kullanılacağı belirsiz olduğundan bu tür bir güncelleştirme işlemi belirsizdir. Birden çok eşleşme olasılığını ortadan kaldırmak için kaynak tabloyu önceden işleyebilirsiniz.
  • Bir SQL GÖRÜNÜMÜne SQL MERGE işlemi uygulayabilmeniz için görünümün CREATE VIEW viewName AS SELECT * FROM deltaTable olarak tanımlanmış olması gerekir.

Delta tablolarına yazarken yinelenen verileri kaldırma

Yaygın bir ETL kullanım örneği, günlükleri bir tabloya ekleyerek Delta tablosuna toplamaktır. Ancak, genellikle kaynaklar yinelenen günlük kayıtları oluşturabilir ve bunları yönetmek için sonraki adımlarda yinelenenlerin kaldırılması gerekir. ile merge, yinelenen kayıtları eklemekten kaçınabilirsiniz.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Not

Yeni günlükleri içeren veri kümesinin içindeki yinelemeleri gidermesi gerekir. Birleştirmenin SQL semantiğiyle, yeni verileri tablodaki mevcut verilerle eşleştirir ve yinelenenleri kaldırır, ancak yeni veri kümesinde yinelenen veriler varsa eklenir. Bu nedenle, tabloya birleştirmeden önce yeni verilerdeki tekrar edenleri kaldırın.

Yalnızca birkaç gün boyunca yinelenen kayıtlar alabileceğinizi biliyorsanız, tabloyu tarihe göre bölümleyerek ve ardından eşleşecek hedef tablonun tarih aralığını belirterek sorgunuzu daha da iyileştirebilirsiniz.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Bu, tablonun tamamında değil yalnızca son 7 günlükte yinelenenleri ararken önceki komuttan daha verimlidir. Ayrıca, kayıtların sürekli tekrarlananlarını kaldırmak için bu salt eklemeli birleştirmeyi Structured Streaming ile kullanabilirsiniz.

  • Akış sorgusunda, yeniden duplaka önleme ile bir Delta tablosuna sürekli olarak herhangi bir akış verisi yazmak için foreachBatch içindeki birleştirme işlemini kullanabilirsiniz. Daha fazla bilgi için aşağıdaki akış örneğine bakın.
  • Başka bir akış sorgusunda, Delta tablosundan tekrarlanan verileri kaldırılmış halde devamlı olarak okuyabilirsiniz. Yalnızca ekleme olarak yapılan birleştirme, Delta tablosuna yalnızca yeni veriler eklediği için bu mümkündür.

Delta Lake ile yavaş değişen veriler (SCD) ve veri yakalamayı (CDC) değiştirme

DLT, SCD Tür 1 ve Tür 2'yi izlemek ve uygulamak için yerel desteğe sahiptir. CDC beslemeleri işlenirken sırasız kayıtların doğru ele alındığından emin olmak için DLT ile APPLY CHANGES INTO kullanın. Bkz. DEĞIŞIKLIKLERI UYGULA API'leri: DLTile değişiklik verilerini yakalamayı basitleştirme.

Delta tablosunu kaynakla artımlı olarak eşitleme

WHEN NOT MATCHED BY SOURCE kullanarak Databricks SQL ve Databricks Runtime 12.2 LTS ve sonrasında, bir tablonun bir bölümünü atomik olarak silmek ve değiştirmek için keyfi koşullar oluşturabilirsiniz. Bu, özellikle kayıtların ilk veri girdisi sonrasında birkaç gün boyunca değişebileceği veya silinebileceği, ancak sonunda son duruma geçebileceği bir kaynak tablonuz olduğunda yararlı olabilir.

Aşağıdaki sorguda, kaynaktan 5 günlük kayıtları seçmek, hedefteki eşleşen kayıtları güncelleştirmek, kaynaktan hedefe yeni kayıtlar eklemek ve hedefte son 5 günün eşleşmeyen tüm kayıtlarını silmek için bu desenin kullanılması gösterilmektedir.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Kaynak ve hedef tablolarda aynı boole filtresini sağlayarak, silme işlemleri de dahil olmak üzere değişiklikleri kaynağınızdan hedef tablolara dinamik olarak yayabilirsiniz.

Not

Bu desen herhangi bir koşullu yan tümce olmadan kullanılabilse de, bu durum hedef tablonun tamamen yeniden yazılmasıyla sonuçlanabilir ve bu da pahalı olabilir.