Az első ETL-számítási feladat futtatása az Azure Databricksben
Megtudhatja, hogyan fejlesztheti és helyezheti üzembe az első kinyerési, átalakító és betöltési (ETL-) folyamatokat az Azure Databricks éles üzemre kész eszközeinek használatával az adatvezényléshez.
A cikk végére kényelmesen érezheti magát:
- Teljes célú számítási fürt indítása a Databricksben.
- Databricks-jegyzetfüzet létrehozása.
- Növekményes adatbetöltés konfigurálása a Delta Lake-be automatikus betöltővel.
- Jegyzetfüzetcellák végrehajtása az adatok feldolgozásához, lekérdezéséhez és előnézetéhez.
- Jegyzetfüzet ütemezése Databricks-feladatként.
Ez az oktatóanyag interaktív jegyzetfüzeteket használ a Pythonban vagy a Scalában végzett gyakori ETL-feladatok elvégzéséhez.
A DLT használatával ETL-folyamatokat is létrehozhat. A Databricks azért hozta létre a DLT-t, hogy csökkentse az éles ETL-csővezetékek létrehozásának, üzembe helyezésének és karbantartásának összetettségét. Lásd oktatóanyagot: Az első DLT-folyamat futtatása.
A Databricks Terraform-szolgáltatóval is létrehozhatja a cikk erőforrásait. Lásd: Fürtök, jegyzetfüzetek és feladatok létrehozása a Terraformmal.
Követelmények
- Bejelentkeztél egy Azure Databricks-munkaterületre.
- Önnek engedélye van klaszter létrehozására.
Feljegyzés
Ha nem rendelkezik fürtvezérlési jogosultságokkal, akkor is elvégezheti az alábbi lépések többségét, amíg hozzáférése van egy fürthöz.
1. lépés: Klaszter létrehozása
Feltáró adatelemzés és adatinfrastruktúra fejlesztés elvégzéséhez hozzon létre egy fürtöt, amely biztosítja a parancsok végrehajtásához szükséges számítási erőforrásokat.
- Kattintson a Számítás gombra
az oldalsávon.
- A Compute oldalon kattintson a Cluster létrehozása elemre. Ekkor megnyílik az Új klaszter oldal.
- Adjon meg egy egyedi nevet a fürtnek, hagyja meg a fennmaradó értékeket az alapértelmezett állapotban, és kattintson a Fürt létrehozása gombra.
A Databricks-fürtökkel kapcsolatos további információkat tekintse meg: Compute.
2. lépés: Databricks-jegyzetfüzet létrehozása
Ha jegyzetfüzetet szeretne létrehozni a munkaterületen, kattintson az Oldalsáv Új gombjára, majd a Jegyzetfüzet elemre. Megnyílik egy üres jegyzetfüzet a munkaterületen.
A jegyzetfüzetek létrehozásáról és kezeléséről további információt a Jegyzetfüzetek kezelése című témakörben talál.
3. lépés: Az automatikus betöltő konfigurálása adatok Delta Lake-be való betöltéséhez
A Databricks az Automatikus betöltő használatát javasolja a növekményes adatbetöltéshez. Az automatikus betöltő automatikusan észleli és feldolgozza az új fájlokat, amikor megérkeztek a felhőobjektum-tárolóba.
A Databricks azt javasolja, hogy tárolja az adatokat a Delta Lake-zel. A Delta Lake egy nyílt forráskód tárolási réteg, amely ACID-tranzakciókat biztosít, és lehetővé teszi a data lakehouse-t. A Databricksben létrehozott táblák alapértelmezett formátuma a Delta Lake.
Ha úgy szeretné konfigurálni az Automatikus betöltőt, hogy adatokat töltsen be egy Delta Lake-táblába, másolja és illessze be a következő kódot a jegyzetfüzet üres cellájába:
Python
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
Scala
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
Feljegyzés
A kódban definiált változóknak lehetővé kell tenni, hogy biztonságosan végrehajtsa azt anélkül, hogy a meglévő munkaterületi eszközökkel vagy más felhasználókkal ütközhet. A korlátozott hálózati vagy tárolási engedélyek hibát okoznak a kód végrehajtásakor; A korlátozások elhárításához forduljon a munkaterület rendszergazdájához.
Az automatikus betöltőről további információt a Mi az automatikus betöltő? című témakörben talál.
4. lépés: Adatok feldolgozása és használata
A jegyzetfüzetek logikai műveleteket cellánként hajtanak végre. A logika végrehajtása a cellában:
Az előző lépésben befejezett cella futtatásához jelölje ki a cellát, és nyomja le a SHIFT+ENTER billentyűkombinációt.
Az imént létrehozott táblázat lekérdezéséhez másolja és illessze be a következő kódot egy üres cellába, majd nyomja le a SHIFT+ENTER billentyűkombinációt a cella futtatásához.
Python
df = spark.read.table(table_name)
Scala
val df = spark.read.table(table_name)
A DataFrame adatainak előnézetéhez másolja és illessze be a következő kódot egy üres cellába, majd nyomja le a SHIFT+ENTER billentyűkombinációt a cella futtatásához.
Python
display(df)
Scala
display(df)
Az adatok megjelenítésének interaktív lehetőségeiről további információt a Databricks-jegyzetfüzetek vizualizációi című témakörben talál.
5. lépés: Feladat ütemezése
A Databricks-jegyzetfüzeteket éles szkriptekként futtathatja, ha feladatként adja hozzá őket egy Databricks-feladathoz. Ebben a lépésben létrehoz egy új feladatot, amelyet manuálisan aktiválhat.
A jegyzetfüzet feladatként való ütemezése:
- Kattintson az Ütemezés gombra a fejlécsáv jobb oldalán.
- Adjon meg egy egyedi nevet a feladatnévnek.
- Kattintson a Kézi gombra.
- Cluster legördülő listában válassza ki az első lépésben létrehozott fürtöt.
- Kattintson a Létrehozásra.
- A megjelenő ablakban kattintson a Futtatás gombra.
- A feladatfuttatás eredményeinek megtekintéséhez kattintson az
Utolsó futtatási időbélyeg melletti ikonra.
További információ a feladatokról: Mik azok a feladatok?.
További integrációk
További információ az Azure Databricks adatmérnöki integrációjáról és eszközeiről: