Udostępnij za pośrednictwem


Przesyłanie strumieniowe i pozyskiwanie przyrostowe

Usługa Azure Databricks używa Strukturalnego Przesyłania Strumieniowego Apache Spark do obsługi wielu produktów związanych z zadaniami związanymi z pozyskiwaniem danych, w tym:

  • Automatyczna ładowarka
  • COPY INTO
  • Potoki DLT
  • Zmaterializowane widoki i tabele strumieniowe w usłudze Databricks SQL

W tym artykule omówiono niektóre różnice między semantyką przetwarzania strumieniowego a przyrostowego przetwarzania wsadowego oraz przedstawiono ogólny zarys konfigurowania obciążeń związanych z pozyskiwaniem danych dla żądanej semantyki w usłudze Databricks.

Jaka jest różnica między przesyłaniem strumieniowym a partiowym przyrostowym pobieraniem danych?

Możliwe konfiguracje przepływu pracy związanej z przyjmowaniem danych wahają się od przetwarzania niemal w czasie rzeczywistym do rzadkiego przyrostowego przetwarzania wsadowego. Oba wzorce korzystają ze strumieniowego przesyłania danych w Apache Spark do obsługi przetwarzania przyrostowego, ale mają różną semantykę. Dla uproszczenia ten artykuł odnosi się do pozyskiwania danych w czasie rzeczywistym jako pozyskiwania przesyłania strumieniowego oraz do rzadszego przyrostowego przetwarzania wsadowego jako przyrostowego pozyskiwania wsadowego.

Pozyskiwanie danych za pośrednictwem przesyłania strumieniowego

Przetwarzanie strumieniowe w kontekście pobierania danych i aktualizacji tabel oznacza przetwarzanie danych niemal w czasie rzeczywistym, gdzie Azure Databricks pobiera rekordy ze źródła do miejsca docelowego w mikropakietach, wykorzystując infrastrukturę działającą bez przerwy. Obciążenie przesyłania strumieniowego stale pozyskiwa aktualizacje ze skonfigurowanych źródeł danych, chyba że wystąpi błąd, który zatrzymuje pozyskiwanie.

Przyrostowe wczytywanie wsadowe

Przyrostowy import wsadowy odnosi się do wzorca, w którym wszystkie nowe rekordy z źródła danych są przetwarzane w krótkotrwałym zadaniu. Pozyskiwanie wsadowe przyrostowe często występuje zgodnie z harmonogramem, ale może być również wyzwalane ręcznie lub na podstawie przybycia pliku.

Przyrostowe pozyskiwanie wsadowe różni się od pozyskiwania wsadowego, ponieważ automatycznie wykrywa nowe rekordy w źródle danych i ignoruje rekordy, które zostały już pozyskane.

Pozyskiwanie przy użyciu zadań

Zadania Databricks umożliwiają orkiestrację przepływów pracy i harmonogramowanie zadań, które obejmują notatniki, biblioteki, potoki DLT oraz zapytania SQL Databricks.

Uwaga

Aby skonfigurować przyrostowe pozyskiwanie wsadowe, można użyć wszystkich typów jednostek obliczeniowych i rodzajów zadań usługi Azure Databricks. Strumieniowe przyjmowanie danych jest obsługiwane tylko w produkcji na klasycznych maszynach obliczeniowych zadań i DLT.

Zadania mają dwa podstawowe tryby działania:

  • Zadania ciągłe są automatycznie ponawiane w przypadku wystąpienia awarii. Ten tryb jest przeznaczony do pozyskiwania danych przesyłanych strumieniowo.
  • Wyzwalane zadania uruchamiają zadania po wyzwoleniu. Wyzwalacze obejmują:
    • Wyzwalacze oparte na czasie, które uruchamiają zadania zgodnie z określonym harmonogramem.
    • Wyzwalacze oparte na plikach, które uruchamiają zadania, gdy pliki trafiają do określonej lokalizacji.
    • Inne wyzwalacze to na przykład wywołania interfejsu API REST, wykonywanie poleceń wiersza poleceń usługi Azure Databricks lub kliknięcie przycisku Uruchom teraz w interfejsie użytkownika obszaru roboczego.

W przypadku przyrostowych obciążeń wsadowych skonfiguruj zadania, używając trybu wyzwalacza AvailableNow, w następujący sposób:

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("table_name")

W przypadku obciążeń przesyłania strumieniowego domyślny interwał wyzwalacza to processingTime ="500ms". W poniższym przykładzie pokazano, jak przetwarzać mikrosadę co 5 sekund:

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(processingTime="5 seconds")
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.ProcessingTime, "5 seconds")
  .toTable("table_name")

Ważne

Zadania bezserwerowe nie obsługują języka Scala, trybu ciągłego ani interwałów wyzwalaczy opartych na czasie dla przesyłania strumieniowego ze strukturą. Użyj zadań klasycznych, jeśli potrzebujesz semantyki pozyskiwania danych niemal w czasie rzeczywistym.

Ingestowanie z użyciem DLT

Podobnie jak w przypadku zadań, potoki DLT mogą być uruchamiane w trybie na żądanie lub ciągłym. Aby uzyskać semantykę niemal rzeczywistego czasu w tabelach przesyłania strumieniowego, użyj trybu ciągłego.

Użyj tabel strumieniowych, aby skonfigurować pozyskiwanie danych strumieniowych lub przyrostowe przetwarzanie partii z magazynu obiektów w chmurze, Apache Kafka, Amazon Kinesis, Google Pub/Sub lub Apache Pulsar.

Program Lakeflow Connect używa biblioteki DLT do konfigurowania potoków pozyskiwania z połączonych systemów. Zobacz Lakeflow Connect.

Zmaterializowane widoki gwarantują semantykę operacji równoważną obciążeniom wsadowym, ale mogą zoptymalizować wiele operacji w celu obliczenia wyników przyrostowo. Zobacz Odświeżanie przyrostowe, aby uzyskać zmaterializowane widoki.

Wczytywanie danych przy użyciu Databricks SQL

Możesz używać tabel strumieniowych do konfigurowania przyrostowego pobierania danych partiami z magazynów obiektów w chmurze, Apache Kafka, Amazon Kinesis, Google Pub/Sub lub Apache Pulsar.

Za pomocą zmaterializowanych widoków można skonfigurować przyrostowe przetwarzanie wsadowe z źródeł Delta. Zobacz Odświeżanie przyrostowe, aby uzyskać zmaterializowane widoki.

COPY INTO Udostępnia znaną składnię SQL na potrzeby przyrostowego przetwarzania wsadowego dla plików danych w magazynie obiektów w chmurze. COPY INTO zachowanie jest podobne do wzorców obsługiwanych przez tabele strumieniowe dla przechowywania obiektów w chmurze, ale nie wszystkie ustawienia domyślne są odpowiednie dla wszystkich obsługiwanych formatów plików.