Megosztás a következőn keresztül:


Adatok betöltése DLT használatával

Az Azure Databricksben az Apache Spark által támogatott bármely adatforrásból betölthet adatokat a DLT használatával. A DLT-ben adatkészleteket (táblákat és nézeteket) definiálhat bármely olyan lekérdezéshez, amely Spark DataFrame-et ad vissza, beleértve a Stream DataFrame-eket és a Pandas for Spark DataFrames-et. Az adatbetöltési feladatokhoz a Databricks a streamelési táblák használatát javasolja a legtöbb használati esetben. A streamelő táblák alkalmasak arra, hogy adatokat töltsenek be a felhőbeli objektumtárolóból az Automatikus betöltővel vagy az üzenetbuszokból, például a Kafkából. Az alábbi példák néhány gyakori mintát mutatnak be.

Fontos

Nem minden adatforrás rendelkezik SQL-támogatással. A DLT-folyamatban lévő SQL- és Python-jegyzetfüzeteket kombinálva az SQL-t a betöltésen túl minden művelethez használhatja.

A DLT-ben alapértelmezés szerint nem csomagolt kódtárak kezeléséről további információt a Python-függőségek kezelése dLT-folyamatokhozcímű témakörben talál.

Fájlok betöltése felhőalapú objektumtárolóból

A Databricks az Automatikus betöltő és a DLT használatát javasolja a legtöbb adatbetöltési feladathoz a felhőobjektum-tárolóból. Az automatikus betöltő és a DLT úgy lett kialakítva, hogy fokozatosan és idempotensen betöltse az egyre növekvő adatokat a felhőbeli tárolóba érkezve. Az alábbi példák az Automatikus betöltő használatával hoznak létre adatkészleteket CSV- és JSON-fájlokból:

Jegyzet

Ha egy Unity Catalog-alapú folyamatban az Auto Loaderrel szeretne fájlokat betölteni, külső helyeket kell használnia. A Unity Catalog DLT-sel való használatáról további információt A Unity katalógus használata DLT-folyamatokkalcímű témakörben talál.

Piton

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

Lásd Mi az automatikus betöltő? és Automatikus betöltő SQL-szintaxis.

Figyelmeztetés

Ha az Automatikus betöltőt fájlértesítésekkel használja, és teljes frissítést futtat a folyamathoz vagy a streamelési táblához, manuálisan kell törölnie az erőforrásokat. A CloudFilesResourceManager használatával végezheti el a törlést.

Adatok betöltése üzenetbuszból

A DLT adattovábbító rendszereket úgy konfigurálhatja, hogy streamelő táblákkal befogadja az üzenetbuszokból származó adatokat. A Databricks azt javasolja, hogy a streamelési táblákat a folyamatos végrehajtással és a továbbfejlesztett automatikus skálázással kombinálja, hogy a leghatékonyabb betöltést biztosítsa az üzenetbuszokról érkező alacsony késésű betöltéshez. Lásd: A DLT-folyamatok fürtkihasználtságának optimalizálását az automatikus skálázás továbbfejlesztésével.

A következő kód például konfigurál egy streamelési táblát a Kafkából származó adatok betöltéséhez:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Az alsóbb rétegbeli műveleteket tiszta SQL-ben írhatja, hogy streamelési átalakításokat hajtson végre ezen az adatokon, ahogyan az alábbi példában is látható:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(kafka_raw)
WHERE ...

Az Event Hubs használatára példa: Az Azure Event Hubs használata DLT-adatforrásként.

Lásd: Streamelési adatforrások konfigurálása.

Adatok betöltése külső rendszerekből

A DLT támogatja az adatok betöltését az Azure Databricks által támogatott adatforrásokból. Lásd: Csatlakozás adatforrásokhoz. Külső adatokat is betölthet a Lakehouse Federation használatával támogatott adatforrásokhoz. Mivel a Lakehouse Federation használatához a Databricks Runtime 13.3 LTS-es vagy újabb verziója szükséges, a felhasználáshoz a folyamatot úgy kell konfigurálni, hogy a előzetes csatornátalkalmazza.

Egyes adatforrások nem rendelkeznek egyenértékű támogatással az SQL-ben. Ha nem tudja használni a Lakehouse Federationt ezen adatforrások egyikével, egy Python-jegyzetfüzettel betöltheti az adatokat a forrásból. Python- és SQL-forráskódot is hozzáadhat ugyanahhoz a DLT-folyamathoz. Az alábbi példa egy materializált nézetet deklarál egy távoli PostgreSQL-táblában lévő adatok aktuális állapotának eléréséhez:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Kis méretű vagy statikus adathalmazok betöltése a felhőbeli objektumtárolóból

Kis méretű vagy statikus adathalmazokat az Apache Spark betöltési szintaxisával tölthet be. A DLT az Apache Spark által az Azure Databricksen támogatott összes fájlformátumot támogatja. A teljes listát az Adatformátum beállításaicímű témakörben találja.

Az alábbi példák bemutatják a JSON betöltését DLT-táblák létrehozásához.

Piton

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Jegyzet

Az SELECT * FROM format.`path`; SQL-szerkezet az Azure Databricks összes SQL-környezetében gyakori. Ez a DLT-vel rendelkező SQL használatával történő közvetlen fájlhozzáférés ajánlott mintája.

Tároló hitelesítő adatainak biztonságos elérése titkos kulcsokkal egy folyamatban

Az Azure Databricks titkos kulcsokat használhatja hitelesítő adatok, például hozzáférési kulcsok vagy jelszavak tárolására. A folyamat titkos kódjának konfigurálásához használja a Spark tulajdonságot a folyamatbeállítások fürtkonfigurációjában. Lásd: Számítás konfigurálása DLT-folyamathoz.

Az alábbi példa titkos kóddal tárolja a bemeneti adatok azure Data Lake Storage Gen2 (ADLS Gen2) tárfiókból való olvasásához szükséges hozzáférési kulcsot az Automatikus betöltőhasználatával. Ugyanezzel a módszerrel konfigurálhatja a folyamathoz szükséges titkos kulcsokat, például az AWS-kulcsokat az S3 eléréséhez, vagy egy Apache Hive-metaadattár jelszavát.

Az Azure Data Lake Storage Gen2 használatával kapcsolatos további információkért lásd: Csatlakozás az Azure Data Lake Storage Gen2-hez és a Blob Storage-.

Jegyzet

A titkos kulcs értékét beállító spark_conf konfigurációs kulcshoz hozzá kell adnia a spark.hadoop. előtagot.

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
      }
    }
  ],
  "name": "DLT quickstart using ADLS2"
}

Helyettesít

  • <storage-account-name> az ADLS Gen2 tárfiók nevével.
  • <scope-name> az Azure Databricks titkos hatókörének nevével.
  • <secret-name> az Azure Storage-fiók hozzáférési kulcsát tartalmazó kulcs nevével.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Helyettesít

  • <container-name> a bemeneti adatokat tároló Azure Storage-fiók tárolójának nevével.
  • <storage-account-name> az ADLS Gen2 tárfiók nevével.
  • <path-to-input-dataset> a bemeneti adathalmaz elérési útvonalával.

Adatok betöltése az Azure Event Hubsból

Az Azure Event Hubs egy adatfolyam-szolgáltatás, amely Apache Kafka-kompatibilis felületet biztosít. A DLT-futtatókörnyezetben található strukturált streamelési Kafka-összekötő használatával betöltheti az Azure Event Hubsból érkező üzeneteket. Az Azure Event Hubsból érkező üzenetek betöltéséről és feldolgozásáról további információt az Az Azure Event Hubs használata DLT-adatforráskéntcímű témakörben talál.