Zdieľať cez


Pomocou Livy API odoslať a vykonať Livy dávkové úlohy

Poznámka

Rozhranie API Livy pre službu Fabric Dátový inžinier ing je v režime ukážky.

Vzťahuje sa na:✅ Dátový inžinier a dátovú vedu v službe Microsoft Fabric

Odošlite dávkové úlohy služby Spark pomocou rozhrania API Livy pre službu Fabric Dátový inžinier ing.

Požiadavky

Rozhranie API Livy definuje zjednotený koncový bod pre operácie. Ak budete postupovať podľa príkladov v tomto článku, nahraďte zástupné symboly {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} a {Fabric_LakehouseID}.

Konfigurácia Visual Studio Code pre vašu aplikáciu Livy API Batch

  1. Vyberte položku Nastavenia Lakehouse vo vašom fabric lakehouse.

    Snímka obrazovky zobrazujúca nastavenia Lakehouse.

  2. Prejdite do časti Koncový bod Livy.

    snímka obrazovky znázorňujúca koncový bod Lakehouse Livy a úlohu relácie reťazec pripojenia.

  3. Skopírujte reťazec pripojenia Dávková úloha (druhé červené pole na obrázku) do svojho kódu.

  4. Prejdite do Centra spravovania služby Microsoft Entra a skopírujte DO svojho kódu ID aplikácie (klienta) aj ID adresára (nájomníka).

    Snímka obrazovky znázorňujúca prehľad aplikácie rozhrania API od spoločnosti Microsoft v centre spravovania služby Microsoft Entra.

Vytvorenie údajovej časti služby Spark a nahratie do služby Lakehouse

  1. Vytvorenie poznámkového .ipynb bloku v programe Visual Studio Code a vloženie nasledujúceho kódu

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
         #Spark session builder
         spark_session = (SparkSession
           .builder
           .appName("livybatchdemo") 
           .getOrCreate())
    
         spark_context = spark_session.sparkContext
         spark_context.setLogLevel("DEBUG")  
    
         targetLakehouse = spark_context.getConf().get("spark.targetLakehouse")
    
         if targetLakehouse is not None:
           print("targetLakehouse: " + str(targetLakehouse))
         else:
           print("targetLakehouse is None")
    
    df_valid_totalPrice = spark_session.sql("SELECT * FROM <YourLakeHouseDataTableName>.transactions where TotalPrice > 0")
    df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4))
    
    deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions"
    df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. Súbor jazyka Python uložte lokálne. Táto údajová časť kódu jazyka Python obsahuje dva príkazy Spark, ktoré pracujú s údajmi v lakehouse a musia byť nahraté do vášho lakehouse. Budete potrebovať cestu ABFS údajovej časti na odkazovanie v dávkovej úlohe rozhrania API Livy v programe Visual Studio Code a názov vašej tabuľky Lakehouse v príkaze SQL Select..

    Snímka obrazovky zobrazujúca bunku údajovej časti jazyka Python.

  3. Nahrajte údajovú časť jazyka Python do sekcie súborov v službe Lakehouse. > Získať údaje > Kliknite na položku Súbory > /vstupné pole.

    Snímka obrazovky znázorňujúca údajovú časť v časti Súbory služby Lakehouse.

  4. Keď sa súbor nachádza v časti Súbory vašej služby Lakehouse, kliknite na tri bodky napravo od názvu súboru údajovej časti a vyberte položku Vlastnosti.

    Snímka obrazovky zobrazujúca cestu k údajovej časti ABFS v vlastnostiach súboru v Lakehouse.

  5. Skopírujte túto cestu do notebooku v kroku 1.

Vytvorenie dávkovej relácie Spark rozhrania Livy API

  1. Vytvorte poznámkový .ipynb blok v programe Visual Studio Code a vložte nasledujúci kód.

    
    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "<Entra_TenantID>"
    client_id = "<Entra_ClientID>"
    
    workspace_id = "<Fabric_WorkspaceID>"
    lakehouse_id = "<Fabric_LakehouseID>"
    
    app = PublicClientApplication(
       client_id,
       authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
         result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
       print(f"Access token: {result['access_token']}")
    else:
       print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
       access_token = result['access_token']
       api_base_url_mist='https://api.fabric.microsoft.com/v1'
       livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"
       headers = {"Authorization": "Bearer " + access_token}
    
  2. Spustite bunku poznámkového bloku. V prehliadači by sa mala zobraziť kontextová ponuka, ktorá vám umožní vybrať identitu, pomocou ktorú sa chcete prihlásiť.

    Snímka obrazovky znázorňujúca prihlasovaciu obrazovku do aplikácie Microsoft Entra.

  3. Po výbere identity na prihlásenie sa zobrazí sa výzva na schválenie povolení rozhrania API na registráciu aplikácie Microsoft Entra.

    Snímka obrazovky zobrazujúca povolenia rozhrania API aplikácie Microsoft Entra.

  4. Po dokončení overovania zatvorte okno prehliadača.

    Snímka obrazovky znázorňujúca dokončenie overovania.

  5. V programe Visual Studio Code by sa mal zobraziť vrátený token Microsoft Entra.

    Snímka obrazovky zobrazujúca token Microsoft Entra vrátený po spustení bunky a prihlásení.

  6. Pridajte ďalšiu bunku poznámkového bloku a vložte tento kód.

    # call get batch API
    
    get_livy_get_batch = livy_base_url
    get_batch_response = requests.get(get_livy_get_batch, headers=headers)
    if get_batch_response.status_code == 200:
       print("API call successful")
       print(get_batch_response.json())
    else:
       print(f"API call failed with status code: {get_batch_response.status_code}")
       print(get_batch_response.text)
    
  7. Spustenie notebooku bunky, mali by ste vidieť dva riadky vytlačené ako Livy dávkové úlohy.

    Snímka obrazovky zobrazujúca výsledky vytvárania dávkovej relácie.

Odoslanie príkazu spark.sql pomocou dávkovej relácie rozhrania API Livy

  1. Pridajte ďalšiu bunku poznámkového bloku a vložte tento kód.

    # submit payload to existing batch session
    
    print('Submit a spark job via the livy batch API to ') 
    
    newlakehouseName = "YourNewLakehouseName"
    create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items"
    create_lakehouse_payload = {
       "displayName": newlakehouseName,
       "type": 'Lakehouse'
    }
    
    create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload)
    print(create_lakehouse_response.json())
    
    payload_data = {
       "name":"livybatchdemo_with"+ newlakehouseName,
       "file":"abfss://YourABFSPathToYourPayload.py", 
       "conf": {
         "spark.targetLakehouse": "Fabric_LakehouseID"
       }
    }
    get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data)
    
    print("The Livy batch job submitted successful")
    print(get_batch_response.json())
    
  2. Spustenie notebooku bunky, by sa malo zobraziť niekoľko riadkov vytlačených ako úloha Livy Batch je vytvorený a spustený.

    Snímka obrazovky znázorňujúca výsledky v programe Visual Studio Code po úspešnom odoslaní Livy Batch Job.

  3. Vráťte sa späť do svojho jazera a zobrazte zmeny.

Zobrazte svoju prácu v centre monitorovania

Do centra monitoringu sa môžete dostať a zobraziť rôzne aktivity Apache Spark tak, že v ľavej navigačnej prepojenia vyberiete položku Monitorovať.

  1. Po dokončení dávkovej úlohy môžete zobraziť stav relácie prechodom na položku Monitorovať.

    Snímka obrazovky znázorňujúca predchádzajúce odoslania rozhrania API od Livy v centre monitorovania.

  2. Vyberte a otvorte najnovší názov aktivity.

    Snímka obrazovky znázorňujúca najnovšiu aktivitu rozhrania Livy API v centre monitorovanie.

  3. V tomto prípade relácie rozhrania API služby Livy si môžete pozrieť predchádzajúce dávkové odoslanie, podrobnosti o spustení, verzie služby Spark a konfiguráciu. Všimnite si stav zastaveného v pravom hornom rohu.

    Snímka obrazovky znázorňujúca najnovšie podrobnosti o aktivite rozhrania API Livy v centre monitorovania.

Na opakovanie celého procesu potrebujete vzdialeného klienta, ako je napríklad Visual Studio Code, token aplikácie Microsoft Entra, URL adresa koncového bodu rozhrania Livy API, overovanie v službe Lakehouse, údajová časť služby Spark v službe Lakehouse a nakoniec dávková relácia rozhrania API Livy.