Ładowanie danych przy użyciu tabel strumieniowych w Databricks SQL
Databricks rekomenduje używanie tabel strumieniowych do ładowania danych przy użyciu Databricks SQL. Tabela przesyłania strumieniowego to tabela zarejestrowana w Katalogu Unity z dodatkową obsługą przesyłania strumieniowego lub przetwarzania danych przyrostowych. Przepływ DLT jest tworzony automatycznie dla każdej tabeli przesyłania strumieniowego. Można używać tabel przesyłania strumieniowego do przyrostowego ładowania danych z systemu Kafka oraz magazynów obiektów w chmurze.
W tym artykule pokazano, jak wykorzystywać tabele strumieniowe do ładowania danych z magazynu obiektów w chmurze, skonfigurowanego jako wolumin Unity Catalog (zalecane) lub zewnętrzne miejsce przechowywania danych.
Uwaga
Aby dowiedzieć się, jak używać tabel usługi Delta Lake jako źródeł i odbiorników przesyłania strumieniowego, zobacz artykuł Delta table streaming reads and writes.
Ważne
Tabele przesyłania strumieniowego utworzone w usłudze Databricks SQL są wspierane przez bezserwerowy potok DLT. Aby korzystać z tej funkcji, obszar roboczy musi obsługiwać potoki bezserwerowe.
Przed rozpoczęciem
Przed rozpoczęciem należy spełnić następujące wymagania.
Wymagania dotyczące obszaru roboczego:
- Konto usługi Azure Databricks z włączoną obsługą bezserwerową. Aby uzyskać więcej informacji, zobacz Włączanie bezserwerowych magazynów SQL.
- Obszar roboczy z włączonym Unity Catalog. Aby uzyskać więcej informacji, zobacz Konfigurowanie katalogu Unity i zarządzanie nim.
Wymagania dotyczące obliczeń:
Należy użyć jednej z następujących opcji:
- Magazyn SQL korzystający z kanału
Current
. - Obliczenia ze standardowym trybem dostępu (dawniej trybem dostępu współdzielonego) na środowisku Databricks Runtime 13.3 LTS lub nowszym.
Środowisko obliczeniowe z dedykowanym trybem dostępu (dawniej trybem dostępu pojedynczego użytkownika) w środowisku Databricks Runtime 15.4 LTS lub nowszym.
W środowisku Databricks Runtime 15.3 lub nowszym nie można używać dedykowanych obliczeń do wykonywania zapytań dotyczących tabel przesyłania strumieniowego należących do innych użytkowników. Możesz użyć dedykowanych zasobów obliczeniowych w środowisku Databricks Runtime 15.3 i nowszym tylko wtedy, gdy jesteś właścicielem tabeli przesyłania strumieniowego. Stwórcą tabeli jest właściciel.
Środowisko Databricks Runtime 15.4 LTS i nowsze obsługują zapytania dotyczące tabel generowanych przez bibliotekę DLT w dedykowanych obliczeniach, niezależnie od własności tabel. Aby skorzystać z filtrowania danych dostępnego w środowisku Databricks Runtime 15.4 LTS lub nowszym, należy upewnić się, że obszar roboczy jest włączony dla bezserwerowych obliczeniowych, ponieważ funkcja filtrowania danych obsługując tabele generowane przez bibliotekę DLT jest uruchamiana na bezserwerowych obliczeniach. Opłaty mogą być naliczane za zasoby obliczeniowe bezserwerowe w przypadku korzystania z dedykowanych zasobów obliczeniowych do uruchamiania operacji filtrowania danych. Zobacz szczegółową kontrolę dostępu w przypadku dedykowanych obliczeń (dawniej obliczenia pojedynczego użytkownika).
Wymagania dotyczące uprawnień:
- Uprawnienie
READ FILES
w odniesieniu do zewnętrznej lokalizacji Unity Catalog. Aby uzyskać informacje, zobacz Tworzenie lokalizacji zewnętrznej w celu połączenia magazynu w chmurze z usługą Azure Databricks. - Uprawnienie
USE CATALOG
w katalogu, w którym tworzysz tabelę przesyłania strumieniowego. - Uprawnienie
USE SCHEMA
w schemacie, w którym tworzysz tabelę przesyłania strumieniowego. - Uprawnienie
CREATE TABLE
w schemacie, w którym tworzysz tabelę przesyłania strumieniowego.
Inne wymagania:
Ścieżka do danych źródłowych.
Przykład ścieżki woluminu:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
Przykład ścieżki lokalizacji zewnętrznej:
abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis
Uwaga
W tym artykule założono, że dane, które chcesz załadować, znajdują się w lokalizacji magazynowej w chmurze, która odpowiada woluminowi lub lokalizacji zewnętrznej Unity Catalog, do której masz dostęp.
Odnajdywanie i wyświetlanie podglądu danych źródłowych
Na pasku bocznym obszaru roboczego kliknij pozycję Zapytania, a następnie kliknij pozycję Utwórz zapytanie.
W edytorze zapytań wybierz magazyn SQL, który używa kanału
Current
z listy rozwijanej.Wklej następujący kod do edytora, podstawiając wartości w nawiasach kątowych (
<>
) w celu uzyskania informacji identyfikujących dane źródłowe, a następnie kliknij przycisk Uruchom.Uwaga
Podczas uruchamiania funkcji zwracającej tabelę
read_files
mogą wystąpić błędy wnioskowania schematu, jeśli wartości domyślne funkcji nie są w stanie przetworzyć twoich danych. Na przykład może być konieczne skonfigurowanie trybu wielowierszowego dla wielowierszowych plików CSV lub JSON. Aby uzyskać listę opcji analizatora, zobacz funkcję tabelarycznąread_files
./* Discover your data in a volume */ LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>" /* Preview your data in a volume */ SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10 /* Discover your data in an external location */ LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>" /* Preview your data */ SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
Załaduj dane do tabeli strumieniowej
Aby stworzyć tabelę strumieniową na podstawie danych w magazynie obiektów w chmurze, wklej poniższe w edytorze zapytań, a następnie kliknij przycisk Uruchom:
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')
Ustaw kanał środowiska uruchomieniowego
Tabele przesyłania strumieniowego utworzone przy użyciu usługi SQL Warehouse są automatycznie odświeżane przy użyciu potoku DLT. Potoki DLT domyślnie używają środowiska uruchomieniowego w kanale current
. Zobacz informacje o wersji DLT i proces uaktualniania wersji, aby dowiedzieć się więcej o procesie wydawania.
Databricks rekomenduje użycie kanału current
dla obciążeń produkcyjnych. Nowe funkcje są po raz pierwszy udostępniane w preview
kanale. Możesz ustawić pipeline na kanał podglądowy DLT, aby przetestować nowe funkcje, poprzez określenie preview
jako właściwości tabeli. Tę właściwość można określić podczas tworzenia tabeli lub po utworzeniu tabeli przy użyciu instrukcji ALTER.
W poniższym przykładzie kodu pokazano, jak ustawić kanał na podgląd w instrukcji CREATE:
CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
*
FROM
range(5)
Odśwież tabelę przesyłania strumieniowego przy użyciu potoku DLT
W tej sekcji opisano wzorce odświeżania tabeli przesyłania strumieniowego z najnowszymi danymi dostępnymi ze źródeł zdefiniowanych w zapytaniu.
Gdy wykonasz operację CREATE
lub REFRESH
na tabeli przesyłania strumieniowego, procesy aktualizacji są wykonywane przy użyciu bezserwerowego potoku DLT. Każda zdefiniowana tabela przesyłania strumieniowego ma skojarzony potok DLT.
Po uruchomieniu polecenia REFRESH
zostanie zwrócony link do potoku DLT. Aby sprawdzić stan odświeżania, możesz użyć linku do pipeline'u DLT.
Uwaga
Tylko właściciel tabeli może odświeżyć tabelę przesyłania strumieniowego, aby pobrać najnowsze dane. Użytkownik tworzący tabelę jest właścicielem i nie można zmienić właściciela. Może być konieczne odświeżenie tabeli przesyłania strumieniowego przed użyciem zapytań podróży w czasie.
Zobacz Co to jest DLT?.
Tylko wprowadzanie nowych danych
Domyślnie funkcja read_files
odczytuje wszystkie istniejące dane w katalogu źródłowym podczas tworzenia tabeli, a następnie przetwarza nowo przybywające rekordy przy każdym odświeżeniu.
Aby uniknąć pozyskiwania danych, które już istnieją w katalogu źródłowym w momencie tworzenia tabeli, ustaw opcję includeExistingFiles
na false
. Oznacza to, że tylko dane, które docierają do katalogu po utworzeniu tabeli, są przetwarzane. Na przykład:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
includeExistingFiles => false)
W pełni odśwież tabelę transmisji strumieniowej
Pełne odświeżenia ponownie przetwarzają wszystkie dane dostępne w źródle przy użyciu najnowszej definicji. Nie zaleca się wywoływania pełnych odświeżeń w źródłach, które nie przechowują całej historii danych lub mają krótkie okresy przechowywania, takie jak Kafka, ponieważ pełne odświeżanie obcina istniejące dane. Odzyskanie starych danych może nie być możliwe, jeśli dane nie są już dostępne w źródle.
Na przykład:
REFRESH STREAMING TABLE my_bronze_table FULL
Zaplanuj tabelę transmisji strumieniowej do automatycznego odświeżania
Aby skonfigurować tabelę przesyłania strumieniowego w celu automatycznego odświeżania na podstawie zdefiniowanego harmonogramu, wklej następujące polecenie w edytorze zapytań, a następnie kliknij przycisk Uruchom:
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
Na przykład, aby zobaczyć zapytania dotyczące harmonogramu odświeżania, sprawdź ALTER STREAMING TABLE.
Śledź stan odświeżania
Stan odświeżania tabeli przesyłania strumieniowego można wyświetlić, wyświetlając potok, który zarządza tabelą przesyłania strumieniowego w interfejsie użytkownika DLT lub wyświetlając Odśwież informacje zwrócone przez polecenie DESCRIBE EXTENDED
dla tabeli przesyłania strumieniowego.
DESCRIBE EXTENDED <table-name>
Pozyskiwanie strumieniowe z platformy Kafka
Aby zapoznać się z przykładem strumieniowego pobierania danych z platformy Kafka, zobacz read_kafka.
Udzielanie użytkownikom dostępu do tabeli strumieniowej
Aby przyznać użytkownikom uprawnienia SELECT
w tabeli przesyłania strumieniowego, aby mogli wykonywać względem niej zapytania, wklej następujące polecenie w edytorze zapytań, a następnie kliknij przycisk Uruchom:
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
Aby uzyskać więcej informacji na temat udzielania uprawnień do obiektów zabezpieczalnych Unity Catalogu, zobacz temat uprawnienia Unity Catalogu i obiekty zabezpieczalne.
Trwale usunąć rekordy z tabeli przesyłania strumieniowego
Ważne
Obsługa instrukcji REORG
z tabelami przesyłania strumieniowego jest dostępna w publicznej wersji zapoznawczej.
Uwaga
- Użycie instrukcji
REORG
z tabelą strumieniową wymaga środowiska Databricks Runtime w wersji 15.4 lub nowszej. - Chociaż można użyć instrukcji
REORG
z dowolną tabelą przesyłania strumieniowego, jest ona wymagana tylko podczas usuwania rekordów z takiej tabeli z włączonymi wektorami usuwania . Polecenie nie ma wpływu po użyciu z tabelą przesyłania strumieniowego bez włączonych wektorów usuwania.
Aby fizycznie usunąć rekordy z magazynu bazowego dla tabeli przesyłania strumieniowego z włączonymi wektorami usuwania, takimi jak zgodność z RODO, należy wykonać dodatkowe kroki, aby upewnić się, że operacja VACUUM jest uruchamiana na danych tabeli przesyłania strumieniowego.
Poniżej opisano te kroki bardziej szczegółowo:
- Zaktualizuj rekordy lub usuń rekordy z tabeli przesyłania strumieniowego.
- Wykonaj instrukcję
REORG
względem tabeli strumieniowej, określając parametrAPPLY (PURGE)
. Na przykładREORG TABLE <streaming-table-name> APPLY (PURGE);
. - Poczekaj, aż minie okres przechowywania danych tabeli przesyłania strumieniowego. Domyślny okres przechowywania danych wynosi siedem dni, ale można go skonfigurować za pomocą właściwości tabeli
delta.deletedFileRetentionDuration
. Zobacz Konfigurowanie przechowywania danych dla zapytań dotyczących podróży w czasie. -
REFRESH
tabela przesyłania strumieniowego. Zobacz Odświeżanie tabeli przesyłania strumieniowego przy użyciu potoku DLT. W ciągu 24 godzin od operacjiREFRESH
, zadania konserwacji DLT, w tym operacjaVACUUM
, która jest wymagana, aby zapewnić trwałe usunięcie rekordów, są uruchamiane automatycznie. Sprawdź Zadania konserwacji wykonywane przez technologię DLT.
Monitorowanie przebiegów przy użyciu historii zapytań
Możesz użyć strony historii zapytań, aby uzyskać dostęp do szczegółów zapytań i profilów zapytań, które mogą pomóc w identyfikowaniu słabych zapytań i wąskich gardeł w potoku DLT używanym do uruchamiania aktualizacji tabeli przesyłania strumieniowego. Aby zapoznać się z omówieniem rodzaju informacji dostępnych w historiach zapytań i profilach zapytań, zobacz Historia zapytań i Profil zapytania.
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej. Administratorzy obszaru roboczego mogą włączyć tę funkcję na stronie Podglądy . Zobacz Zarządzanie wersjami zapoznawczymi Azure Databricks.
Wszystkie instrukcje związane z tabelami strumieniowymi występują w historii zapytań. Możesz użyć rozwijanej listy filtrującej Statement, aby wybrać dowolne polecenie i przejrzeć powiązane zapytania. Po wszystkich instrukcjach CREATE
następuje instrukcja REFRESH
wykonywana asynchronicznie w pipeline DLT. Instrukcje REFRESH
zwykle zawierają szczegółowe plany zapytań, które zapewniają wgląd w optymalizację wydajności.
Aby uzyskać dostęp do REFRESH
instrukcji w interfejsie użytkownika historii zapytań, wykonaj następujące kroki:
- Kliknij
na lewym pasku bocznym, aby otworzyć Historię zapytań.
- Zaznacz pole wyboru REFRESH z rozwijanego filtra Oświadczenia.
- Kliknij nazwę instrukcji zapytania, aby wyświetlić szczegóły podsumowania, takie jak czas trwania zapytania i zagregowane metryki.
- Kliknij pozycję Zobacz profil zapytania, aby otworzyć profil zapytania. Aby uzyskać szczegółowe informacje na temat nawigowania w profilu zapytania, zobacz Profil zapytania.
- Możesz użyć linków w sekcji Źródło zapytania, aby otworzyć powiązane zapytanie lub potok danych opcjonalnie.
Możesz również uzyskać dostęp do szczegółów zapytania przy użyciu linków w edytorze SQL lub notesu dołączonego do usługi SQL Warehouse.