Sdílet prostřednictvím


Kurz: Spuštění kompletního analytického kanálu Lakehouse

Tento tutoriál vám ukáže, jak nastavit kompletní analytickou pipeline pro Azure Databricks lakehouse.

Důležité

Tento kurz používá interaktivní poznámkové bloky k dokončení běžných úloh ETL v Pythonu v clusterech s podporou katalogu Unity. Pokud nepoužíváte katalog Unity, přečtěte si téma Spuštění první úlohy ETL v Azure Databricks.

Úkoly v tomto kurzu

Na konci tohoto článku se budete cítit pohodlně:

  1. Spuštění výpočetního clusteru s podporou katalogu Unity.
  2. Vytvoření poznámkového bloku Databricks
  3. Čtení a zápis dat z externího umístění v rámci Unity Catalogu
  4. Konfigurace načítání přírůstkových dat do tabulky Unity Catalog pomocí Auto Loaderu.
  5. Spouštění buněk poznámkového bloku pro zpracování, dotazování a náhled dat
  6. Plánování poznámkového bloku jako úlohy Databricks
  7. Dotazování tabulek Unity Catalog z Databricks SQL

Azure Databricks poskytuje sadu nástrojů připravených pro produkční prostředí, které odborníkům na data umožňují rychle vyvíjet a nasazovat kanály extrakce, transformace a načítání (ETL). Katalog Unity umožňuje správci dat konfigurovat a zabezpečit přihlašovací údaje úložiště, externí umístění a databázové objekty pro uživatele v celé organizaci. Databricks SQL umožňuje analytikům spouštět dotazy SQL na stejné tabulky, které se používají v produkčních úlohách ETL, což umožňuje ve velkém měřítku business intelligence v reálném čase.

K sestavení kanálů ETL můžete také použít DLT. Databricks vytvořil DLT, aby se snížila složitost sestavování, nasazování a údržby produkčních kanálů ETL. Podívejte se na tutoriál: Spusťte svůj první DLT pipeline.

Požadavky

Poznámka:

Pokud nemáte oprávnění ke kontrole clusteru, můžete většinu následujících kroků dokončit, pokud máte přístup ke clusteru.

Krok 1: Vytvoření clusteru

Pokud chcete provádět průzkumnou analýzu dat a přípravu dat, vytvořte cluster, který poskytuje výpočetní prostředky potřebné ke spouštění příkazů.

  1. Na bočním panelu klikněte na ikona VýpočtyVýpočty.
  2. Klikněte na bočním panelu na novou ikonuNová a poté vyberte Cluster. Otevře se stránka Nový cluster/výpočetní prostředky.
  3. Zadejte jedinečný název clusteru.
  4. V části Výkon vyberte přepínač Jeden uzel.
  5. V části Pokročilépřepněte nastavení režimu přístupu na Ruční a pak vyberte Vyhrazený.
  6. V jeden uživatel nebo skupinavyberte své uživatelské jméno.
  7. Vyberte požadovanou verzi modulu runtime Databricks, 11.1 nebo vyšší, pokud chcete použít katalog Unity.
  8. Kliknutím na Vytvořit výpočet vytvořte klastr.

Další informace o clusterech Databricks najdete v tématu Výpočty.

Krok 2: Vytvoření poznámkového bloku Databricks

Chcete-li vytvořit poznámkový blok v pracovním prostoru, klepněte na tlačítko Nová ikonaNový na bočním panelu a potom klepněte na příkaz Poznámkový blok. V pracovním prostoru se otevře prázdný poznámkový blok.

Další informace o vytváření a správě poznámkových bloků najdete v tématu Správa poznámkových bloků.

Krok 3: Zápis a čtení dat z externího umístění spravovaného katalogem Unity

Databricks doporučuje používat Auto Loader pro přírůstkový příjem dat. Auto Loader automaticky rozpozná a zpracuje nové soubory při jejich doručení do cloudového úložiště objektů.

Pomocí katalogu Unity můžete spravovat zabezpečený přístup k externím umístěním. Uživatelé nebo instanční objekty s oprávněními READ FILES k externímu umístění mohou k příjmu dat použít Auto Loader.

Za normálních okolností data přijdou na externí místo v důsledku zápisů z jiných systémů. V této ukázce můžete simulovat doručení dat tím, že do externího umístění zapíšete soubory JSON.

Zkopírujte následující kód do buňky poznámkového bloku. Hodnotu řetězce pro catalog nahraďte názvem katalogu s oprávněními CREATE CATALOG a USE CATALOG. Nahraďte hodnotu řetězce external_location cestou pro externí umístění s oprávněními READ FILES, WRITE FILES a CREATE EXTERNAL TABLE.

Externí umístění se dají definovat jako celý kontejner úložiště, ale často odkazují na adresář vnořený do kontejneru.

Správný formát cesty k externímu umístění je "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location".


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

Při spuštění této buňky by se měl zobrazit řádek s obsahem "12 bytes", vypsat řetězec Hello world! a zobrazit všechny databáze dostupné v katalogu. Pokud se vám nedaří tuto buňku spustit, ověřte, že jste v pracovním prostoru s povoleným katalogem Unity, a požádejte správce pracovního prostoru o správná oprávnění k dokončení tohoto kurzu.

Níže uvedený kód Pythonu používá vaši e-mailovou adresu k vytvoření jedinečné databáze v zadaném katalogu a jedinečného umístění úložiště v externím umístění. Spuštěním této buňky odeberete všechna data přidružená k tomuto kurzu, což vám umožní spustit tento příklad idempotentním způsobem. Třída je definována a pak je vytvořena její instance, kterou použijete k simulaci příchozích dávek dat z připojeného systému do vašeho externího zdrojového umístění.

Zkopírujte tento kód do nové buňky v poznámkovém bloku a spusťte ho a nakonfigurujte prostředí.

Poznámka:

Proměnné definované v tomto kódu by vám měly umožnit bezpečné spuštění bez rizika konfliktu s existujícími prostředky pracovního prostoru nebo jinými uživateli. Omezená oprávnění k síti nebo úložišti způsobí chyby při provádění tohoto kódu; Požádejte správce pracovního prostoru o řešení těchto omezení.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

Teď můžete získat dávku dat zkopírováním následujícího kódu do buňky a jeho spuštěním. Tuto buňku můžete spustit ručně až 60krát a aktivovat tak nové doručení dat.

RawData.land_batch()

Krok 4: Konfigurace Automatického Zavaděče pro příjem dat do Unity katalogu

Databricks doporučuje ukládat data pomocí Delta Lake. Delta Lake je opensourcová vrstva úložiště, která poskytuje transakce ACID a umožňuje datové jezero. Delta Lake je výchozí formát pro tabulky vytvořené v Databricks.

Pokud chcete nakonfigurovat Auto Loader pro příjem dat do tabulky Unity Katalogu, zkopírujte a vložte následující kód do prázdné buňky v notebooku.

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(source)
  .select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Další informace o automatickém zavaděči najdete v tématu Co je automatický zavaděč?.

Další informace o strukturovaném streamování pomocí katalogu Unity najdete v tématu Použití katalogu Unity se strukturovaným streamováním.

Krok 5: Zpracování dat a interakce s nimi

Poznámkové bloky spouštějí logické operace buňku po buňce. Pomocí těchto kroků spusťte logiku v buňce:

  1. Pokud chcete buňku, kterou jste dokončili v předchozím kroku, spustit, vyberte buňku a stiskněte SHIFT+ENTER.

  2. Pokud chcete zadat dotaz na tabulku, kterou jste právě vytvořili, zkopírujte a vložte následující kód do prázdné buňky a stisknutím kláves SHIFT+ENTER buňku spusťte.

    df = spark.read.table(table)
    
  3. Pokud chcete zobrazit náhled dat v datovém rámci, zkopírujte a vložte následující kód do prázdné buňky a potom buňku spusťte stisknutím kombinace kláves SHIFT+ENTER .

    display(df)
    

Další informace o interaktivních možnostech vizualizace dat najdete v tématu Vizualizace v poznámkových blocích Databricks.

Krok 6: Naplánování úlohy

Poznámkové bloky Databricks můžete spustit jako produkční skripty tím, že je přidáte jako úkol do úlohy Databricks. V tomto kroku vytvoříte novou úlohu, kterou můžete aktivovat ručně.

Naplánujte svůj poznámkový blok jako úkol:

  1. Na pravé straně záhlaví klikněte na Plán .
  2. Zadejte jedinečný název pro název úlohy.
  3. Klikněte na Ruční.
  4. V rozevíracím seznamu Cluster vyberte cluster, který jste vytvořili v kroku 1.
  5. Klikněte na Vytvořit.
  6. V zobrazeném okně klikněte na Spustit nyní.
  7. Pokud chcete zobrazit výsledky spuštění úlohy, klikněte na Externí odkaz ikonu vedle časového razítka posledního spuštění .

Další informace o úlohách najdete v tématu Co jsou úlohy?.

Krok 7: Dotazování tabulky z Databricks SQL

Každý, kdo má oprávnění USE CATALOG k aktuálnímu katalogu, USE SCHEMA oprávnění k aktuálnímu schématu a SELECT oprávnění v tabulce může dotazovat obsah tabulky z preferovaného rozhraní Databricks API.

Ke spouštění dotazů v Databricks SQL potřebujete přístup ke spuštěné službě SQL Warehouse.

Tabulka, kterou jste vytvořili dříve v tomto kurzu, má název target_table. Můžete ho dotazovat pomocí katalogu, který jste zadali v první buňce a databázi s paternem e2e_lakehouse_<your-username>. K vyhledání datových objektů, které jste vytvořili, můžete použít Průzkumníka katalogu.

Další integrace

Další informace o integracích a nástrojích pro přípravu dat pomocí Azure Databricks: