A strukturált streamelés gyártási megfontolásai
Ez a cikk javaslatokat tartalmaz a strukturált streamelési számítási feladatok Azure Databricks-feladatok használatával történő ütemezéséhez.
A Databricks mindig a következőket javasolja:
- Távolítsa el a szükségtelen kódot olyan jegyzetfüzetekből, amelyek eredményeket adnak vissza, például
display
éscount
. - Ne futtasson strukturált streamelési számítási feladatokat általános célú számítással. A streameket mindig feladatokként ütemezze a feladatok számítóegységével.
- Feladatok ütemezése a
Continuous
mód használatával. - Ne engedélyezze az automatikus skálázást a strukturált streamelési feladatok számításához.
Egyes számítási feladatok a következőkből profitálnak:
- A RocksDB állapottároló konfigurálása az Azure Databricksben
- Állapotalapú lekérdezések aszinkron állapot-ellenőrzőpontja
- Mi az aszinkron folyamatkövetés?
Az Azure Databricks bevezette a DLT-t, hogy csökkentse a strukturált streamelési számítási feladatok éles infrastruktúrájának kezelésének összetettségét. A Databricks a DLT használatát javasolja az új strukturált streamelési folyamatokhoz. Lásd Mi az a DLT?.
Megjegyzés
A számítási erőforrások automatikus skálázása korlátozott a strukturált streamelési számítási feladatok fürt méretének csökkentésében. A Databricks a DLT használatát javasolja továbbfejlesztett automatikus skálázással a streamelési számítási feladatokhoz. Lásd: A DLT-folyamatok fürtkihasználtságának optimalizálását az automatikus skálázás továbbfejlesztésével.
A Databricks azt javasolja, hogy mindig úgy konfigurálja a streamelési feladatokat, hogy azok automatikusan újrainduljanak hibák esetén. Egyes funkciók, beleértve a sémafejlődést, feltételezik, hogy a strukturált streamelési számítási feladatok automatikus újrapróbálkozására vannak konfigurálva. Lásd: Strukturált streamelési feladatok konfigurálása a streamelési lekérdezések sikertelen újraindításához.
Egyes műveletek, például foreachBatch
a pontos garancia helyett legalább egyszer nyújtanak garanciát. Annak érdekében, hogy ezek a műveletek megfelelőek legyenek, biztosítania kell, hogy a feldolgozási folyamat idempotens legyen. Lásd: A foreachBatch használata tetszőleges adatelnyelőkbe való íráshoz című témakört.
Megjegyzés
Amikor egy lekérdezés újraindul, az előző futtatás során tervezett mikrorészlet kerül feldolgozásra. Ha a feladat memóriahiba miatt meghiúsult, vagy egy túlméretezett mikroköteg miatt manuálisan megszakított egy feladatot, előfordulhat, hogy fel kell skáláznia a számítást a mikro köteg sikeres feldolgozásához.
Ha a futtatások közötti konfigurációkat módosítja, ezek a konfigurációk az első tervezett új kötegre vonatkoznak. Lásd: Helyreállítás a strukturált streamelési lekérdezés változásai után.
Egy Azure Databricks-feladat részeként több feladatot is ütemezhet. Ha egy feladatot a folyamatos eseményindítóval konfigurál, nem állíthat be függőségeket a tevékenységek között.
Az alábbi módszerek egyikével több streamet ütemezhet egy feladatba:
- Több tevékenység: Több feladattal rendelkező feladat definiálása, amely folyamatos eseményindítóval futtat streamelési számítási feladatokat.
- Több lekérdezés: Több streamelési lekérdezés definiálása egyetlen tevékenység forráskódjában.
Ezeket a stratégiákat kombinálhatja is. Az alábbi táblázat ezeket a megközelítéseket hasonlítja össze.
Több feladat | Több lekérdezés | |
---|---|---|
Hogyan osztják meg a számítást? | A Databricks azt javasolja, hogy az egyes streamelési feladatokhoz megfelelő méretű számítási feladatokat helyezzen üzembe. Igény szerint megoszthatja a számítást a tevékenységek között. | Minden lekérdezés ugyanazt a számítást használja. Opcionálisan lekérdezéseket rendelhet az ütemezőkészletekhez. |
Hogyan kezelik az újrapróbálkozásokat? | A feladat újrapróbálkozása előtt minden tevékenységnek sikertelennek kell lennie. | A feladat újrapróbálkozza, ha valamelyik lekérdezés meghiúsul. |
Strukturált streamelési feladatok konfigurálása a streamelési lekérdezések sikertelen újraindításához
A Databricks azt javasolja, hogy a folyamatos eseményindítóval konfigurálja az összes streamelési számítási feladatot. Tekintse meg a feladatok folyamatos futtatása című témakört.
A folyamatos eseményindító alapértelmezés szerint a következő viselkedést biztosítja:
- Megakadályozza, hogy egy feladat egyszerre többször is fusson.
- Új futtatás indítása, ha egy korábbi futtatás meghiúsul.
- Exponenciális visszalépést használ az újrapróbálkozáshoz.
A Databricks azt javasolja, hogy a munkafolyamatok ütemezése során mindig a feladatok számítását használja a teljes körű számítás helyett. Feladathiba és újrapróbálkozás esetén új számítási erőforrások kerülnek telepítésre.
Megjegyzés
Nem kell használnia streamingQuery.awaitTermination()
vagy spark.streams.awaitAnyTermination()
. A feladatok automatikusan megakadályozzák a futtatás végrehajtását, ha egy streamelési lekérdezés aktív.
Az ütemezési készleteket úgy konfigurálhatja, hogy számítási kapacitást rendeljenek a lekérdezésekhez, ha ugyanazon forráskódból több streamelési lekérdezést futtat.
Alapértelmezés szerint az összes lekérdezés ugyanabban a tisztességes ütemezési csoportban fut egy jegyzetfüzetben. A jegyzetfüzetben lévő összes streamlekérdezés eseményindítói által létrehozott Apache Spark-feladatok egymás után futnak "first in, first out" (FIFO) sorrendben. Ez szükségtelen késéseket okozhat a lekérdezésekben, mivel nem hatékonyan osztják meg a klaszter erőforrásait.
A Scheduler-készletek lehetővé teszik, hogy deklarálja, mely strukturált streaming lekérdezések osztanak meg számítási erőforrásokat.
Az alábbi példa query1
-t egy dedikált készlethez rendel hozzá, míg query2
és query3
megosztanak egy ütemezőkészletet.
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
Megjegyzés
A helyi tulajdonság konfigurációjának ugyanabban a jegyzetfüzetcellában kell lennie, ahol a streamelési lekérdezést elindítja.
További részletekért tekintse meg az Apache fair scheduler dokumentációját .