Livy-kötegelt feladatok elküldése és végrehajtása a Livy API használatával
Feljegyzés
A Livy API for Fabric adatmérnök ing előzetes verzióban érhető el.
A következőkre vonatkozik:✅ adatmérnök és Adattudomány a Microsoft Fabricben
Spark-kötegelt feladatok elküldése a Livy API for Fabric adatmérnök ing használatával.
Előfeltételek
Fabric Premium vagy Trial kapacitás egy Lakehouse-nal.
Egy távoli ügyfél, például a Visual Studio Code jupyter notebookokkal, a PySparkkal és a PythonHoz készült Microsoft Authentication Library (MSAL) használatával.
A Fabric Rest API eléréséhez Microsoft Entra-alkalmazásjogkivonat szükséges. Alkalmazás regisztrálása a Microsoft Identitásplatform.
Néhány adat a tóházban, ez a példa a NYC Taxi &Limuzin Bizottság green_tripdata_2022_08 egy parkettafájlt, amely a tóházba van betöltve.
A Livy API egységes végpontot határoz meg a műveletekhez. Cserélje le a(z) {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} és {Fabric_LakehouseID} helyőrzőket a megfelelő értékekre, amikor a cikkben szereplő példákat követi.
A Visual Studio Code konfigurálása a Livy API Batchhez
Válassza a Lakehouse beállításai lehetőséget a Fabric Lakehouse-ban.
Lépjen a Livy végpontszakaszra .
Másolja a Batch-feladat kapcsolati sztring (a kép második piros mezőjét) a kódba.
Lépjen a Microsoft Entra Felügyeleti központba , és másolja az alkalmazás (ügyfél) és a címtár (bérlő) azonosítóját a kódba.
Spark-hasznos adatok létrehozása és feltöltés a Lakehouse-ba
.ipynb
Jegyzetfüzet létrehozása a Visual Studio Code-ban, és szúrja be a következő kódotimport sys import os from pyspark.sql import SparkSession from pyspark.conf import SparkConf from pyspark.sql.functions import col if __name__ == "__main__": #Spark session builder spark_session = (SparkSession .builder .appName("livybatchdemo") .getOrCreate()) spark_context = spark_session.sparkContext spark_context.setLogLevel("DEBUG") targetLakehouse = spark_context.getConf().get("spark.targetLakehouse") if targetLakehouse is not None: print("targetLakehouse: " + str(targetLakehouse)) else: print("targetLakehouse is None") df_valid_totalPrice = spark_session.sql("SELECT * FROM <YourLakeHouseDataTableName>.transactions where TotalPrice > 0") df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4)) deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions" df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
Mentse a Python-fájlt helyileg. Ez a Python-kód hasznos adata két Spark-utasítást tartalmaz, amelyek egy Lakehouse-beli adatokon működnek, és fel kell tölteni a Lakehouse-ba. Szüksége lesz az adatterhelés ABFS-útvonalára, hogy hivatkozhasson rá a Visual Studio Code Livy API kötegelt feladatában, valamint a Lakehouse-tábla nevére a Select SQL utasításban.
Töltse fel a Python hasznos adatait a Lakehouse fájlszakaszára. > > Adatok lekérése Fájlok > feltöltése gombra kattintva a Fájlok/ beviteli mezőben.
Miután a fájl a Lakehouse Fájlok szakaszában található, kattintson a hasznos adatfájlnév jobb oldalán található három pontra, és válassza a Tulajdonságok lehetőséget.
Másolja ezt az ABFS-elérési utat a jegyzetfüzetcellába az 1. lépésben.
Livy API Spark-köteg-munkamenet létrehozása
Hozzon létre egy jegyzetfüzetet
.ipynb
a Visual Studio Code-ban, és szúrja be a következő kódot.from msal import PublicClientApplication import requests import time tenant_id = "<Entra_TenantID>" client_id = "<Entra_ClientID>" workspace_id = "<Fabric_WorkspaceID>" lakehouse_id = "<Fabric_LakehouseID>" app = PublicClientApplication( client_id, authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url_mist='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches" headers = {"Authorization": "Bearer " + access_token}
Futtassa a jegyzetfüzetcellát, egy előugró ablaknak kell megjelennie a böngészőben, amellyel kiválaszthatja a bejelentkezéshez használt identitást.
Miután kiválasztotta a bejelentkezéshez szükséges identitást, a rendszer kérni fogja a Microsoft Entra alkalmazásregisztrációs API-engedélyeinek jóváhagyását is.
A hitelesítés befejezése után zárja be a böngészőablakot.
A Visual Studio Code-ban a Microsoft Entra-jogkivonatnak kell megjelennie.
Adjon hozzá egy másik jegyzetfüzetcellát, és szúrja be ezt a kódot.
# call get batch API get_livy_get_batch = livy_base_url get_batch_response = requests.get(get_livy_get_batch, headers=headers) if get_batch_response.status_code == 200: print("API call successful") print(get_batch_response.json()) else: print(f"API call failed with status code: {get_batch_response.status_code}") print(get_batch_response.text)
Futtassa a jegyzetfüzetcellát, a Livy-kötegelt feladat létrehozásakor két sornak kell megjelennie.
Spark.sql utasítás elküldése a Livy API kötegelt munkamenetével
Adjon hozzá egy másik jegyzetfüzetcellát, és szúrja be ezt a kódot.
# submit payload to existing batch session print('Submit a spark job via the livy batch API to ') newlakehouseName = "YourNewLakehouseName" create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items" create_lakehouse_payload = { "displayName": newlakehouseName, "type": 'Lakehouse' } create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload) print(create_lakehouse_response.json()) payload_data = { "name":"livybatchdemo_with"+ newlakehouseName, "file":"abfss://YourABFSPathToYourPayload.py", "conf": { "spark.targetLakehouse": "Fabric_LakehouseID" } } get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data) print("The Livy batch job submitted successful") print(get_batch_response.json())
Futtassa a jegyzetfüzetcellát, a Livy Batch-feladat létrehozásakor és futtatásakor több sornak kell megjelennie.
Lépjen vissza a Lakehouse-hoz a módosítások megtekintéséhez.
Feladatok megtekintése a Monitorozási központban
A monitorozási központhoz a bal oldali navigációs hivatkozások Figyelő elemével tekintheti meg a különböző Apache Spark-tevékenységeket.
Amikor a kötegelt feladat befejeződött, megtekintheti a munkamenet állapotát a Monitorra lépve.
Válassza ki és nyissa meg a legutóbbi tevékenységnevet.
Ebben a Livy API-munkamenet esetében láthatja az előző kötegküldést, a futtatási részleteket, a Spark-verziókat és a konfigurációt. Figyelje meg a leállított állapotot a jobb felső sarokban.
A teljes folyamat áttekintéséhez szüksége lesz egy távoli ügyfélre, például a Visual Studio Code-ra, a Microsoft Entra alkalmazás jogkivonatára, a Livy API végpontJÁNAK URL-címére, a Lakehouse-beli hitelesítésre, egy Spark hasznos adatra a Lakehouse-ban, és végül egy kötegelt Livy API-munkamenetre.