แชร์ผ่าน


ใช้ Livy API เพื่อส่งและดําเนินการชุดงาน Livy

หมายเหตุ

Livy API สําหรับ Fabric วิศวกรข้อมูล ing อยู่ในตัวอย่าง

นําไปใช้กับ:✅ วิศวกรข้อมูลและวิทยาศาสตร์ข้อมูลใน Microsoft Fabric

ส่งชุดงาน Spark โดยใช้ Livy API สําหรับ Fabric วิศวกรข้อมูล

ข้อกำหนดเบื้องต้น

Livy API กําหนดจุดสิ้นสุดแบบรวมสําหรับการดําเนินการ แทนที่พื้นที่ที่สํารองไว้ {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} และ {Fabric_LakehouseID} ด้วยค่าที่เหมาะสมเมื่อคุณทําตามตัวอย่างในบทความนี้

กําหนดค่ารหัส Visual Studio สําหรับชุด Livy API ของคุณ

  1. เลือก การตั้งค่า เลคเฮ้าส์ใน Fabric Lakehouse ของคุณ

    สกรีนช็อตที่แสดงการตั้งค่าของเลคเฮ้าส์

  2. นําทางไปยังส่วน จุด สิ้นสุด Livy

    สกรีนช็อตที่แสดงจุดสิ้นสุดของ Lakehouse Livy และสายอักขระการเชื่อมต่องานเซสชัน

  3. คัดลอกสายอักขระการเชื่อมต่อชุดงาน (กล่องสีแดงที่สองในรูปภาพ) ไปยังรหัสของคุณ

  4. ไปที่ ศูนย์การจัดการ Microsoft Entra และคัดลอกทั้ง ID แอปพลิเคชัน (ไคลเอนต์) และ ID ไดเรกทอรี (ผู้เช่า) ไปยังรหัสของคุณ

    สกรีนช็อตแสดงภาพรวมแอป Livy API ในศูนย์การจัดการ Microsoft Entra

สร้างเพย์โหลด Spark และอัปโหลดไปยัง Lakehouse ของคุณ

  1. .ipynbสร้างสมุดบันทึกใน Visual Studio Code และแทรกโค้ดต่อไปนี้

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
         #Spark session builder
         spark_session = (SparkSession
           .builder
           .appName("livybatchdemo") 
           .getOrCreate())
    
         spark_context = spark_session.sparkContext
         spark_context.setLogLevel("DEBUG")  
    
         targetLakehouse = spark_context.getConf().get("spark.targetLakehouse")
    
         if targetLakehouse is not None:
           print("targetLakehouse: " + str(targetLakehouse))
         else:
           print("targetLakehouse is None")
    
    df_valid_totalPrice = spark_session.sql("SELECT * FROM <YourLakeHouseDataTableName>.transactions where TotalPrice > 0")
    df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4))
    
    deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions"
    df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. บันทึกไฟล์ Python ภายในเครื่อง ส่วนข้อมูลรหัส Python นี้ประกอบด้วยคําสั่ง Spark สองคําสั่งที่ทํางานบนข้อมูลใน Lakehouse และจําเป็นต้องอัปโหลดไปยังเลคเฮ้าส์ของคุณ คุณจะต้องมีเส้นทาง ABFS ของส่วนข้อมูลเพื่ออ้างอิงในชุดงาน Livy API ของคุณในรหัส Visual Studio Code และชื่อตาราง Lakehouse ของคุณในคําสั่ง เลือก SQL

    สกรีนช็อตที่แสดงเซลล์ส่วนข้อมูล Python

  3. อัปโหลดส่วนข้อมูล Python ไปยังส่วนไฟล์ของเลคเฮ้าส์ของคุณ > รับข้อมูล > อัปโหลดไฟล์ > คลิกในกล่องไฟล์/ ป้อนเข้า

    สกรีนช็อตที่แสดงส่วนข้อมูลในส่วนไฟล์ของเลคเฮ้าส์

  4. หลังจากที่ไฟล์อยู่ในส่วนของไฟล์ของเลคเฮ้าส์ของคุณ คลิกที่สามจุดทางด้านขวาของชื่อไฟล์ส่วนข้อมูลของคุณ และเลือกคุณสมบัติ

    สกรีนช็อตแสดงเส้นทางของส่วนข้อมูล ABFS ในคุณสมบัติของไฟล์ในเลคเฮ้าส์

  5. คัดลอกเส้นทาง ABFS นี้ไปยังเซลล์ของสมุดบันทึกของคุณในขั้นตอนที่ 1

สร้างเซสชันชุดงาน Livy API Spark

  1. .ipynbสร้างสมุดบันทึกใน Visual Studio Code และแทรกโค้ดต่อไปนี้

    
    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "<Entra_TenantID>"
    client_id = "<Entra_ClientID>"
    
    workspace_id = "<Fabric_WorkspaceID>"
    lakehouse_id = "<Fabric_LakehouseID>"
    
    app = PublicClientApplication(
       client_id,
       authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
         result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
       print(f"Access token: {result['access_token']}")
    else:
       print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
       access_token = result['access_token']
       api_base_url_mist='https://api.fabric.microsoft.com/v1'
       livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"
       headers = {"Authorization": "Bearer " + access_token}
    
  2. เรียกใช้เซลล์ของสมุดบันทึก ป็อปอัพควรปรากฏในเบราว์เซอร์ของคุณ เพื่อให้คุณสามารถเลือกข้อมูลประจําตัวที่จะลงชื่อเข้าใช้ได้

    สกรีนช็อตที่แสดงหน้าจอเข้าสู่ระบบไปยังแอป Microsoft Entra

  3. หลังจากที่คุณเลือกข้อมูลประจําตัวที่จะลงชื่อเข้าใช้แล้ว คุณจะถูกขอให้อนุมัติสิทธิ์ API การลงทะเบียนแอป Microsoft Entra

    สกรีนช็อตที่แสดงสิทธิ์ API ของแอป Microsoft Entra

  4. ปิดหน้าต่างเบราว์เซอร์หลังจากเสร็จสิ้นการรับรองความถูกต้อง

    สกรีนช็อตที่แสดงการรับรองความถูกต้องเสร็จสมบูรณ์

  5. ใน Visual Studio Code คุณควรเห็นโทเค็น Microsoft Entra ที่ส่งกลับมา

    สกรีนช็อตแสดงโทเค็น Microsoft Entra ที่ส่งกลับหลังจากเรียกใช้เซลล์และเข้าสู่ระบบ

  6. เพิ่มเซลล์สมุดบันทึกอีกเซลล์และแทรกโค้ดนี้

    # call get batch API
    
    get_livy_get_batch = livy_base_url
    get_batch_response = requests.get(get_livy_get_batch, headers=headers)
    if get_batch_response.status_code == 200:
       print("API call successful")
       print(get_batch_response.json())
    else:
       print(f"API call failed with status code: {get_batch_response.status_code}")
       print(get_batch_response.text)
    
  7. เรียกใช้เซลล์สมุดบันทึก คุณควรเห็นบรรทัดสองบรรทัดที่พิมพ์เมื่อสร้างชุดงาน Livy แล้ว

    สกรีนช็อตที่แสดงผลลัพธ์ของการสร้างเซสชันชุดงาน

ส่งคําสั่ง spark.sql โดยใช้เซสชันชุดงาน Livy API

  1. เพิ่มเซลล์สมุดบันทึกอีกเซลล์และแทรกโค้ดนี้

    # submit payload to existing batch session
    
    print('Submit a spark job via the livy batch API to ') 
    
    newlakehouseName = "YourNewLakehouseName"
    create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items"
    create_lakehouse_payload = {
       "displayName": newlakehouseName,
       "type": 'Lakehouse'
    }
    
    create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload)
    print(create_lakehouse_response.json())
    
    payload_data = {
       "name":"livybatchdemo_with"+ newlakehouseName,
       "file":"abfss://YourABFSPathToYourPayload.py", 
       "conf": {
         "spark.targetLakehouse": "Fabric_LakehouseID"
       }
    }
    get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data)
    
    print("The Livy batch job submitted successful")
    print(get_batch_response.json())
    
  2. เรียกใช้เซลล์ของสมุดบันทึก คุณควรเห็นบรรทัดหลายบรรทัดที่พิมพ์เมื่อสร้างและเรียกใช้งานชุด Livy

    สกรีนช็อตที่แสดงผลลัพธ์ในรหัส Visual Studio หลังจากส่งงานชุด Livy เรียบร้อยแล้ว

  3. นําทางกลับไปยังเลคเฮาส์ของคุณเพื่อดูการเปลี่ยนแปลง

ดูงานของคุณในฮับการตรวจสอบ

คุณสามารถเข้าถึงฮับการตรวจสอบเพื่อดูกิจกรรม Apache Spark ต่าง ๆ ได้โดยการเลือก การตรวจสอบ ในลิงก์การนําทางด้านซ้าย

  1. เมื่อชุดงานเสร็จสมบูรณ์ คุณสามารถดูสถานะเซสชันโดยนําทางไปยัง การตรวจสอบ

    สกรีนช็อตที่แสดงการส่ง Livy API ก่อนหน้านี้ในฮับการตรวจสอบ

  2. เลือกและเปิดชื่อกิจกรรมล่าสุด

    สกรีนช็อตแสดงกิจกรรม Livy API ล่าสุดในฮับการตรวจสอบ

  3. ในกรณีเซสชัน Livy API นี้ คุณสามารถดูการส่งชุดก่อนหน้า ของคุณ เรียกใช้รายละเอียด เวอร์ชัน Spark และการกําหนดค่าของคุณ สังเกตสถานะหยุดที่ด้านบนขวา

    สกรีนช็อตที่แสดงรายละเอียดกิจกรรม Livy API ล่าสุดในฮับการตรวจสอบ

ในการสรุปกระบวนการทั้งหมด คุณต้องมีไคลเอ็นต์ระยะไกล เช่น Visual Studio Code โทเค็นแอป Microsoft Entra URL จุดสิ้นสุด Livy API การรับรองความถูกต้องกับเลคเฮ้าส์ของคุณ เพย์โหลด Spark ในเลคเฮ้าส์ของคุณ และในที่สุดเซสชัน Livy API เป็นชุด