Condividi tramite


Caricare dati con DLT

È possibile caricare dati da qualsiasi origine dati supportata da Apache Spark in Azure Databricks usando DLT. È possibile definire set di dati (tabelle e viste) in DLT su qualsiasi query che restituisca un dataframe Spark, inclusi i dataframe di streaming e Pandas per i dataframe Spark. Per le attività di inserimento dati, Databricks consiglia di usare le tabelle di streaming per la maggior parte dei casi d'uso. Le tabelle di streaming sono valide per l'inserimento di dati dall'archiviazione di oggetti cloud tramite il caricatore automatico o da bus di messaggi come Kafka. Gli esempi seguenti illustrano alcuni modelli comuni.

Importante

Non tutte le origini dati supportano SQL. È possibile combinare notebook SQL e Python in una pipeline DLT per usare SQL per tutte le operazioni oltre l'inserimento.

Per informazioni dettagliate sull'uso delle librerie non incluse in DLT per impostazione predefinita, vedere Gestire le dipendenze Python per le pipeline DLT.

Caricare file dall'archiviazione di oggetti cloud

Databricks consiglia di usare il caricatore automatico con DLT per la maggior parte delle attività di inserimento dati dall'archiviazione di oggetti cloud. Il caricatore automatico e DLT sono progettati per caricare in modo incrementale e idempotente i dati in continua crescita man mano che arrivano nell'archiviazione cloud. Gli esempi seguenti usano Il caricatore automatico per creare set di dati da file CSV e JSON:

Nota

Per caricare i file con Auto Loader in una pipeline abilitata per Unity Catalog, è necessario usare percorsi esterni. Per altre informazioni sull'uso di Unity Catalog con DLT, vedere Usare il catalogo Unity con le pipeline DLT.

Pitone

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

Vedi Che cos'è Auto Loader? e sintassi SQL di Auto Loader.

Avvertimento

Se si usa Il caricatore automatico con le notifiche dei file ed è necessario eseguire un aggiornamento completo per la pipeline o la tabella di streaming, è necessario pulire manualmente le risorse. È possibile usare il CloudFilesResourceManager in un notebook per eseguire la pulizia.

Caricare dati da un bus di messaggi

È possibile configurare le pipeline DLT per acquisire dati dai bus di messaggi con tabelle di streaming. Databricks consiglia di combinare tabelle di streaming con l'esecuzione continua e la scalabilità automatica avanzata per fornire l'inserimento più efficiente per il caricamento a bassa latenza dai bus di messaggi. Consulta Ottimizza l'utilizzo del cluster delle pipeline DLT con scalabilità automatizzata avanzata.

Ad esempio, il codice seguente configura una tabella di streaming per inserire dati da Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

È possibile scrivere operazioni downstream in SQL puro per eseguire trasformazioni di streaming su questi dati, come nell'esempio seguente:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(kafka_raw)
WHERE ...

Per un esempio di utilizzo di Hub eventi, vedere Usare Hub eventi di Azure come origine dati DLT.

Vedere Configurare le origini dati di streaming.

Caricare dati da sistemi esterni

DLT supporta il caricamento di dati da qualsiasi origine dati supportata da Azure Databricks. Vedere Connettersi alle origini dati. È possibile anche caricare dati esterni utilizzando Lakehouse Federation per le origini dati supportate . Poiché Lakehouse Federation richiede Databricks Runtime 13.3 LTS o versione successiva, per usare Lakehouse Federation la pipeline deve essere configurata per usare il canale di anteprima .

Alcune origini dati non hanno supporto equivalente in SQL. Se non è possibile usare Lakehouse Federation con una di queste origini dati, è possibile usare un notebook Python per inserire dati dall'origine. È possibile aggiungere codice sorgente Python e SQL alla stessa pipeline DLT. L'esempio seguente dichiara una vista materializzata per accedere allo stato corrente dei dati in una tabella PostgreSQL remota:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Caricare set di dati statici o di piccole dimensioni dall'archiviazione di oggetti cloud

È possibile caricare set di dati statici o di piccole dimensioni usando la sintassi di caricamento di Apache Spark. DLT supporta tutti i formati di file supportati da Apache Spark in Azure Databricks. Per un elenco completo, vedere le opzioni di formato dati .

Gli esempi seguenti illustrano il caricamento di JSON per creare tabelle DLT:

Pitone

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Nota

Il costrutto SQL SELECT * FROM format.`path`; è comune a tutti gli ambienti SQL in Azure Databricks. È il modello consigliato per l'accesso diretto ai file usando SQL con DLT.

accedere in modo sicuro alle credenziali di archiviazione con chiavi segrete in una pipeline

È possibile usare Azure Databricks segreti per archiviare credenziali come chiavi di accesso o password. Per configurare il segreto nella pipeline, usare una proprietà Spark nella configurazione del cluster delle impostazioni della pipeline. Vedere Configurare il calcolo per una pipeline DLT.

L'esempio seguente usa un segreto per archiviare una chiave di accesso necessaria per leggere i dati di input da un account di archiviazione di Azure Data Lake Storage Gen2 (ADLS Gen2) usando caricatore automatico. È possibile usare questo stesso metodo per configurare qualsiasi segreto richiesto dalla pipeline, ad esempio le chiavi AWS per accedere a S3 o la password a un metastore Apache Hive.

Per altre informazioni sull'uso di Azure Data Lake Storage Gen2, vedere Connettersi ad Azure Data Lake Storage Gen2 e Archiviazione BLOB.

Nota

È necessario aggiungere il prefisso spark.hadoop. alla chiave di configurazione spark_conf che imposta il valore del segreto.

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
      }
    }
  ],
  "name": "DLT quickstart using ADLS2"
}

Sostituire

  • <storage-account-name> con il nome dell'account di archiviazione di ADLS Gen2.
  • <scope-name> con il nome dello scope segreto di Azure Databricks.
  • <secret-name> con il nome della chiave contenente la chiave di accesso dell'account di archiviazione di Azure.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Sostituire

  • <container-name> con il nome del contenitore dell'account di archiviazione di Azure in cui sono archiviati i dati di input.
  • Sostituire <storage-account-name> con il nome dell'account di archiviazione ADLS Gen2.
  • <path-to-input-dataset> con il percorso del set di dati di input.

Caricare dati da Hub eventi di Azure

Hub eventi di Azure è un servizio di streaming di dati che fornisce un'interfaccia compatibile con Apache Kafka. È possibile usare il connettore Structured Streaming Kafka, incluso nel runtime DLT, per caricare i messaggi da Hub eventi di Azure. Per altre informazioni sul caricamento e l'elaborazione dei messaggi da Hub eventi di Azure, vedere Usare Hub eventi di Azure come origine dati DLT.