Condividi tramite


Esercitazione: Eseguire una pipeline di analisi completa per lakehouse

Questa esercitazione illustra come configurare una pipeline di analisi end-to-end per un lakehouse di Azure Databricks.

Importante

Questa esercitazione utilizza notebook interattivi per completare attività ETL comuni in Python su cluster abilitati per il Catalogo Unity. Se non si usa il catalogo Unity, vedere Eseguire il primo carico di lavoro ETL in Azure Databricks.

Attività in questa esercitazione

Al termine di questo articolo, si avrà familiarità con:

  1. Avvio di un cluster di elaborazione abilitato al Catalogo Unity.
  2. Creazione di un notebook di Databricks.
  3. Scrittura e lettura di dati da una posizione esterna del catalogo Unity.
  4. Configurazione dell'inserimento incrementale dei dati in una tabella del catalogo Unity con il caricatore automatico.
  5. Esecuzione delle celle del notebook per elaborare, eseguire query e ottenere un'anteprima dei dati.
  6. Pianificazione di un notebook come attività di Databricks.
  7. Esecuzione di query sulle tabelle del catalogo Unity da Databricks SQL

Azure Databricks offre una suite di strumenti pronti per la produzione che consentono ai professionisti dei dati di sviluppare e distribuire rapidamente pipeline di estrazione, trasformazione e caricamento (ETL). Il catalogo Unity consente agli amministratori dei dati di configurare e proteggere le credenziali di archiviazione, i percorsi esterni e gli oggetti di database per gli utenti in un'organizzazione. Databricks SQL consente agli analisti di eseguire query SQL sulle stesse tabelle usate nei carichi di lavoro ETL di produzione, consentendo funzionalità di business intelligence in tempo reale su larga scala.

È anche possibile usare DLT per compilare pipeline ETL. Databricks ha creato DLT per ridurre la complessità della compilazione, della distribuzione e della gestione delle pipeline ETL di produzione. Vedi Esercitazione: Esegui la tua prima pipeline DLT.

Requisiti

Nota

Se non si dispone dei privilegi di controllo del cluster, è comunque possibile completare la maggior parte dei passaggi seguenti purché si abbia accesso a un cluster.

Passaggio 1: Creare un cluster

Per eseguire l'analisi esplorativa dei dati e la progettazione dei dati, creare un cluster per fornire le risorse di calcolo necessarie per eseguire i comandi.

  1. Nella barra laterale fare clic su icona dell’ambiente di calcoloAmbiente di calcolo.
  2. Fare clic su Nuova iconaNuovo nella barra laterale e quindi selezionare Cluster. In questo modo si apre la pagina Nuovo cluster/Calcolo.
  3. Specificare un nome univoco per il cluster.
  4. Nella sezione Prestazioni selezionare il pulsante di opzione nodo singolo.
  5. In Advancedimpostare la modalità di accesso su Manuale, quindi selezionare Dedicato.
  6. In singolo utente o gruppo, seleziona il tuo nome utente.
  7. Selezionare la versione desiderata del Databricks runtime , 11.1 o versione successiva per usare il catalogo Unity.
  8. Fare clic su Create compute per creare il cluster.

Per altre informazioni sui cluster Databricks, consultare Ambiente di calcolo.

Passaggio 2: Creare un notebook in Databricks

Per creare un Notebook nell'area di lavoro, fare clic su Nuova iconaNuovo nella barra laterale e quindi su Notebook. Viene aperto un Notebook vuoto nell'area di lavoro.

Per altre informazioni sulla creazione e la gestione dei Notebook, vedere Gestire i Notebook.

Passaggio 3: Scrivere e leggere dati da una posizione esterna gestita dal catalogo Unity

Databricks consiglia di usare il caricatore automatico per l'inserimento incrementale dei dati. Il caricatore automatico rileva ed elabora automaticamente nuovi file quando arrivano nell'archiviazione di oggetti cloud.

Usare il catalogo Unity per gestire l'accesso sicuro a posizioni esterne. Gli utenti o le entità servizio con READ FILES autorizzazioni per una posizione esterna possono usare il caricatore automatico per inserire i dati.

In genere, i dati arriveranno in una posizione esterna come risultato delle scritture effettuate da altri sistemi. In questa demo è possibile simulare l'arrivo dei dati scrivendo file JSON in un percorso esterno.

Copiare il codice seguente in una cella del notebook. Sostituire il valore stringa per catalog con il nome di un catalogo che abbia le autorizzazioni CREATE CATALOG e USE CATALOG. Sostituire il valore stringa di external_location con il percorso di una posizione esterna con le autorizzazioni di READ FILES, WRITE FILES e CREATE EXTERNAL TABLE.

I percorsi esterni possono essere definiti come un intero contenitore di archiviazione, ma spesso puntano a una directory annidata in un contenitore.

Il formato corretto per un percorso esterno è "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location".


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

L'esecuzione di questa cella deve stampare una riga che legge 12 byte, stampare la stringa "Hello world!" e visualizzare tutti i database presenti nel catalogo fornito. Se non riesci a eseguire questa cella, assicurati di essere in un'area di lavoro con Unity Catalog abilitato e richiedi le autorizzazioni appropriate all'amministratore dell'area di lavoro per completare questo tutorial.

Il codice Python seguente usa l'indirizzo di posta elettronica per creare un database univoco nel catalogo fornito e un percorso di archiviazione univoco nella posizione esterna fornita. L'esecuzione di questa cella rimuoverà tutti i dati associati a questo tutorial, permettendoti di eseguire l'esempio in maniera idempotente. Viene definita e creata un'istanza di una classe che verrà usata per simulare batch di dati in arrivo da un sistema connesso alla posizione esterna di origine.

Copiare questo codice in una nuova cella del notebook ed eseguirlo per configurare l'ambiente.

Nota

Le variabili definite in questo codice devono consentire di eseguirla in modo sicuro senza rischiare conflitti con gli asset dell'area di lavoro esistenti o altri utenti. Le autorizzazioni di rete o archiviazione limitate genereranno errori durante l'esecuzione di questo codice; contattare l'amministratore dell'area di lavoro per risolvere questi problemi.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

È ora possibile inserire un batch di dati copiando il codice seguente in una cella ed eseguendolo. È possibile eseguire manualmente questa cella fino a 60 volte per attivare l'arrivo di nuovi dati.

RawData.land_batch()

Passaggio 4: Configurare il caricatore automatico per inserire dati nel catalogo Unity

Databricks consiglia di archiviare i dati con Delta Lake. Delta Lake è un livello di archiviazione open source che fornisce transazioni ACID e abilita data lakehouse. Delta Lake è il formato predefinito per le tabelle create in Databricks.

Per configurare il caricatore automatico per inserire i dati in una tabella del catalogo Unity, copiare e incollare il codice seguente nella cella vuota del notebook:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(source)
  .select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Per altre informazioni sul caricatore automatico, consultare Che cos'è il caricatore automatico?.

Per altre informazioni sullo streaming strutturato con il catalogo Unity, vedere Uso del catalogo Unity con Structured Streaming.

Passaggio 5: Elaborare e interagire con i dati

I notebook eseguono la logica cella per cella. Seguire questi passaggi per eseguire la logica nella cella:

  1. Per eseguire la cella completata nel passaggio precedente, selezionare la cella e premere MAIUSC+INVIO.

  2. Per eseguire una query sulla tabella appena creata, copiare e incollare il seguente codice in una cella vuota, quindi premere MAIUSC+INVIO per eseguire la cella.

    df = spark.read.table(table)
    
  3. Per visualizzare in anteprima i dati nel dataframe, copiare e incollare il seguente codice in una cella vuota, quindi premere MAIUSC+INVIO per eseguire la cella.

    display(df)
    

Per altre informazioni sulle opzioni interattive per la visualizzazione dei dati, consultare Visualizzazioni nei notebook di Databricks.

Passaggio 6: Pianificare un lavoro

È possibile eseguire i notebook di Databricks come script di produzione aggiungendoli come compito in un job di Databricks. In questo passaggio verrà creato un nuovo processo che è possibile attivare manualmente.

Per pianificare il tuo notebook come attività:

  1. Fare clic su Pianifica sul lato destro della barra di intestazione.
  2. Immettere un nome univoco in Nome del processo.
  3. Fare clic su Manuale.
  4. Nell'elenco a discesa Cluster selezionare il cluster creato nel passaggio 1.
  5. Cliccare su Crea.
  6. Nella finestra visualizzata fare clic su Esegui adesso.
  7. Per visualizzare i risultati dell'esecuzione del processo, fare clic sull'icona Collegamento esterno accanto al timestamp Ultima esecuzione.

Per ulteriori informazioni sui lavori, vedere Che cosa sono i lavori?.

Passaggio 7: Eseguire query sulla tabella da Databricks SQL

Chiunque disponga dell'autorizzazione USE CATALOG per il catalogo corrente, l'autorizzazione USE SCHEMA per lo schema corrente e SELECT le autorizzazioni per la tabella possono eseguire query sul contenuto della tabella dall'API Databricks preferita.

È necessario accedere a un'istanza di SQL Warehouse in esecuzione per eseguire query in Databricks SQL.

La tabella creata in precedenza in questa esercitazione ha il nome target_table. È possibile eseguire una query utilizzando il catalogo fornito nella prima cella e il database con il pattern e2e_lakehouse_<your-username>. È possibile usare Esplora cataloghi per trovare gli oggetti dati creati.

Integrazioni aggiuntive

Altre informazioni sulle integrazioni e sugli strumenti per la progettazione dei dati con Azure Databricks: