Zagadnienia dotyczące produkcji związane ze strukturalnym przesyłaniem strumieniowym
Ten artykuł zawiera zalecenia dotyczące planowania obciążeń przesyłania strumieniowego ze strukturą przy użyciu zadań w usłudze Azure Databricks.
Usługa Databricks zaleca, aby zawsze wykonywać następujące czynności:
- Usuń niepotrzebny kod z notatników, który może zwrócić wyniki, takie jak
display
icount
. - Nie uruchamiaj obciążeń Structured Streaming przy użyciu obliczeń o ogólnym przeznaczeniu. Zawsze planuj strumienie jako zadania, korzystając z obliczeń zadaniowych.
- Planowanie zadań w trybie
Continuous
. - Nie włączaj automatycznego skalowania dla obliczeń dla zadań przesyłania strumieniowego ze strukturą.
Niektóre obciążenia korzystają z następujących elementów:
- Konfigurowanie magazynu stanów bazy danych RocksDB w usłudze Azure Databricks
- Asynchroniczne sprawdzanie stanu dla zapytań stanowych
- Co to jest śledzenie postępu asynchronicznego?
Usługa Azure Databricks wprowadziła bibliotekę DLT w celu zmniejszenia złożoności zarządzania infrastrukturą produkcyjną dla obciążeń przesyłania strumieniowego ze strukturą. Databricks zaleca użycie DLT dla nowych potoków przesyłania strumieniowego w architekturze strukturalnej. Zobacz Co to jest DLT?.
Uwaga
Automatyczne skalowanie zasobów obliczeniowych ma ograniczenia w zmniejszaniu rozmiaru klastra dla obciążeń w ramach przesyłania strumieniowego. Firma Databricks zaleca używanie DLT z rozszerzonym skalowaniem automatycznym dla obciążeń strumieniowych. Zobacz Optymalizowanie wykorzystania klastra potoków DLT za pomocą rozszerzonego skalowania automatycznego.
Projektuj obciążenia strumieniowe, aby uwzględniały możliwość awarii
Usługa Databricks zaleca zawsze konfigurowanie zadań streamingu do automatycznego restartu po awarii. Niektóre funkcje, w tym ewolucja schematu, zakładają, że obciążenia Structured Streaming są skonfigurowane w celu automatycznego ponawiania prób. Zobacz Konfigurowanie zadań strumieniowania strukturalnego do ponownego uruchamiania zapytań strumieniowych w przypadku niepowodzenia.
Niektóre operacje, takie jak foreachBatch
, zapewniają gwarancje co najmniej jednokrotne zamiast gwarancji dokładnie jednokrotnych. W przypadku tych operacji należy się upewnić, że potok przetwarzania jest idempotentny. Zobacz Używanie polecenia foreachBatch do zapisywania do dowolnego ujść danych.
Uwaga
Po ponownym uruchomieniu zapytania mikropartia przetwarzana podczas poprzedniego uruchomienia. Jeśli twoje zadanie nie powiodło się z powodu błędu braku pamięci lub ręcznie anulowałeś je z powodu zbyt dużego mikrosadowego przetwarzania, może być konieczne zwiększenie mocy obliczeniowej, aby pomyślnie przetworzyć mikrosadę.
Jeśli zmienisz konfiguracje między przebiegami, te konfiguracje zostaną zastosowane do pierwszej nowej partii zaplanowanej. Zobacz Odzyskiwanie po zmianach w zapytaniu Strukturowanego przesyłania strumieniowego.
Kiedy zadanie jest ponawiane?
Możesz zaplanować wiele zadań w ramach zadania usługi Azure Databricks. Podczas konfigurowania zadania przy użyciu wyzwalacza ciągłego nie można ustawić zależności między zadaniami.
Możesz zaplanować wiele strumieni w jednym zadaniu przy użyciu jednego z następujących podejść:
- Wiele zadań: Zdefiniuj zadanie obejmujące wiele zadań, które przetwarzają obciążenia strumieniowe przy użyciu wyzwalacza ciągłego.
- Wiele zapytań: zdefiniuj wiele zapytań przesyłanych strumieniowo w kodzie źródłowym dla jednego zadania.
Można również połączyć te strategie. W poniższej tabeli porównaliśmy te podejścia.
Wiele zadań | Wiele zapytań | |
---|---|---|
Jak współużytkowane są zasoby obliczeniowe? | Databricks zaleca wdrożenie zasobów obliczeniowych o odpowiednim rozmiarze do każdego zadania przesyłania strumieniowego. Opcjonalnie możesz udostępniać zasoby obliczeniowe między zadaniami. | Wszystkie zapytania współdzielą te same obliczenia. Możesz przypisywać zapytania opcjonalnie do pul harmonogramu. |
Jak są obsługiwane ponawianie prób? | Wszystkie zadania muszą zakończyć się niepowodzeniem, zanim praca zostanie ponownie podjęta. | Zadanie ponawia próbę, jeśli jakiekolwiek zapytanie zakończy się niepowodzeniem. |
Konfigurowanie zadań przesyłania strumieniowego ze strukturą w celu ponownego uruchamiania zapytań przesyłanych strumieniowo w przypadku niepowodzenia
Databricks zaleca skonfigurowanie wszystkich obciążeń przesyłania strumieniowego za pomocą wyzwalacza ciągłego. Zobacz Uruchamiaj zadania w sposób ciągły.
Wyzwalacz ciągły domyślnie zapewnia następujące zachowanie:
- Zapobiega więcej niż jednemu współbieżnemu uruchomieniu zadania.
- Uruchamia nowy przebieg, gdy poprzedni przebieg zakończy się niepowodzeniem.
- Używa wycofywania wykładniczego dla ponownych prób.
Databricks zaleca zawsze używanie zasobów obliczeniowych dla zadań zamiast ogólnych zasobów obliczeniowych podczas planowania przepływów pracy. W przypadku niepowodzenia zadania i ponawiania próby nowe zasoby obliczeniowe są wdrażane.
Uwaga
Nie trzeba używać streamingQuery.awaitTermination()
ani spark.streams.awaitAnyTermination()
. Zadania automatycznie uniemożliwiają ukończenie uruchomienia, gdy zapytanie przesyłane strumieniowo jest aktywne.
Używanie pul harmonogramu dla wielu zapytań przesyłania strumieniowego
Pule harmonogramu można skonfigurować tak, aby przypisywać pojemność obliczeniową do zapytań podczas uruchamiania wielu zapytań przesyłanych strumieniowo z tego samego kodu źródłowego.
Domyślnie wszystkie zapytania uruchomione w notesie są uruchamiane w tej samej puli równomiernego przydzielania zasobów. Zadania platformy Apache Spark generowane przez wyzwalacze ze wszystkich zapytań przesyłanych strumieniowo w notesie są uruchamiane jeden po drugim w kolejności "pierwszy na wejściu, pierwszy na wyjściu" (FIFO). Może to spowodować niepotrzebne opóźnienia w zapytaniach, ponieważ nie współdzielą zasobów klastra.
Pule harmonogramów umożliwiają deklarowanie, które ustrukturyzowane zapytania przesyłania strumieniowego współdzielą zasoby obliczeniowe.
Poniższy przykład przypisuje query1
do dedykowanej puli, a następnie query2
query3
udostępnia pulę harmonogramu.
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
Uwaga
Konfiguracja właściwości lokalnej musi znajdować się w tej samej komórce notesu, w której uruchamiasz zapytanie przesyłania strumieniowego.
Aby uzyskać więcej informacji, zobacz dokumentację usługi Apache Fair Scheduler.