Funkcja delta table przesyła strumieniowo odczyty i zapisy
Usługa Delta Lake jest głęboko zintegrowana z przesyłaniem strumieniowym ze strukturą platformy Spark za pośrednictwem usług readStream
i writeStream
. Usługa Delta Lake pokonuje wiele ograniczeń zwykle związanych z systemami przesyłania strumieniowego i plikami, w tym:
- Łączenie małych plików generowanych przez pozyskiwanie małych opóźnień.
- Obsługa przetwarzania "dokładnie raz" przy użyciu więcej niż jednego strumienia (lub współbieżnych zadań wsadowych).
- Efektywne odnajdywanie nowych plików podczas korzystania z plików jako źródła strumienia.
Uwaga
W tym artykule opisano używanie tabel Delta Lake jako źródeł i odbiorników przesyłania strumieniowego. Aby dowiedzieć się, jak ładować dane przy użyciu tabel przesyłania strumieniowego w usłudze Databricks SQL, zobacz Load data using streaming tables in Databricks SQL.
Aby uzyskać informacje na temat sprzężeń statycznych strumieniowych za pomocą usługi Delta Lake, zobacz Stream-static joins (Sprzężenia statyczne strumienia).
tabela Delta jako źródło
Przesyłanie strumieniowe Structured Streaming przyrostowo odczytuje tabele Delta. Podczas gdy zapytanie przesyłane strumieniowo jest aktywne względem tabeli delty, nowe rekordy są przetwarzane idempotentnie, gdy nowe wersje tabeli są zatwierdzane w tabeli źródłowej.
Poniższe przykłady kodu pokazują, jak skonfigurować odczyt strumieniowy przy użyciu nazwy tabeli lub ścieżki pliku.
Python
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Scala
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Ważne
Jeśli schemat tabeli usługi Delta zmieni się po rozpoczęciu odczytu przesyłania strumieniowego względem tabeli, zapytanie zakończy się niepowodzeniem. W przypadku większości zmian schematu można ponownie uruchomić strumień, aby rozwiązać niezgodność schematu i kontynuować przetwarzanie.
W środowisku Databricks Runtime 12.2 LTS i poniżej nie można przesyłać strumieniowo z tabeli Delta z włączonym mapowaniem kolumn, które przeszły ewolucję schematu, która nie jest dodatnia, na przykład zmianę nazwy lub usuwanie kolumn. Aby uzyskać szczegółowe informacje, zobacz Przesyłanie strumieniowe za pomocą mapowania kolumn i zmian schematu.
Ogranicz szybkość wprowadzania
Dostępne są następujące opcje do kontrolowania mikrosadów:
-
maxFilesPerTrigger
: ile nowych plików należy wziąć pod uwagę w każdej mikrosadowej partii. Wartość domyślna to 1000. -
maxBytesPerTrigger
: ile danych jest przetwarzanych w każdej mikrosadowej partii. Ta opcja ustawia wartość "nietrwałą maksymalną", co oznacza, że partia przetwarza w przybliżeniu tę ilość danych i może przetwarzać więcej niż limit, aby zapytanie przesyłane strumieniowo przechodziło do przodu w przypadkach, gdy najmniejsza jednostka wejściowa jest większa niż ten limit. Ta opcja nie jest domyślnie ustawiona.
Jeśli używasz maxBytesPerTrigger
w połączeniu z maxFilesPerTrigger
, mikrosadowe przetwarzanie danych trwa do momentu osiągnięcia limitu maxFilesPerTrigger
lub maxBytesPerTrigger
.
Uwaga
W przypadkach, gdy transakcje tabeli źródłowej są czyszczone z powodu logRetentionDuration
konfiguracji, a zapytanie przesyłane strumieniowo próbuje przetworzyć te wersje, domyślnie zapytanie nie może uniknąć utraty danych. Możesz ustawić opcję z failOnDataLoss
na false
, aby ignorować utracone dane i kontynuować przetwarzanie.
Przesyłanie strumieniowe źródła danych zmian usługi Delta Lake (CDC)
Delta Lake kanał danych o zmianach rejestruje zmiany w tabeli Delta, w tym aktualizacje i usuwanie. Po włączeniu można przesyłać strumieniowo ze źródła danych zmian i zapisywać logikę do przetwarzania wstawień, aktualizacji i usuwania do tabel podrzędnych. Mimo że dane wyjściowe zmiany nieznacznie różnią się od tabeli Delta, którą opisują, zapewnia to rozwiązanie do propagowania zmian przyrostowych do poniższych tabel w architekturze medalionu .
Ważne
W środowisku Databricks Runtime 12.2 LTS i poniżej nie można przesyłać strumieniowo ze źródła danych zmian dla tabeli delty z włączonym mapowaniem kolumn, które przeszły ewolucję schematu nie addytywnego, na przykład zmiany nazw lub upuszczania kolumn. Zobacz Przesyłanie strumieniowe za pomocą mapowania kolumn i zmian schematu.
Ignorowanie aktualizacji i usuwania
Przesyłanie strumieniowe ze strukturą nie obsługuje danych wejściowych, które nie są dołączane i zgłasza wyjątek, jeśli jakiekolwiek modyfikacje występują w tabeli używanej jako źródło. Istnieją dwie główne strategie radzenia sobie ze zmianami, których nie można automatycznie propagować podrzędnie:
- Możesz usunąć dane wyjściowe i punkt kontrolny oraz ponownie uruchomić strumień od początku.
- Możesz ustawić jedną z tych dwóch opcji:
-
ignoreDeletes
: ignoruj transakcje, które usuwają dane na granicach partycji. -
skipChangeCommits
: ignoruje transakcje, które usuwają lub modyfikują istniejące rekordy.skipChangeCommits
obejmujeignoreDeletes
.
-
Uwaga
W środowisku Databricks Runtime 12.2 LTS i nowszym skipChangeCommits
oznacza wycofanie poprzedniego ustawienia ignoreChanges
. W środowisku Databricks Runtime 11.3 LTS i niższym jest ignoreChanges
jedyną obsługiwaną opcją.
Semantyka dla ignoreChanges
elementu różni się znacznie od skipChangeCommits
. Z włączoną opcją ignoreChanges
ponownie zapisane pliki danych w tabeli źródłowej są ponownie emitowane po operacji zmiany danych, takiej jak UPDATE
, , MERGE INTO
DELETE
(w ramach partycji) lub OVERWRITE
. Niezmienione wiersze są często emitowane obok nowych wierszy, więc odbiorcy podrzędni muszą mieć możliwość obsługi duplikatów. Usunięcia nie są propagowane w dół.
ignoreChanges
obejmuje ignoreDeletes
.
skipChangeCommits
całkowicie ignoruje operacje zmiany plików. Pliki danych, które zostały przepisane w tabeli źródłowej z powodu operacji zmiany danych, takiej jak UPDATE
, MERGE INTO
, DELETE
i OVERWRITE
, są całkowicie ignorowane. Aby odzwierciedlić zmiany w nadrzędnych tabelach źródłowych, należy zaimplementować oddzielną logikę, aby propagować te zmiany.
Obciążenia skonfigurowane z dalszym działaniem ignoreChanges
przy użyciu znanych semantyki, ale usługa Databricks zaleca korzystanie ze skipChangeCommits
wszystkich nowych obciążeń. Migrowanie obciążeń przy użyciu polecenia ignoreChanges
w celu skipChangeCommits
wymaga logiki refaktoryzacji.
Przykład
Załóżmy na przykład, że masz tabelę user_events
z kolumnami date
, user_email
i action
, które są partycjonowane przez date
. Przesyłasz dane z tabeli user_events
i musisz usunąć z niej dane ze względu na RODO.
Po usunięciu na granicach partycji (czyli WHERE
znajduje się w kolumnie partycji), pliki są już podzielone według wartości, więc usunięcie po prostu usuwa te pliki z metadanych. Po usunięciu całej partycji danych można użyć następujących elementów:
spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
Jeśli usuniesz dane w wielu partycjach (w tym przykładzie filtrowanie według user_email
), użyj następującej składni:
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")
Jeśli zaktualizujesz user_email
za pomocą instrukcji UPDATE
, plik zawierający dany user_email
zostanie przepisany. Użyj polecenia skipChangeCommits
, aby zignorować zmienione pliki danych.
Określanie pozycji początkowej
Poniższe opcje umożliwiają określenie punktu początkowego źródła przesyłania strumieniowego usługi Delta Lake bez przetwarzania całej tabeli.
startingVersion
: wersja usługi Delta Lake do uruchomienia od. Usługa Databricks zaleca pominięcie tej opcji w przypadku większości obciążeń. Gdy nie jest ustalony, strumień rozpoczyna się od najnowszej dostępnej wersji, w tym pełnej migawki tabeli w tym momencie.Jeśli zostanie określony, strumień odczytuje wszystkie zmiany w tabeli delta, począwszy od określonej wersji (włącznie). Jeśli określona wersja nie jest już dostępna, uruchomienie strumienia nie powiedzie się. Wersje zatwierdzeń można uzyskać z kolumny
version
danych wyjściowych polecenia DESCRIBE HISTORY.Aby zwrócić tylko najnowsze zmiany, określ wartość
latest
.startingTimestamp
: znacznik czasu do rozpoczęcia od. Wszystkie zmiany tabeli zatwierdzone w momencie znacznika czasu lub później (włącznie) są odczytywane przez czytnik przesyłania strumieniowego. Jeśli podany znacznik czasu poprzedza wszystkie zatwierdzenia w tabeli, odczyt przesyłania strumieniowego rozpoczyna się od najwcześniejszego dostępnego znacznika czasu. Jeden z:- Ciąg znacznika czasu. Na przykład
"2019-01-01T00:00:00.000Z"
. - Ciąg daty. Na przykład
"2019-01-01"
.
- Ciąg znacznika czasu. Na przykład
Nie można jednocześnie ustawić obu opcji. Zaczynają obowiązywać tylko podczas uruchamiania nowego zapytania przesyłania strumieniowego. Jeśli zapytanie przesyłania strumieniowego zostało uruchomione i postęp został zarejestrowany w punkcie kontrolnym, te opcje są ignorowane.
Ważne
Chociaż źródło przesyłania strumieniowego można uruchomić z określonej wersji lub znacznika czasu, schemat źródła przesyłania strumieniowego jest zawsze najnowszym schematem tabeli Delta. Musisz upewnić się, że w tabeli Delta nie zaszła niezgodna zmiana schematu po określonej wersji lub znaczniku czasu. W przeciwnym razie źródło przesyłania strumieniowego może zwracać nieprawidłowe wyniki podczas odczytywania danych z nieprawidłowym schematem.
Przykład
Załóżmy na przykład, że masz tabelę user_events
. Jeśli chcesz odczytać zmiany od wersji 5, użyj:
spark.readStream
.option("startingVersion", "5")
.table("user_events")
Jeśli chcesz odczytać zmiany od 2018-10-18, użyj:
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")
Przetwórz początkową migawkę bez utraty danych
Ta funkcja jest dostępna w środowisku Databricks Runtime 11.3 LTS lub nowszym.
W przypadku używania tabeli delty jako źródła strumienia zapytanie najpierw przetwarza wszystkie dane obecne w tabeli. Tabela delta w tej wersji jest nazywana początkową migawką. Domyślnie pliki danych tabeli delty są przetwarzane na podstawie tego, który plik został ostatnio zmodyfikowany. Jednak czas ostatniej modyfikacji nie musi reprezentować kolejności czasu zdarzenia rekordu.
W zapytaniu strumieniowym stanowego ze zdefiniowanym znakiem wodnym przetwarzanie plików według czasu modyfikacji może spowodować przetworzenie rekordów w niewłaściwej kolejności. Może to prowadzić do upuszczania rekordów jako opóźnionych zdarzeń przez znak wodny.
Możesz uniknąć problemu z usuwaniem danych, włączając następującą opcję:
- withEventTimeOrder: czy początkowa migawka powinna być przetwarzana z kolejnością czasu zdarzenia.
Po włączeniu kolejności czasu zdarzenia zakres czasu początkowych danych migawki jest podzielony na przedziały czasu. Każda mikrosadowa partia przetwarza zasobnik, filtrując dane w zakresie czasu. Opcje konfiguracji maxFilesPerTrigger i maxBytesPerTrigger nadal mają zastosowanie do kontrolowania rozmiaru mikrobajta, ale tylko w przybliżony sposób ze względu na charakter przetwarzania.
Na poniższej ilustracji przedstawiono ten proces:
Istotne informacje o tej funkcji:
- Problem z usuwaniem danych występuje tylko wtedy, gdy początkowa migawka delty zapytania przesyłania strumieniowego stanowego jest przetwarzana w domyślnej kolejności.
- Nie można zmienić
withEventTimeOrder
po uruchomieniu zapytania strumienia podczas przetwarzania początkowej migawki. Aby ponownie uruchomić polecenie zewithEventTimeOrder
zmianą, należy usunąć punkt kontrolny. - Jeśli uruchamiasz zapytanie strumienia z włączoną funkcjąEventTimeOrder, nie można obniżyć jej do wersji DBR, która nie obsługuje tej funkcji do momentu ukończenia początkowego przetwarzania migawki. Jeśli musisz obniżyć dół, możesz poczekać na zakończenie początkowej migawki lub usunąć punkt kontrolny i ponownie uruchomić zapytanie.
- Ta funkcja nie jest obsługiwana w następujących nietypowych scenariuszach:
- Kolumna czasu zdarzenia jest kolumną generowaną, a między źródłem Delta a znakiem wodnym istnieją przekształcenia niezwiązane z projekcją.
- Istnieje znak wodny, który ma więcej niż jedno źródło Delta w zapytaniu strumieniowym.
- Po włączeniu kolejności czasu zdarzenia wydajność początkowego przetwarzania migawek różnicowych może być niższa.
- Każda mikrosadowa skanuje początkową migawkę w celu filtrowania danych w odpowiednim zakresie czasu zdarzenia. Dla szybszego działania filtru zaleca się użycie źródłowej kolumny Delta jako czasu zdarzenia, aby zastosować pomijanie danych (sprawdź pomijanie danych dla Delta Lake, kiedy jest to możliwe). Ponadto partycjonowanie tabeli wzdłuż kolumny czasu zdarzenia może przyspieszyć przetwarzanie. Możesz sprawdzić interfejs użytkownika platformy Spark, aby zobaczyć, ile plików różnicowych jest skanowanych pod kątem określonej mikrosadowej partii.
Przykład
Załóżmy, że masz tabelę user_events
z kolumną event_time
. Zapytanie przesyłane strumieniowo jest zapytaniem agregacji. Jeśli chcesz mieć pewność, że podczas początkowego przetwarzania migawek nie ma żadnych danych, możesz użyć następujących funkcji:
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")
Uwaga
Można również włączyć tę funkcję za pomocą konfiguracji platformy Spark w klastrze, która będzie stosowana do wszystkich zapytań przesyłania strumieniowego: spark.databricks.delta.withEventTimeOrder.enabled true
tabela Delta jako odbiornik
Dane można również zapisywać w tabeli Delta przy użyciu Structured Streaming. Dziennik transakcji umożliwia usłudze Delta Lake zagwarantowanie dokładnie jednokrotnego przetwarzania, nawet jeśli istnieją inne strumienie lub zapytania wsadowe uruchomione współbieżnie względem tabeli.
Uwaga
Funkcja Delta Lake VACUUM
usuwa wszystkie pliki, które nie są zarządzane przez usługę Delta Lake, ale pomija wszystkie katalogi rozpoczynające się od _
. Punkty kontrolne można bezpiecznie przechowywać wraz z innymi danymi i metadanymi dla tabeli delty przy użyciu struktury katalogów, takiej jak <table-name>/_checkpoints
.
Metryki
Liczbę bajtów i liczbę plików, które nie są jeszcze przetwarzane, można znaleźć w procesie zapytań przesyłanych strumieniowo jako numBytesOutstanding
metryki i numFilesOutstanding
. Dodatkowe metryki obejmują:
-
numNewListedFiles
: liczba plików usługi Delta Lake wymienionych w celu obliczenia listy prac dla tej partii.-
backlogEndOffset
: wersja tabeli używana do obliczania listy prac.
-
Jeśli używasz strumienia w notesie, możesz zobaczyć te metryki na karcie Nieprzetworzone dane na pulpicie nawigacyjnym postępu zapytania przesyłania strumieniowego:
{
"sources": [
{
"description": "DeltaSource[file:/path/to/source]",
"metrics": {
"numBytesOutstanding": "3456",
"numFilesOutstanding": "8"
}
}
]
}
Tryb dołączania
Domyślnie strumienie są uruchamiane w trybie dołączania, który dodaje nowe rekordy do tabeli.
Użyj metody toTable
podczas przesyłania strumieniowego do tabel, co pokazano w poniższym przykładzie.
Python
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)
Scala
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
Tryb ukończenia
Możesz również użyć Structured Streaming, aby zastąpić całą tabelę przy każdym przetwarzaniu partii. Przykładem przypadku użycia jest obliczenie podsumowania przy użyciu agregacji:
Python
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)
Scala
spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
Powyższy przykład stale aktualizuje tabelę zawierającą zagregowaną liczbę zdarzeń według klienta.
W przypadku aplikacji z bardziej łagodnymi wymaganiami dotyczącymi opóźnienia można zaoszczędzić zasoby obliczeniowe za pomocą wyzwalaczy jednorazowych. Służą do aktualizowania tabel podsumowujących agregację według ustalonego harmonogramu, przetwarzając tylko nowe dane, które dotarły od ostatniej aktualizacji.
Upsert z zapytań przesyłanych strumieniowo przy użyciu polecenia foreachBatch
Możesz użyć kombinacji merge
i foreachBatch
do zapisywania złożonych operacji upsert z zapytania przesyłania strumieniowego do Tabeli Delta. Zobacz Używanie polecenia foreachBatch do zapisywania do losowego ujściach danych.
Ten wzorzec zawiera wiele aplikacji, w tym następujące:
- Zapisuj agregacje przesyłania strumieniowego w trybie aktualizacji: Jest to znacznie bardziej wydajne niż tryb końcowy.
-
Zapisywanie strumienia zmian bazy danych w tabeli Delta: zapytanie scalania do zapisywania danych o zmianach może być używane w
foreachBatch
w celu ciągłego stosowania strumienia zmian do tabeli Delta. -
Zapisywanie strumienia danych w Tabeli Delta z deduplikacją: Zapytanie scalające wyłącznie do wstawiania do deduplikacji może być stosowane w
foreachBatch
do ciągłego zapisywania danych (z duplikatami) do Tabeli Delta z automatyczną deduplikacją.
Uwaga
- Upewnij się, że instrukcja
merge
wewnątrzforeachBatch
jest idempotentna, ponieważ ponowne uruchomienia zapytania przesyłania strumieniowego mogą wielokrotnie stosować operację na tej samej partii danych. - W
merge
przypadku użycia wforeachBatch
programie szybkość danych wejściowych zapytania przesyłania strumieniowego (zgłoszonego iStreamingQueryProgress
widocznego na wykresie szybkości notesu) może być zgłaszana jako wielokrotność rzeczywistej szybkości generowania danych w źródle. Dzieje się tak, ponieważmerge
odczytuje dane wejściowe wiele razy, co powoduje pomnożenie metryk wejściowych. Jeśli jest to wąskie gardło, możesz buforować partię DataFrame przedmerge
, a następnie cofnąć jej buforowanie pomerge
.
W poniższym przykładzie pokazano, jak można użyć programu SQL w programie foreachBatch
, aby wykonać to zadanie:
Scala
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
# Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
# Use the view name to apply MERGE
# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
# In Databricks Runtime 10.5 and below, you must use the following:
# microBatchOutputDF._jdf.sparkSession().sql("""
microBatchOutputDF.sparkSession.sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Możesz również użyć interfejsów API usługi Delta Lake do wykonywania operacji upsert przesyłania strumieniowego, jak w poniższym przykładzie:
Scala
import io.delta.tables.*
val deltaTable = DeltaTable.forName(spark, "table_name")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "table_name")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
(deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.key = t.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
zapisy tabeli idempotentnych w foreachBatch
Uwaga
Usługa Databricks zaleca skonfigurowanie oddzielnego zapisu strumieniowego dla każdego ujścia, które chcesz zaktualizować, zamiast używania foreachBatch
. Dzieje się tak, ponieważ zapisy w wielu tabelach są serializowane podczas korzystania z polecenia "foreachBatch", co zmniejsza równoległość i zwiększa ogólne opóźnienie.
Tabele różnicowe obsługują następujące opcje DataFrameWriter
, aby uczynić zapisy do wielu tabel w foreachBatch
idempotentnymi:
-
txnAppId
: unikatowy ciąg, który można przekazać do każdego zapisu ramki danych. Na przykład możesz użyć identyfikatora StreamingQuery jakotxnAppId
. -
txnVersion
: monotonicznie rosnąca liczba, która działa jako wersja transakcji.
Usługa Delta Lake używa kombinacji elementów i txnAppId
do identyfikowania txnVersion
zduplikowanych zapisów i ich ignorowania.
Jeśli zapis wsadowy zostanie przerwany z powodu awarii, ponowne uruchomienie partii używa tej samej aplikacji i identyfikatora partii, aby ułatwić środowisku uruchomieniowemu poprawne zidentyfikowanie zduplikowanych zapisów i zignorowanie ich. Identyfikator aplikacji (txnAppId
) może być dowolnym unikatowym ciągiem generowanym przez użytkownika i nie musi być powiązany z identyfikatorem strumienia. Zobacz Używanie polecenia foreachBatch do zapisywania do losowego ujściach danych.
Ostrzeżenie
Jeśli usuniesz punkt kontrolny przesyłania strumieniowego i uruchomisz ponownie zapytanie przy użyciu nowego punktu kontrolnego, musisz podać inny txnAppId
element . Nowe punkty kontrolne zaczynają się od identyfikatora partii .0
Usługa Delta Lake używa identyfikatora partii i txnAppId
jako unikatowego klucza i pomija partie z już widocznymi wartościami.
W poniższym przykładzie kodu pokazano ten wzorzec:
Python
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
Scala
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}