Kurz: Spuštění kompletního analytického kanálu Lakehouse
Tento tutoriál vám ukáže, jak nastavit kompletní analytickou pipeline pro Azure Databricks lakehouse.
Důležité
Tento kurz používá interaktivní poznámkové bloky k dokončení běžných úloh ETL v Pythonu v clusterech s podporou katalogu Unity. Pokud nepoužíváte katalog Unity, přečtěte si téma Spuštění první úlohy ETL v Azure Databricks.
Úkoly v tomto kurzu
Na konci tohoto článku se budete cítit pohodlně:
- Spuštění výpočetního clusteru s podporou katalogu Unity.
- Vytvoření poznámkového bloku Databricks
- Čtení a zápis dat z externího umístění v rámci Unity Catalogu
- Konfigurace načítání přírůstkových dat do tabulky Unity Catalog pomocí Auto Loaderu.
- Spouštění buněk poznámkového bloku pro zpracování, dotazování a náhled dat
- Plánování poznámkového bloku jako úlohy Databricks
- Dotazování tabulek Unity Catalog z Databricks SQL
Azure Databricks poskytuje sadu nástrojů připravených pro produkční prostředí, které odborníkům na data umožňují rychle vyvíjet a nasazovat kanály extrakce, transformace a načítání (ETL). Katalog Unity umožňuje správci dat konfigurovat a zabezpečit přihlašovací údaje úložiště, externí umístění a databázové objekty pro uživatele v celé organizaci. Databricks SQL umožňuje analytikům spouštět dotazy SQL na stejné tabulky, které se používají v produkčních úlohách ETL, což umožňuje ve velkém měřítku business intelligence v reálném čase.
K sestavení kanálů ETL můžete také použít DLT. Databricks vytvořil DLT, aby se snížila složitost sestavování, nasazování a údržby produkčních kanálů ETL. Podívejte se na tutoriál: Spusťte svůj první DLT pipeline.
Požadavky
Poznámka:
Pokud nemáte oprávnění ke kontrole clusteru, můžete většinu následujících kroků dokončit, pokud máte přístup ke clusteru.
Krok 1: Vytvoření clusteru
Pokud chcete provádět průzkumnou analýzu dat a přípravu dat, vytvořte cluster, který poskytuje výpočetní prostředky potřebné ke spouštění příkazů.
- Na bočním panelu klikněte na
Výpočty.
- Klikněte na bočním panelu na
Nová a poté vyberte Cluster. Otevře se stránka Nový cluster/výpočetní prostředky.
- Zadejte jedinečný název clusteru.
- V části Výkon vyberte přepínač Jeden uzel.
- V části Pokročilépřepněte nastavení režimu přístupu na Ruční a pak vyberte Vyhrazený.
- V jeden uživatel nebo skupinavyberte své uživatelské jméno.
- Vyberte požadovanou verzi modulu runtime Databricks, 11.1 nebo vyšší, pokud chcete použít katalog Unity.
- Kliknutím na Vytvořit výpočet vytvořte klastr.
Další informace o clusterech Databricks najdete v tématu Výpočty.
Krok 2: Vytvoření poznámkového bloku Databricks
Chcete-li vytvořit poznámkový blok v pracovním prostoru, klepněte na tlačítko Nový na bočním panelu a potom klepněte na příkaz Poznámkový blok. V pracovním prostoru se otevře prázdný poznámkový blok.
Další informace o vytváření a správě poznámkových bloků najdete v tématu Správa poznámkových bloků.
Krok 3: Zápis a čtení dat z externího umístění spravovaného katalogem Unity
Databricks doporučuje používat Auto Loader pro přírůstkový příjem dat. Auto Loader automaticky rozpozná a zpracuje nové soubory při jejich doručení do cloudového úložiště objektů.
Pomocí katalogu Unity můžete spravovat zabezpečený přístup k externím umístěním. Uživatelé nebo instanční objekty s oprávněními READ FILES
k externímu umístění mohou k příjmu dat použít Auto Loader.
Za normálních okolností data přijdou na externí místo v důsledku zápisů z jiných systémů. V této ukázce můžete simulovat doručení dat tím, že do externího umístění zapíšete soubory JSON.
Zkopírujte následující kód do buňky poznámkového bloku. Hodnotu řetězce pro catalog
nahraďte názvem katalogu s oprávněními CREATE CATALOG
a USE CATALOG
. Nahraďte hodnotu řetězce external_location
cestou pro externí umístění s oprávněními READ FILES
, WRITE FILES
a CREATE EXTERNAL TABLE
.
Externí umístění se dají definovat jako celý kontejner úložiště, ale často odkazují na adresář vnořený do kontejneru.
Správný formát cesty k externímu umístění je "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"
.
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
Při spuštění této buňky by se měl zobrazit řádek s obsahem "12 bytes", vypsat řetězec Hello world! a zobrazit všechny databáze dostupné v katalogu. Pokud se vám nedaří tuto buňku spustit, ověřte, že jste v pracovním prostoru s povoleným katalogem Unity, a požádejte správce pracovního prostoru o správná oprávnění k dokončení tohoto kurzu.
Níže uvedený kód Pythonu používá vaši e-mailovou adresu k vytvoření jedinečné databáze v zadaném katalogu a jedinečného umístění úložiště v externím umístění. Spuštěním této buňky odeberete všechna data přidružená k tomuto kurzu, což vám umožní spustit tento příklad idempotentním způsobem. Třída je definována a pak je vytvořena její instance, kterou použijete k simulaci příchozích dávek dat z připojeného systému do vašeho externího zdrojového umístění.
Zkopírujte tento kód do nové buňky v poznámkovém bloku a spusťte ho a nakonfigurujte prostředí.
Poznámka:
Proměnné definované v tomto kódu by vám měly umožnit bezpečné spuštění bez rizika konfliktu s existujícími prostředky pracovního prostoru nebo jinými uživateli. Omezená oprávnění k síti nebo úložišti způsobí chyby při provádění tohoto kódu; Požádejte správce pracovního prostoru o řešení těchto omezení.
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
Teď můžete získat dávku dat zkopírováním následujícího kódu do buňky a jeho spuštěním. Tuto buňku můžete spustit ručně až 60krát a aktivovat tak nové doručení dat.
RawData.land_batch()
Krok 4: Konfigurace Automatického Zavaděče pro příjem dat do Unity katalogu
Databricks doporučuje ukládat data pomocí Delta Lake. Delta Lake je opensourcová vrstva úložiště, která poskytuje transakce ACID a umožňuje datové jezero. Delta Lake je výchozí formát pro tabulky vytvořené v Databricks.
Pokud chcete nakonfigurovat Auto Loader pro příjem dat do tabulky Unity Katalogu, zkopírujte a vložte následující kód do prázdné buňky v notebooku.
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
Další informace o automatickém zavaděči najdete v tématu Co je automatický zavaděč?.
Další informace o strukturovaném streamování pomocí katalogu Unity najdete v tématu Použití katalogu Unity se strukturovaným streamováním.
Krok 5: Zpracování dat a interakce s nimi
Poznámkové bloky spouštějí logické operace buňku po buňce. Pomocí těchto kroků spusťte logiku v buňce:
Pokud chcete buňku, kterou jste dokončili v předchozím kroku, spustit, vyberte buňku a stiskněte SHIFT+ENTER.
Pokud chcete zadat dotaz na tabulku, kterou jste právě vytvořili, zkopírujte a vložte následující kód do prázdné buňky a stisknutím kláves SHIFT+ENTER buňku spusťte.
df = spark.read.table(table)
Pokud chcete zobrazit náhled dat v datovém rámci, zkopírujte a vložte následující kód do prázdné buňky a potom buňku spusťte stisknutím kombinace kláves SHIFT+ENTER .
display(df)
Další informace o interaktivních možnostech vizualizace dat najdete v tématu Vizualizace v poznámkových blocích Databricks.
Krok 6: Naplánování úlohy
Poznámkové bloky Databricks můžete spustit jako produkční skripty tím, že je přidáte jako úkol do úlohy Databricks. V tomto kroku vytvoříte novou úlohu, kterou můžete aktivovat ručně.
Naplánujte svůj poznámkový blok jako úkol:
- Na pravé straně záhlaví klikněte na Plán .
- Zadejte jedinečný název pro název úlohy.
- Klikněte na Ruční.
- V rozevíracím seznamu Cluster vyberte cluster, který jste vytvořili v kroku 1.
- Klikněte na Vytvořit.
- V zobrazeném okně klikněte na Spustit nyní.
- Pokud chcete zobrazit výsledky spuštění úlohy, klikněte na
ikonu vedle časového razítka posledního spuštění .
Další informace o úlohách najdete v tématu Co jsou úlohy?.
Krok 7: Dotazování tabulky z Databricks SQL
Každý, kdo má oprávnění USE CATALOG
k aktuálnímu katalogu, USE SCHEMA
oprávnění k aktuálnímu schématu a SELECT
oprávnění v tabulce může dotazovat obsah tabulky z preferovaného rozhraní Databricks API.
Ke spouštění dotazů v Databricks SQL potřebujete přístup ke spuštěné službě SQL Warehouse.
Tabulka, kterou jste vytvořili dříve v tomto kurzu, má název target_table
. Můžete ho dotazovat pomocí katalogu, který jste zadali v první buňce a databázi s paternem e2e_lakehouse_<your-username>
. K vyhledání datových objektů, které jste vytvořili, můžete použít
Další integrace
Další informace o integracích a nástrojích pro přípravu dat pomocí Azure Databricks: