Pomocou Livy API odoslať a vykonať Livy dávkové úlohy
Poznámka
Rozhranie API Livy pre službu Fabric Dátový inžinier ing je v režime ukážky.
Vzťahuje sa na:✅ Dátový inžinier a dátovú vedu v službe Microsoft Fabric
Odošlite dávkové úlohy služby Spark pomocou rozhrania API Livy pre službu Fabric Dátový inžinier ing.
Požiadavky
Kapacita Fabric Premium alebo skúšobná verzia s budovou Lakehouse.
Vzdialený klient, ako napríklad Visual Studio Code s poznámkovými blokmi Jupyter, PySpark a knižnicou overenia spoločnosti Microsoft (MSAL) pre jazyk Python.
Na prístup k rozhraniu Rest API služby Fabric sa vyžaduje token aplikácie Microsoft Entra. Zaregistrujte aplikáciu pomocou platformy microsoft identity.
Niektoré údaje vo vašom lakehouse, tento príklad používa NYC Taxi & Limuzína komisie green_tripdata_2022_08 parketový súbor načítaný do jazera.
Rozhranie API Livy definuje zjednotený koncový bod pre operácie. Ak budete postupovať podľa príkladov v tomto článku, nahraďte zástupné symboly {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} a {Fabric_LakehouseID}.
Konfigurácia Visual Studio Code pre vašu aplikáciu Livy API Batch
Vyberte položku Nastavenia Lakehouse vo vašom fabric lakehouse.
Prejdite do časti Koncový bod Livy.
Skopírujte reťazec pripojenia Dávková úloha (druhé červené pole na obrázku) do svojho kódu.
Prejdite do Centra spravovania služby Microsoft Entra a skopírujte DO svojho kódu ID aplikácie (klienta) aj ID adresára (nájomníka).
Vytvorenie údajovej časti služby Spark a nahratie do služby Lakehouse
Vytvorenie poznámkového
.ipynb
bloku v programe Visual Studio Code a vloženie nasledujúceho kóduimport sys import os from pyspark.sql import SparkSession from pyspark.conf import SparkConf from pyspark.sql.functions import col if __name__ == "__main__": #Spark session builder spark_session = (SparkSession .builder .appName("livybatchdemo") .getOrCreate()) spark_context = spark_session.sparkContext spark_context.setLogLevel("DEBUG") targetLakehouse = spark_context.getConf().get("spark.targetLakehouse") if targetLakehouse is not None: print("targetLakehouse: " + str(targetLakehouse)) else: print("targetLakehouse is None") df_valid_totalPrice = spark_session.sql("SELECT * FROM <YourLakeHouseDataTableName>.transactions where TotalPrice > 0") df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4)) deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions" df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
Súbor jazyka Python uložte lokálne. Táto údajová časť kódu jazyka Python obsahuje dva príkazy Spark, ktoré pracujú s údajmi v lakehouse a musia byť nahraté do vášho lakehouse. Budete potrebovať cestu ABFS údajovej časti na odkazovanie v dávkovej úlohe rozhrania API Livy v programe Visual Studio Code a názov vašej tabuľky Lakehouse v príkaze SQL Select..
Nahrajte údajovú časť jazyka Python do sekcie súborov v službe Lakehouse. > Získať údaje > Kliknite na položku Súbory > /vstupné pole.
Keď sa súbor nachádza v časti Súbory vašej služby Lakehouse, kliknite na tri bodky napravo od názvu súboru údajovej časti a vyberte položku Vlastnosti.
Skopírujte túto cestu do notebooku v kroku 1.
Vytvorenie dávkovej relácie Spark rozhrania Livy API
Vytvorte poznámkový
.ipynb
blok v programe Visual Studio Code a vložte nasledujúci kód.from msal import PublicClientApplication import requests import time tenant_id = "<Entra_TenantID>" client_id = "<Entra_ClientID>" workspace_id = "<Fabric_WorkspaceID>" lakehouse_id = "<Fabric_LakehouseID>" app = PublicClientApplication( client_id, authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url_mist='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches" headers = {"Authorization": "Bearer " + access_token}
Spustite bunku poznámkového bloku. V prehliadači by sa mala zobraziť kontextová ponuka, ktorá vám umožní vybrať identitu, pomocou ktorú sa chcete prihlásiť.
Po výbere identity na prihlásenie sa zobrazí sa výzva na schválenie povolení rozhrania API na registráciu aplikácie Microsoft Entra.
Po dokončení overovania zatvorte okno prehliadača.
V programe Visual Studio Code by sa mal zobraziť vrátený token Microsoft Entra.
Pridajte ďalšiu bunku poznámkového bloku a vložte tento kód.
# call get batch API get_livy_get_batch = livy_base_url get_batch_response = requests.get(get_livy_get_batch, headers=headers) if get_batch_response.status_code == 200: print("API call successful") print(get_batch_response.json()) else: print(f"API call failed with status code: {get_batch_response.status_code}") print(get_batch_response.text)
Spustenie notebooku bunky, mali by ste vidieť dva riadky vytlačené ako Livy dávkové úlohy.
Odoslanie príkazu spark.sql pomocou dávkovej relácie rozhrania API Livy
Pridajte ďalšiu bunku poznámkového bloku a vložte tento kód.
# submit payload to existing batch session print('Submit a spark job via the livy batch API to ') newlakehouseName = "YourNewLakehouseName" create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items" create_lakehouse_payload = { "displayName": newlakehouseName, "type": 'Lakehouse' } create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload) print(create_lakehouse_response.json()) payload_data = { "name":"livybatchdemo_with"+ newlakehouseName, "file":"abfss://YourABFSPathToYourPayload.py", "conf": { "spark.targetLakehouse": "Fabric_LakehouseID" } } get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data) print("The Livy batch job submitted successful") print(get_batch_response.json())
Spustenie notebooku bunky, by sa malo zobraziť niekoľko riadkov vytlačených ako úloha Livy Batch je vytvorený a spustený.
Vráťte sa späť do svojho jazera a zobrazte zmeny.
Zobrazte svoju prácu v centre monitorovania
Do centra monitoringu sa môžete dostať a zobraziť rôzne aktivity Apache Spark tak, že v ľavej navigačnej prepojenia vyberiete položku Monitorovať.
Po dokončení dávkovej úlohy môžete zobraziť stav relácie prechodom na položku Monitorovať.
Vyberte a otvorte najnovší názov aktivity.
V tomto prípade relácie rozhrania API služby Livy si môžete pozrieť predchádzajúce dávkové odoslanie, podrobnosti o spustení, verzie služby Spark a konfiguráciu. Všimnite si stav zastaveného v pravom hornom rohu.
Na opakovanie celého procesu potrebujete vzdialeného klienta, ako je napríklad Visual Studio Code, token aplikácie Microsoft Entra, URL adresa koncového bodu rozhrania Livy API, overovanie v službe Lakehouse, údajová časť služby Spark v službe Lakehouse a nakoniec dávková relácia rozhrania API Livy.