Megosztás a következőn keresztül:


Livy-kötegelt feladatok elküldése és végrehajtása a Livy API használatával

Feljegyzés

A Livy API for Fabric adatmérnök ing előzetes verzióban érhető el.

A következőkre vonatkozik:✅ adatmérnök és Adattudomány a Microsoft Fabricben

Spark-kötegelt feladatok elküldése a Livy API for Fabric adatmérnök ing használatával.

Előfeltételek

A Livy API egységes végpontot határoz meg a műveletekhez. Cserélje le a(z) {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} és {Fabric_LakehouseID} helyőrzőket a megfelelő értékekre, amikor a cikkben szereplő példákat követi.

A Visual Studio Code konfigurálása a Livy API Batchhez

  1. Válassza a Lakehouse beállításai lehetőséget a Fabric Lakehouse-ban.

    Képernyőkép a Lakehouse beállításairól.

  2. Lépjen a Livy végpontszakaszra .

    képernyőkép a Lakehouse Livy végpontról és a munkamenet-feladat kapcsolati sztring.

  3. Másolja a Batch-feladat kapcsolati sztring (a kép második piros mezőjét) a kódba.

  4. Lépjen a Microsoft Entra Felügyeleti központba , és másolja az alkalmazás (ügyfél) és a címtár (bérlő) azonosítóját a kódba.

    Képernyőkép a Livy API-alkalmazás áttekintéséről a Microsoft Entra felügyeleti központban.

Spark-hasznos adatok létrehozása és feltöltés a Lakehouse-ba

  1. .ipynb Jegyzetfüzet létrehozása a Visual Studio Code-ban, és szúrja be a következő kódot

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
         #Spark session builder
         spark_session = (SparkSession
           .builder
           .appName("livybatchdemo") 
           .getOrCreate())
    
         spark_context = spark_session.sparkContext
         spark_context.setLogLevel("DEBUG")  
    
         targetLakehouse = spark_context.getConf().get("spark.targetLakehouse")
    
         if targetLakehouse is not None:
           print("targetLakehouse: " + str(targetLakehouse))
         else:
           print("targetLakehouse is None")
    
    df_valid_totalPrice = spark_session.sql("SELECT * FROM <YourLakeHouseDataTableName>.transactions where TotalPrice > 0")
    df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4))
    
    deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions"
    df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. Mentse a Python-fájlt helyileg. Ez a Python-kód hasznos adata két Spark-utasítást tartalmaz, amelyek egy Lakehouse-beli adatokon működnek, és fel kell tölteni a Lakehouse-ba. Szüksége lesz az adatterhelés ABFS-útvonalára, hogy hivatkozhasson rá a Visual Studio Code Livy API kötegelt feladatában, valamint a Lakehouse-tábla nevére a Select SQL utasításban.

    Képernyőkép a Python hasznos adatcelláról.

  3. Töltse fel a Python hasznos adatait a Lakehouse fájlszakaszára. > > Adatok lekérése Fájlok > feltöltése gombra kattintva a Fájlok/ beviteli mezőben.

    Képernyőkép a Lakehouse Fájlok szakaszának hasznos adatairól.

  4. Miután a fájl a Lakehouse Fájlok szakaszában található, kattintson a hasznos adatfájlnév jobb oldalán található három pontra, és válassza a Tulajdonságok lehetőséget.

    Képernyőkép a hasznos adat ABFS-elérési útról a Lakehouse-fájl tulajdonságai között.

  5. Másolja ezt az ABFS-elérési utat a jegyzetfüzetcellába az 1. lépésben.

Livy API Spark-köteg-munkamenet létrehozása

  1. Hozzon létre egy jegyzetfüzetet .ipynb a Visual Studio Code-ban, és szúrja be a következő kódot.

    
    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "<Entra_TenantID>"
    client_id = "<Entra_ClientID>"
    
    workspace_id = "<Fabric_WorkspaceID>"
    lakehouse_id = "<Fabric_LakehouseID>"
    
    app = PublicClientApplication(
       client_id,
       authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
         result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
       print(f"Access token: {result['access_token']}")
    else:
       print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
       access_token = result['access_token']
       api_base_url_mist='https://api.fabric.microsoft.com/v1'
       livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"
       headers = {"Authorization": "Bearer " + access_token}
    
  2. Futtassa a jegyzetfüzetcellát, egy előugró ablaknak kell megjelennie a böngészőben, amellyel kiválaszthatja a bejelentkezéshez használt identitást.

    Képernyőkép a Microsoft Entra alkalmazásba való bejelentkezés képernyőről.

  3. Miután kiválasztotta a bejelentkezéshez szükséges identitást, a rendszer kérni fogja a Microsoft Entra alkalmazásregisztrációs API-engedélyeinek jóváhagyását is.

    Képernyőkép a Microsoft Entra alkalmazás API-engedélyeinek megjelenítéséről.

  4. A hitelesítés befejezése után zárja be a böngészőablakot.

    Képernyőkép a hitelesítés befejezéséről.

  5. A Visual Studio Code-ban a Microsoft Entra-jogkivonatnak kell megjelennie.

    A cella futtatása és a bejelentkezés után visszaadott Microsoft Entra-jogkivonat képernyőképe.

  6. Adjon hozzá egy másik jegyzetfüzetcellát, és szúrja be ezt a kódot.

    # call get batch API
    
    get_livy_get_batch = livy_base_url
    get_batch_response = requests.get(get_livy_get_batch, headers=headers)
    if get_batch_response.status_code == 200:
       print("API call successful")
       print(get_batch_response.json())
    else:
       print(f"API call failed with status code: {get_batch_response.status_code}")
       print(get_batch_response.text)
    
  7. Futtassa a jegyzetfüzetcellát, a Livy-kötegelt feladat létrehozásakor két sornak kell megjelennie.

    Képernyőkép a kötegelt munkamenet létrehozásának eredményeiről.

Spark.sql utasítás elküldése a Livy API kötegelt munkamenetével

  1. Adjon hozzá egy másik jegyzetfüzetcellát, és szúrja be ezt a kódot.

    # submit payload to existing batch session
    
    print('Submit a spark job via the livy batch API to ') 
    
    newlakehouseName = "YourNewLakehouseName"
    create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items"
    create_lakehouse_payload = {
       "displayName": newlakehouseName,
       "type": 'Lakehouse'
    }
    
    create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload)
    print(create_lakehouse_response.json())
    
    payload_data = {
       "name":"livybatchdemo_with"+ newlakehouseName,
       "file":"abfss://YourABFSPathToYourPayload.py", 
       "conf": {
         "spark.targetLakehouse": "Fabric_LakehouseID"
       }
    }
    get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data)
    
    print("The Livy batch job submitted successful")
    print(get_batch_response.json())
    
  2. Futtassa a jegyzetfüzetcellát, a Livy Batch-feladat létrehozásakor és futtatásakor több sornak kell megjelennie.

    Képernyőkép a Visual Studio Code-ban a Livy Batch-feladat sikeres elküldésekor megjelenő eredményekről.

  3. Lépjen vissza a Lakehouse-hoz a módosítások megtekintéséhez.

Feladatok megtekintése a Monitorozási központban

A monitorozási központhoz a bal oldali navigációs hivatkozások Figyelő elemével tekintheti meg a különböző Apache Spark-tevékenységeket.

  1. Amikor a kötegelt feladat befejeződött, megtekintheti a munkamenet állapotát a Monitorra lépve.

    Képernyőkép a Korábbi Livy API-beküldésekről a Monitorozási központban.

  2. Válassza ki és nyissa meg a legutóbbi tevékenységnevet.

    Képernyőkép a Legutóbbi Livy API-tevékenységről a Monitorozási központban.

  3. Ebben a Livy API-munkamenet esetében láthatja az előző kötegküldést, a futtatási részleteket, a Spark-verziókat és a konfigurációt. Figyelje meg a leállított állapotot a jobb felső sarokban.

    Képernyőkép a Legutóbbi Livy API-tevékenység részleteiről a Monitorozási központban.

A teljes folyamat áttekintéséhez szüksége lesz egy távoli ügyfélre, például a Visual Studio Code-ra, a Microsoft Entra alkalmazás jogkivonatára, a Livy API végpontJÁNAK URL-címére, a Lakehouse-beli hitelesítésre, egy Spark hasznos adatra a Lakehouse-ban, és végül egy kötegelt Livy API-munkamenetre.