Zdieľať cez


Ako vytvoriť a aktualizovať definíciu úloh služby Spark pomocou rozhrania Microsoft Fabric Rest API

Microsoft Fabric Rest API poskytuje koncový bod služby pre operácie CRUD položiek tkaniny. V tomto kurze si prejdeme komplexný scenár, ako vytvoriť a aktualizovať artefakt definície úloh služby Spark. Sú zapojené tri kroky na vysokej úrovni:

  1. vytvorenie položky definície úlohy služby Spark s určitým počiatočným stavom
  2. nahrať hlavný súbor definície a iné súbory knižnice
  3. aktualizácia položky Spark Job Definition pomocou URL adresy OneLake hlavného súboru definície a ďalších súborov lib

Požiadavky

  1. Na prístup k rozhraniu Rest API služby Fabric sa vyžaduje token Microsoft Entra. Odporúča sa získať token knižnice MSAL. Ďalšie informácie nájdete v téme Podpora postupu overovania v aplikácii MSAL.
  2. Na prístup k API OneLake sa vyžaduje token úložiska. Ďalšie informácie nájdete v téme MSAL pre python.

Vytvorenie položky v definícii úlohy Spark s počiatočným stavom

Rozhranie Microsoft Fabric Rest API definuje jednotný koncový bod pre operácie CRUD položiek služby Fabric. Koncový bod je https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items.

Podrobnosti o položke sú zadané v tele textu požiadavky. Tu je príklad tela požiadavky na vytvorenie položky definície úlohy v službe Spark:

{
    "displayName": "SJDHelloWorld",
    "type": "SparkJobDefinition",
    "definition": {
        "format": "SparkJobDefinitionV1",
        "parts": [
            {
                "path": "SparkJobDefinitionV1.json",
                "payload":"eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9",
                "payloadType": "InlineBase64"
            }
        ]
    }
}

V tomto príklade je položka Definícia úlohy služby Spark pomenovaná ako SJDHelloWorld. Pole payload je obsah nastavenia podrobností s kódovaním base64 po dekódovaní a obsah je:

{
    "executableFile":null,
    "defaultLakehouseArtifactId":"",
    "mainClass":"",
    "additionalLakehouseIds":[],
    "retryPolicy":null,
    "commandLineArguments":"",
    "additionalLibraryUris":[],
    "language":"",
    "environmentArtifactId":null
}

Tu sú dve pomocné funkcie na kódovanie a dekódovanie podrobného nastavenia:

import base64

def json_to_base64(json_data):
    # Serialize the JSON data to a string
    json_string = json.dumps(json_data)
    
    # Encode the JSON string as bytes
    json_bytes = json_string.encode('utf-8')
    
    # Encode the bytes as Base64
    base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
    
    return base64_encoded

def base64_to_json(base64_data):
    # Decode the Base64-encoded string to bytes
    base64_bytes = base64_data.encode('utf-8')
    
    # Decode the bytes to a JSON string
    json_string = base64.b64decode(base64_bytes).decode('utf-8')
    
    # Deserialize the JSON string to a Python dictionary
    json_data = json.loads(json_string)
    
    return json_data

Tu je úryvok kódu na vytvorenie položky definície úlohy služby Spark:

import requests

bearerToken = "breadcrumb"; # replace this token with the real AAD token

headers = {
    "Authorization": f"Bearer {bearerToken}", 
    "Content-Type": "application/json"  # Set the content type based on your request
}

payload = "eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9"

# Define the payload data for the POST request
payload_data = {
    "displayName": "SJDHelloWorld",
    "Type": "SparkJobDefinition",
    "definition": {
        "format": "SparkJobDefinitionV1",
        "parts": [
            {
                "path": "SparkJobDefinitionV1.json",
                "payload": payload,
                "payloadType": "InlineBase64"
            }
        ]
    }
}

# Make the POST request with Bearer authentication
sjdCreateUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items"
response = requests.post(sjdCreateUrl, json=payload_data, headers=headers)

Nahratie hlavného súboru definície a ďalších súborov knižnice

Na nahratie súboru do služby OneLake sa vyžaduje token úložiska. Tu je pomocná funkcia na získanie tokenu úložiska:


import msal

def getOnelakeStorageToken():
    app = msal.PublicClientApplication(
        "{client id}", # this filed should be the client id 
        authority="https://login.microsoftonline.com/microsoft.com")

    result = app.acquire_token_interactive(scopes=["https://storage.azure.com/.default"])

    print(f"Successfully acquired AAD token with storage audience:{result['access_token']}")

    return result['access_token']

Teraz máme vytvorenú položku Spark Job Definition, aby bola spúšťateľná, musíme nastaviť hlavný súbor definície a požadované vlastnosti. Koncový bod nahrávania súboru pre túto položku SJD je https://onelake.dfs.fabric.microsoft.com/{workspaceId}/{sjdartifactid}. Mal by sa použiť rovnaký "workspaceId" z predchádzajúceho kroku, hodnotu "sjdartifactid" možno nájsť v tele odpovede predchádzajúceho kroku. Tu je úryvok kódu na nastavenie hlavného súboru definície:

import requests

# three steps are required: create file, append file, flush file

onelakeEndPoint = "https://onelake.dfs.fabric.microsoft.com/workspaceId/sjdartifactid"; # replace the id of workspace and artifact with the right one
mainExecutableFile = "main.py"; # the name of the main executable file
mainSubFolder = "Main"; # the sub folder name of the main executable file. Don't change this value


onelakeRequestMainFileCreateUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?resource=file" # the url for creating the main executable file via the 'file' resource type
onelakePutRequestHeaders = {
    "Authorization": f"Bearer {onelakeStorageToken}", # the storage token can be achieved from the helper function above
}

onelakeCreateMainFileResponse = requests.put(onelakeRequestMainFileCreateUrl, headers=onelakePutRequestHeaders)
if onelakeCreateMainFileResponse.status_code == 201:
    # Request was successful
    print(f"Main File '{mainExecutableFile}' was successfully created in onelake.")

# with previous step, the main executable file is created in OneLake, now we need to append the content of the main executable file

appendPosition = 0;
appendAction = "append";

### Main File Append.
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes
onelakeRequestMainFileAppendUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={appendPosition}&action={appendAction}";
mainFileContents = "filename = 'Files/' + Constant.filename; tablename = 'Tables/' + Constant.tablename"; # the content of the main executable file, please replace this with the real content of the main executable file
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes, this value should match the size of the mainFileContents

onelakePatchRequestHeaders = {
    "Authorization": f"Bearer {onelakeStorageToken}",
    "Content-Type" : "text/plain"
}

onelakeAppendMainFileResponse = requests.patch(onelakeRequestMainFileAppendUrl, data = mainFileContents, headers=onelakePatchRequestHeaders)
if onelakeAppendMainFileResponse.status_code == 202:
    # Request was successful
    print(f"Successfully Accepted Main File '{mainExecutableFile}' append data.")

# with previous step, the content of the main executable file is appended to the file in OneLake, now we need to flush the file

flushAction = "flush";

### Main File flush
onelakeRequestMainFileFlushUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={mainExecutableFileSizeInBytes}&action={flushAction}"
print(onelakeRequestMainFileFlushUrl)
onelakeFlushMainFileResponse = requests.patch(onelakeRequestMainFileFlushUrl, headers=onelakePatchRequestHeaders)
if onelakeFlushMainFileResponse.status_code == 200:
    print(f"Successfully Flushed Main File '{mainExecutableFile}' contents.")
else:
    print(onelakeFlushMainFileResponse.json())

Postupujte podľa rovnakého procesu a v prípade potreby nahrajte ďalšie súbory lib.

Aktualizujte položku Spark Job Definition pomocou URL adresy OneLake hlavného súboru definície a iných súborov lib.

Doteraz sme vytvorili položku Spark Job Definition s počiatočným stavom, nahrali hlavný súbor definície a iné súbory lib, Posledným krokom je aktualizácia položky Spark Job Definition a nastavenie vlastností URL adresy hlavného súboru definície a ďalších súborov lib. Koncový bod aktualizácie položky Spark Job Definition je https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}. Mali by ste použiť rovnaké "workspaceId" a "sjdartifactid" z predchádzajúcich krokov. Tu je úryvok kódu na aktualizáciu položky definície úlohy služby Spark:


mainAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Main/{mainExecutableFile}" # the workspaceId and sjdartifactid are the same as previous steps, the mainExecutableFile is the name of the main executable file
libsAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Libs/{libsFile}"  # the workspaceId and sjdartifactid are the same as previous steps, the libsFile is the name of the libs file
defaultLakehouseId = 'defaultLakehouseid'; # replace this with the real default lakehouse id

updateRequestBodyJson = {
    "executableFile":mainAbfssPath,
    "defaultLakehouseArtifactId":defaultLakehouseId,
    "mainClass":"",
    "additionalLakehouseIds":[],
    "retryPolicy":None,
    "commandLineArguments":"",
    "additionalLibraryUris":[libsAbfssPath],
    "language":"Python",
    "environmentArtifactId":None}

# Encode the bytes as a Base64-encoded string
base64EncodedUpdateSJDPayload = json_to_base64(updateRequestBodyJson)

# Print the Base64-encoded string
print("Base64-encoded JSON payload for SJD Update:")
print(base64EncodedUpdateSJDPayload)

# Define the API URL
updateSjdUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items/{sjdartifactid}/updateDefinition"

updatePayload = base64EncodedUpdateSJDPayload
payloadType = "InlineBase64"
path = "SparkJobDefinitionV1.json"
format = "SparkJobDefinitionV1"
Type = "SparkJobDefinition"

# Define the headers with Bearer authentication
bearerToken = "breadcrumb"; # replace this token with the real AAD token

headers = {
    "Authorization": f"Bearer {bearerToken}", 
    "Content-Type": "application/json"  # Set the content type based on your request
}

# Define the payload data for the POST request
payload_data = {
    "displayName": "sjdCreateTest11",
    "Type": Type,
    "definition": {
        "format": format,
        "parts": [
            {
                "path": path,
                "payload": updatePayload,
                "payloadType": payloadType
            }
        ]
    }
}


# Make the POST request with Bearer authentication
response = requests.post(updateSjdUrl, json=payload_data, headers=headers)
if response.status_code == 200:
    print("Successfully updated SJD.")
else:
    print(response.json())
    print(response.status_code)

Na opakovanie celého procesu je potrebné vytvoriť a aktualizovať položku služby Spark Job Definition pomocou rozhrania REST API služby Fabric aj rozhrania API OneLake. Rozhranie REST API služby Fabric sa používa na vytvorenie a aktualizáciu položky Spark Job Definition, rozhranie API OneLake sa používa na nahrávanie súboru hlavnej definície a iných súborov lib. Hlavný súbor definície a ďalšie súbory lib sú nahrané do OneLake prvý. Potom sa vlastnosti URL adresy hlavného súboru definície a ďalších súborov knižnice nastavia v položke Definícia úlohy služby Spark.