Używanie strumienia danych zmian w Delta Lake na platformie Azure Databricks
Zestawienie zmian danych umożliwia usłudze Azure Databricks śledzenie zmian na poziomie wiersza między wersjami tabeli delty. Po włączeniu na tabeli Delta, środowisko uruchomieniowe rejestruje zdarzenia zmiany dla wszystkich danych zapisanych w tabeli. Obejmuje to dane wierszy wraz z metadanymi wskazującymi, czy określony wiersz został wstawiony, usunięty lub zaktualizowany.
Ważne
Zestawienie zmian danych działa razem z historią tabel w celu udostępnienia informacji o zmianach. Ponieważ klonowanie tabeli delty tworzy oddzielną historię, zestawienie danych zmian w sklonowanych tabelach nie jest zgodne z oryginalną tabelą.
Przyrostowe przetwarzanie zmian danych
Firma Databricks zaleca użycie funkcji zmian danych w połączeniu z Structured Streaming, aby stopniowo przetwarzać zmiany w tabelach Delta. Musisz użyć strukturalnego przesyłania strumieniowego w usłudze Azure Databricks, aby automatycznie śledzić wersje informacji o zmianach danych w tabeli.
Uwaga
DLT zapewnia funkcjonalność łatwego propagowania danych zmian i przechowywania wyników jako SCD (wolnozmiennego wymiaru) typu 1 lub typu 2 tabel. Zobacz Interfejsy API ZASTOSUJ ZMIANY: Uprość przechwytywanie danych zmian za pomocą DLT.
Aby odczytać strumień danych o zmianach z tabeli, musisz włączyć go dla tej tabeli. Zobacz Włączanie zestawienia danych zmian.
Ustaw opcję readChangeFeed
na true
podczas konfigurowania strumienia względem tabeli, aby odczytać strumień danych zmian, jak pokazano w poniższym przykładzie składni.
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
Domyślnie strumień zwraca najnowszą migawkę tabeli przy pierwszym uruchomieniu jako INSERT
oraz przyszłe zmiany jako dane zmian.
Zatwierdzenie danych następuje jako część transakcji Delta Lake i stają się dostępne jednocześnie, gdy nowe dane zostają zatwierdzone w tabeli.
Opcjonalnie możesz określić wersję początkową. Zobacz Czy należy określić wersję początkową?.
Kanał danych zmian obsługuje również przetwarzanie wsadowe, które wymaga określenia wersji początkowej. Zobacz Odczyt zmian w zapytaniach wsadowych.
Opcje, takie jak limity szybkości (maxFilesPerTrigger
, maxBytesPerTrigger
) i excludeRegex
są również obsługiwane podczas odczytywania danych zmiany.
Ograniczanie szybkości może być atomowe dla wersji innych niż początkowa wersja migawki. Oznacza to, że cała wersja zatwierdzenia będzie ograniczona lub zostanie zwrócone całe zatwierdzenie.
Czy należy określić wersję początkową?
Opcjonalnie możesz określić wersję początkową, jeśli chcesz zignorować zmiany, które wystąpiły przed określoną wersją. Wersję można określić przy użyciu znacznika czasu lub numeru identyfikatora wersji zarejestrowanego w dzienniku transakcji delty.
Uwaga
Do odczytu wsadowego wymagana jest wersja początkowa, a wiele wzorców wsadowych może skorzystać z ustawienia opcjonalnej wersji końcowej.
Podczas konfigurowania obciążeń Structured Streaming z feedem danych zmian ważne jest, aby zrozumieć, jak określenie wersji początkowej wpływa na przetwarzanie.
Wiele obciążeń przesyłania strumieniowego, zwłaszcza nowych potoków przetwarzania danych, korzysta z domyślnego zachowania. W przypadku zachowania domyślnego pierwsza partia jest przetwarzana, gdy strumień najpierw rejestruje wszystkie istniejące rekordy w tabeli jako INSERT
operacje w strumieniu danych zmian.
Jeśli tabela docelowa zawiera już wszystkie zapisy z odpowiednimi zmianami do pewnego momentu, określ wersję początkową, aby uniknąć przetwarzania stanu tabeli źródłowej jako INSERT
zdarzeń.
Następująca przykładowa składnia odzyskiwania po niepowodzeniu przesyłania strumieniowego, w którym punkt kontrolny został uszkodzony. W tym przykładzie przyjęto założenie, że spełnione są następujące warunki:
- Strumień danych o zmianach został włączony w tabeli źródłowej podczas jej tworzenia.
- Docelowa tabela podrzędna przetworzyła wszystkie zmiany do wersji 75 włącznie.
- Historia wersji tabeli źródłowej jest dostępna dla wersji 70 lub nowszych.
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
W tym przykładzie należy również określić nową lokalizację punktu kontrolnego.
Ważne
Jeśli określisz wersję początkową, strumień nie może rozpocząć się od nowego punktu kontrolnego, jeśli wersja początkowa nie jest już obecna w historii tabeli. Usługa Delta Lake automatycznie czyści historyczne wersje, co oznacza, że wszystkie określone wersje początkowe zostaną ostatecznie usunięte.
Zobacz Czy mogę użyć zestawienia zmian danych, aby odtworzyć całą historię tabeli?.
Odczyt zmian w zapytaniach wsadowych
Składnia zapytań wsadowych umożliwia odczytywanie wszystkich zmian rozpoczynających się od określonej wersji lub odczytywanie zmian w określonym zakresie wersji.
Należy określić wersję jako liczbę całkowitą oraz znacznik czasu jako ciąg znaków w formacie yyyy-MM-dd[ HH:mm:ss[.SSS]]
.
Wersje początkowe i końcowe są uwzględniane w zapytaniach. Aby odczytać zmiany z określonej wersji początkowej do najnowszej wersji tabeli, określ tylko wersję początkową.
Jeśli podasz wersję niższą lub sygnaturę czasową starszą niż ta, która zarejestrowała zdarzenia zmiany — to znaczy, gdy strumień danych o zmianach został włączony — zostanie zgłoszony błąd wskazujący, że strumień danych o zmianach nie został włączony.
W poniższych przykładach składni pokazano, jak używać opcji wersji początkowej i końcowej przy odczytach wsadowych.
SQL
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
Python
# version as ints or longs
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala
// version as ints or longs
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Uwaga
Domyślnie jeśli użytkownik przekazuje wersję lub znacznik czasu przekraczający ostatnie zatwierdzenie w tabeli, zostanie zgłoszony błąd timestampGreaterThanLatestCommit
. W środowisku Databricks Runtime 11.3 LTS lub nowszym kanał danych zmiany może obsłużyć sytuację wersji spoza zakresu, jeśli użytkownik skonfiguruje następującą konfigurację na true
:
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Jeśli podasz wersję początkową większą niż ostatnie zatwierdzenie w tabeli lub znacznik czasu rozpoczęcia nowszy niż ostatnie zatwierdzenie w tabeli, po włączeniu poprzedniej konfiguracji zwracany jest pusty wynik odczytu.
Jeśli podasz wersję końcową większą niż ostatnie zatwierdzenie w tabeli lub znacznik czasu zakończenia nowszy niż ostatnie zatwierdzenie w tabeli, po włączeniu poprzedniej konfiguracji w trybie odczytu wsadowego zostaną zwrócone wszystkie zmiany między wersją początkową a ostatnim zatwierdzeniem.
Jaki jest schemat zestawienia zmian danych?
Podczas odczytywania ze źródła danych zmian dla tabeli jest używany schemat najnowszej wersji tabeli.
Uwaga
Większość operacji zmiany schematu i ewolucji jest w pełni obsługiwana. Tabela z włączonym mapowaniem kolumn nie obsługuje wszystkich przypadków użycia i demonstruje różne zachowanie. Zobacz Zmienianie ograniczeń źródła danych dla tabel z włączonym mapowaniem kolumn.
Oprócz kolumn danych ze schematu tabeli delta zestawienie danych zawiera kolumny metadanych identyfikujące typ zdarzenia zmiany:
Nazwa kolumny | Typ | Wartości |
---|---|---|
_change_type |
Struna/Sznurek (if applicable) |
insert , update_preimage , update_postimage delete (1) |
_commit_version |
Długi | Wersja dziennika Delta lub tabeli, która zawiera zmianę. |
_commit_timestamp |
Sygnatura czasowa | Sygnatura czasowa skojarzona podczas tworzenia zatwierdzenia. |
(1)preimage
jest wartością przed aktualizacją, postimage
jest wartością po aktualizacji.
Uwaga
Nie można włączyć strumienia danych zmiany w tabeli, jeśli struktura zawiera kolumny o takich samych nazwach jak te dodane kolumny. Zmień nazwy kolumn w tabeli, aby rozwiązać ten konflikt przed próbą włączenia strumienia danych zmian.
Włącz strumień danych o zmianach
Zmiany w danych można odczytywać tylko dla tablic, które mają włączoną obsługę. Należy jawnie włączyć opcję zmiany strumienia danych przy użyciu jednej z następujących metod:
Nowa tabela: ustaw właściwość
delta.enableChangeDataFeed = true
tabeli w poleceniuCREATE TABLE
.CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
Istniejąca tabela: ustaw właściwość
delta.enableChangeDataFeed = true
tabeli w poleceniuALTER TABLE
.ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Wszystkie nowe tabele:
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Ważne
Rejestrowane są tylko zmiany wprowadzone po włączeniu kanału danych o zmianach. Wcześniejsze zmiany w tabeli nie są przechwytywane.
Zmienianie magazynu danych
Włączenie mechanizmu śledzenia zmian danych powoduje niewielki wzrost kosztów przechowywania dla tabeli. Rekordy danych zmiany są generowane podczas uruchamiania zapytania i są zazwyczaj znacznie mniejsze niż całkowity rozmiar przepisanych plików.
Usługa Azure Databricks rejestruje dane zmian dla operacji UPDATE
, DELETE
i MERGE
w folderze _change_data
w katalogu tabeli. Niektóre operacje, takie jak operacje tylko wstawiania i usuwanie pełnej partycji, nie generują danych w katalogu _change_data
, ponieważ usługa Azure Databricks może efektywnie obliczyć strumień danych zmian bezpośrednio z dziennika transakcji.
Wszystkie operacje odczytu dotyczące plików danych w folderze _change_data
powinny przechodzić przez obsługiwane API Delta Lake.
Pliki w folderze _change_data
są zgodne z zasadami przechowywania tabeli. Dane strumienia zmian są usuwane, gdy polecenie VACUUM
zostaje uruchomione.
Czy mogę użyć zestawienia zmian danych, aby odtworzyć całą historię tabeli?
Zestawienie danych zmian nie ma służyć jako trwały rekord wszystkich zmian w tabeli. Zmiana źródła danych rejestruje tylko zmiany, które występują po jej włączeniu.
Kanał danych zmian i Delta Lake umożliwiają zawsze odtworzenie pełnej migawki tabeli źródłowej, co oznacza, że możesz rozpocząć nowy proces odczytu strumieniowego względem tabeli z włączonym kanałem danych zmian i przechwycić bieżącą wersję tej tabeli oraz wszystkie zmiany, które następują później.
Rekordy w kanale danych zmian należy traktować jako przejściowe i dostępne tylko dla określonego okna przechowywania. Dziennik transakcji delty usuwa wersje tabel i odpowiadające im wersje zestawienia danych zmian w regularnych odstępach czasu. Po usunięciu wersji z dziennika transakcji nie można już odczytać zestawienia danych zmian dla tej wersji.
Jeśli przypadek użycia wymaga zachowania trwałej historii wszystkich zmian w tabeli, należy użyć logiki przyrostowej do zapisywania rekordów ze źródła danych zmian do nowej tabeli. W poniższym przykładzie kodu pokazano użycie funkcjonalności trigger.AvailableNow
, która wykorzystuje inkrementalne przetwarzanie w Strukturalnym Przesyłaniu Strumieniowym, ale przetwarza dostępne dane jako obciążenie wsadowe. Można zaplanować to obciążenie asynchronicznie przy użyciu głównych potoków przetwarzania, aby utworzyć kopię zapasową strumienia danych zmian do celów audytowych lub pełnej odtwarzalności.
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Zmienianie ograniczeń źródła danych dla tabel z włączonym mapowaniem kolumn
Po włączeniu mapowania kolumn w tabeli delty można usuwać lub zmieniać nazwy kolumn w tabeli bez ponownego zapisywania plików danych dla istniejących danych. Po włączeniu mapowania kolumn zmiana źródła danych ma ograniczenia po wprowadzeniu zmian schematu nie addytywnego, takich jak zmiana nazwy lub usunięcie kolumny, zmiana typu danych lub zmiany wartości null.
Ważne
- Nie można odczytać strumienia danych zmian dla transakcji lub zakresu, w którym następuje nieaddytywna zmiana schematu przy użyciu semantyki wsadowej.
- W środowisku Databricks Runtime 12.2 LTS i wcześniejszych wersjach tabele z włączonym mapowaniem kolumn, które doświadczyły nieaddytywnych zmian schematu, nie obsługują odczytów przesyłanych strumieniowo na kanale zmiany danych. Zobacz Przesyłanie strumieniowe za pomocą mapowania kolumn i zmian schematu.
- W wersji Databricks Runtime 11.3 LTS i wcześniejszych nie można odczytać strumienia zmian danych dla tabel z włączonym mapowaniem kolumn, które przeszły zmianę nazwy lub usunięcie kolumn.
W środowisku Databricks Runtime 12.2 LTS lub nowszym można wykonywać operacje odczytu wsadowego dla strumienia danych zmian dla tabel z włączonym mapowaniem kolumn, które doświadczyły zmian schematu, które nie są addytywne. Zamiast używać schematu najnowszej wersji tabeli, operacje odczytu używają schematu końcowej wersji tabeli określonej w zapytaniu. Zapytania nadal kończą się niepowodzeniem, jeśli określony zakres wersji obejmuje zmianę schematu nie addytywnego.