Zdieľať cez


Transformácia údajov pomocou databázy

Poznámka

Úlohu apache airflow poháňa apache airflow.

dbt(Data Build Tool) je open-source rozhranie príkazového riadka (CLI), ktoré zjednodušuje transformáciu a modelovanie údajov v skladoch údajov spravovaním zložitého kódu SQL štruktúrovaným a spravovateľným spôsobom. Umožňuje dátovým tímom vytvárať spoľahlivé a testovateľné transformácie v jadre ich analytických kanálov.

Po spárovaní s apache airflow sú možnosti transformácie dbt vylepšené plánovaním, orchestráciou a správou úloh toku airflow. Tento kombinovaný prístup, ktorý využíva odborné znalosti spoločnosti DBT týkajúce sa transformácie spolu so správou pracovných postupov toku vzduchu, poskytuje efektívne a robustné údajové kanály, čo v konečnom dôsledku vedie k rýchlejším a prehľadnejším rozhodnutiam založeným na údajoch.

Tento kurz ukazuje, ako vytvoriť DAG toku údajov Apache Airflow, ktorý používa dbt na transformáciu údajov uložených v sklade údajov služby Microsoft Fabric.

Požiadavky

Na začiatok musíte splniť nasledujúce predpoklady:

  • Povoľte úlohu toku vzduchu Apache v nájomníkovi.

    Poznámka

    Keďže úloha Apache Airflow je v stave ukážky, musíte ju povoliť prostredníctvom správcu nájomníka. Ak sa vám už zobrazuje funkcia Apache Airflow Job, správca nájomníka ju už pravdepodobne povolil.

    1. Prejdite na portál na správu –> Nastavenia nájomníka –> v časti Microsoft Fabric –> rozbaľte časť Používatelia môžu vytvárať a používať úlohu toku vzduchu Apache (preview)".

    2. Vyberte Použiť. Snímka obrazovky na povolenie toku údajov Apache Airflow v nájomníkovi.

  • Vytvorte objekt služby. Pridajte objekt služby ako Contributor objekt v pracovnom priestore, v ktorom vytvoríte sklad údajov.

  • Ak ho nemáte, vytvorte sklad služby Fabric. Preniesť vzorové údaje do skladu pomocou kanála údajov. V tomto kurze použijeme vzorku NYC Taxi-Green .

  • Vytvorte úlohu "Apache Airflow Job" v pracovnom priestore.

Transformácia údajov uložených v sklade služby Fabric pomocou databázy

Táto časť vás prevedie nasledujúcimi krokmi:

  1. Zadajte požiadavky.
  2. Vytvorte projekt dbt v úložisku Spravované službou Fabric poskytovanom úlohou Apache Airflow..
  3. Vytvorenie jazyka DAG pre apache airflow na organizovanie úloh databázy

Špecifikovanie požiadaviek

Vytvorte súbor requirements.txt v priečinku dags . Pridajte nasledujúce balíky ako požiadavky na tok údajov Apache.

  • cosmos: Tento balík sa používa na spustenie vašich základných projektov v službe Apache Airflow DAGS a na pracovnú skupinu.

  • dbt-fabric: Tento balík sa používa na vytvorenie projektu dbt, ktorý sa potom môže nasadiť do skladu údajov služby Fabric

       astronomer-cosmos==1.0.3
       dbt-fabric==1.5.0
    

Vytvorte projekt dbt v úložisku Fabric spravovanom službou, ktorú poskytuje úloha Apache Airflow.

  1. V tejto časti vytvoríme ukážkový projekt dbt v úlohe toku údajov Apache Airflow pre množinu nyc_taxi_green údajov s nasledujúcou štruktúrou adresára.

       dags
       |-- my_cosmos_dag.py
       |-- nyc_taxi_green
       |  |-- profiles.yml
       |  |-- dbt_project.yml
       |  |-- models
       |  |   |-- nyc_trip_count.sql
       |  |-- target
    
  2. Vytvorte priečinok s názvom nyc_taxi_green v priečinku dags so súborom profiles.yml . Tento priečinok obsahuje všetky súbory potrebné pre projekt dbt. Snímka obrazovky znázorňuje vytvorenie súborov pre projekt dbt.

  3. Skopírujte nasledujúci obsah do súboru profiles.yml. Tento konfiguračný súbor obsahuje podrobnosti o pripojení databázy a profily používané databázou dbt. Aktualizujte hodnoty zástupného symbolu a uložte súbor.

    config:
      partial_parse: true
    nyc_taxi_green:
      target: fabric-dev
      outputs:
        fabric-dev:
          type: fabric
          driver: "ODBC Driver 18 for SQL Server"
          server: <sql connection string of your data warehouse>
          port: 1433
          database: "<name of the database>"
          schema: dbo
          threads: 4
          authentication: ServicePrincipal
          tenant_id: <Tenant ID of your service principal>
          client_id: <Client ID of your service principal>
          client_secret: <Client Secret of your service principal>
    
  4. Vytvorte dbt_project.yml súbor a skopírujte nasledujúci obsah. Tento súbor určuje konfiguráciu na úrovni projektu.

    name: "nyc_taxi_green"
    
    config-version: 2
    version: "0.1"
    
    profile: "nyc_taxi_green"
    
    model-paths: ["models"]
    seed-paths: ["seeds"]
    test-paths: ["tests"]
    analysis-paths: ["analysis"]
    macro-paths: ["macros"]
    
    target-path: "target"
    clean-targets:
      - "target"
      - "dbt_modules"
      - "logs"
    
    require-dbt-version: [">=1.0.0", "<2.0.0"]
    
    models:
      nyc_taxi_green:
        materialized: table
    
  5. Vytvorte models priečinok v priečinku nyc_taxi_green . V tomto kurze vytvoríme v súbore s názvom nyc_trip_count.sql vzorový model, ktorý vytvorí tabuľku zobrazujúcu počet ciest za deň na každého dodávateľa. Skopírujte do súboru nasledujúci obsah.

       with new_york_taxis as (
           select * from nyctlc
       ),
       final as (
         SELECT
           vendorID,
           CAST(lpepPickupDatetime AS DATE) AS trip_date,
           COUNT(*) AS trip_count
         FROM
             [contoso-data-warehouse].[dbo].[nyctlc]
         GROUP BY
             vendorID,
             CAST(lpepPickupDatetime AS DATE)
         ORDER BY
             vendorID,
             trip_date;
       )
       select * from final
    

    Snímka obrazovky znázorňuje modely pre projekt dbt.

Vytvorenie jazyka DAG pre apache airflow na organizovanie úloh databázy

  • Vytvorte súbor s názvom my_cosmos_dag.py v dags priečinku a prilepte doň nasledujúci obsah.

    import os
    from pathlib import Path
    from datetime import datetime
    from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
    
    DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dags" / "nyc_taxi_green"
    DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
    profile_config = ProfileConfig(
         profile_name="nyc_taxi_green",
         target_name="fabric-dev",
         profiles_yml_filepath=DBT_ROOT_PATH / "profiles.yml",
    )
    
    dbt_fabric_dag = DbtDag(
         project_config=ProjectConfig(DBT_ROOT_PATH,),
         operator_args={"install_deps": True},
         profile_config=profile_config,
         schedule_interval="@daily",
         start_date=datetime(2023, 9, 10),
         catchup=False,
         dag_id="dbt_fabric_dag",
    )
    

Spustenie jazyka DAG

  1. Spustite DAG v službe Apache Airflow Job. Snímka obrazovky znázorňujúca spustenie jazyka DAG.

  2. Ak chcete zobraziť svoj dag načítaný v používateľskom rozhraní toku údajov Apache Airflow, kliknite na Monitor in Apache Airflow.Snímka obrazovky znázorňujúca, ako monitorovať jazyk DAG dbt.Snímka obrazovky znázorňujúca úspešné spustenie jazyka DAG.

Overenie údajov

  • Po úspešnom spustení na overenie údajov sa v sklade údajov služby Fabric zobrazí nová tabuľka s názvom nyc_trip_count.sql. Snímka obrazovky znázorňujúca úspešné dbt dag.

Rýchla konfigurácia: Vytvorenie úlohy v toku vzduchu Apache