Spark-feladatdefiníció létrehozása és frissítése a Microsoft Fabric Rest API-val

A Microsoft Fabric Rest API szolgáltatásvégpontot biztosít a Fabric-elemek CRUD-műveleteihez. Ebben az oktatóanyagban végigvezetjük a Spark-feladatdefiníciós összetevő létrehozásának és frissítésének részletes forgatókönyvét. Három magas szintű lépésről van szó:

  1. Spark-feladatdefiníciós elem létrehozása kezdeti állapottal
  2. töltse fel a fő definíciós fájlt és más lib fájlokat
  3. frissítse a Spark-feladatdefiníciós elemet a fő definíciós fájl és más libfájlok OneLake URL-címével


  1. A Fabric Rest API eléréséhez Microsoft Entra-jogkivonat szükséges. A jogkivonat lekéréséhez ajánlott az MSAL-kódtár. További információ: Hitelesítési folyamat támogatása az MSAL-ben.
  2. A OneLake API eléréséhez tárolási jogkivonat szükséges. További információ: MSAL for Python.

Spark-feladatdefiníciós elem létrehozása a kezdeti állapottal

A Microsoft Fabric Rest API egységes végpontot határoz meg a Fabric-elemek CRUD-műveleteihez. A végpont az{workspaceId}/items.

Az elem részletei a kérelem törzsében vannak megadva. Íme egy példa a Spark-feladatdefiníciós elem létrehozásához szükséges kérelem törzsére:

    "displayName": "SJDHelloWorld",
    "type": "SparkJobDefinition",
    "definition": {
        "format": "SparkJobDefinitionV1",
        "parts": [
                "path": "SparkJobDefinitionV1.json",
                "payloadType": "InlineBase64"

Ebben a példában a Spark-feladatdefiníció elem neve .SJDHelloWorld A payload mező a részletes beállítás base64 kódolású tartalma, a dekódolás után a tartalom a következő:


Az alábbiakban két segédfüggvényt talál a részletes beállítás kódolásához és dekódolásához:

import base64

def json_to_base64(json_data):
    # Serialize the JSON data to a string
    json_string = json.dumps(json_data)
    # Encode the JSON string as bytes
    json_bytes = json_string.encode('utf-8')
    # Encode the bytes as Base64
    base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
    return base64_encoded

def base64_to_json(base64_data):
    # Decode the Base64-encoded string to bytes
    base64_bytes = base64_data.encode('utf-8')
    # Decode the bytes to a JSON string
    json_string = base64.b64decode(base64_bytes).decode('utf-8')
    # Deserialize the JSON string to a Python dictionary
    json_data = json.loads(json_string)
    return json_data

A Spark-feladatdefiníciós elem létrehozásához a kódrészlet a következő:

import requests

bearerToken = "breadcrumb"; # replace this token with the real AAD token

headers = {
    "Authorization": f"Bearer {bearerToken}", 
    "Content-Type": "application/json"  # Set the content type based on your request

payload = "eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9"

# Define the payload data for the POST request
payload_data = {
    "displayName": "SJDHelloWorld",
    "Type": "SparkJobDefinition",
    "definition": {
        "format": "SparkJobDefinitionV1",
        "parts": [
                "path": "SparkJobDefinitionV1.json",
                "payload": payload,
                "payloadType": "InlineBase64"

# Make the POST request with Bearer authentication
sjdCreateUrl = f"{workspaceId}/items"
response =, json=payload_data, headers=headers)

Töltse fel a fő definíciós fájlt és más lib fájlokat

A fájl OneLake-be való feltöltéséhez tárolási jogkivonat szükséges. Íme egy segédfüggvény a tárolási jogkivonat lekéréséhez:

import msal

def getOnelakeStorageToken():
    app = msal.PublicClientApplication(
        "{client id}", # this filed should be the client id 

    result = app.acquire_token_interactive(scopes=[""])

    print(f"Successfully acquired AAD token with storage audience:{result['access_token']}")

    return result['access_token']

Most létrehoztunk egy Spark-feladatdefiníciós elemet, hogy futtatható legyen, be kell állítanunk a fő definíciós fájlt és a szükséges tulajdonságokat. Az SJD-elemhez tartozó fájl feltöltésének végpontja a következő{workspaceId}/{sjdartifactid}. Az előző lépés "workspaceId" azonosítóját kell használni, a "sjdartifactid" értéke az előző lépés választörzsében található. A fő definíciós fájl beállításához az alábbi kódrészletet kell megadni:

import requests

# three steps are required: create file, append file, flush file

onelakeEndPoint = ""; # replace the id of workspace and artifact with the right one
mainExecutableFile = ""; # the name of the main executable file
mainSubFolder = "Main"; # the sub folder name of the main executable file. Don't change this value

onelakeRequestMainFileCreateUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?resource=file" # the url for creating the main executable file via the 'file' resource type
onelakePutRequestHeaders = {
    "Authorization": f"Bearer {onelakeStorageToken}", # the storage token can be achieved from the helper function above

onelakeCreateMainFileResponse = requests.put(onelakeRequestMainFileCreateUrl, headers=onelakePutRequestHeaders)
if onelakeCreateMainFileResponse.status_code == 201:
    # Request was successful
    print(f"Main File '{mainExecutableFile}' was successfully created in onelake.")

# with previous step, the main executable file is created in OneLake, now we need to append the content of the main executable file

appendPosition = 0;
appendAction = "append";

### Main File Append.
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes
onelakeRequestMainFileAppendUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={appendPosition}&action={appendAction}";
mainFileContents = "filename = 'Files/' + Constant.filename; tablename = 'Tables/' + Constant.tablename"; # the content of the main executable file, please replace this with the real content of the main executable file
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes, this value should match the size of the mainFileContents

onelakePatchRequestHeaders = {
    "Authorization": f"Bearer {onelakeStorageToken}",
    "Content-Type" : "text/plain"

onelakeAppendMainFileResponse = requests.patch(onelakeRequestMainFileAppendUrl, data = mainFileContents, headers=onelakePatchRequestHeaders)
if onelakeAppendMainFileResponse.status_code == 202:
    # Request was successful
    print(f"Successfully Accepted Main File '{mainExecutableFile}' append data.")

# with previous step, the content of the main executable file is appended to the file in OneLake, now we need to flush the file

flushAction = "flush";

### Main File flush
onelakeRequestMainFileFlushUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={mainExecutableFileSizeInBytes}&action={flushAction}"
onelakeFlushMainFileResponse = requests.patch(onelakeRequestMainFileFlushUrl, headers=onelakePatchRequestHeaders)
if onelakeFlushMainFileResponse.status_code == 200:
    print(f"Successfully Flushed Main File '{mainExecutableFile}' contents.")

Kövesse ugyanezt a folyamatot a többi lib-fájl szükség esetén való feltöltéséhez.

Frissítse a Spark-feladatdefiníciós elemet a fő definíciós fájl és más libfájlok OneLake URL-címével

Eddig létrehoztunk egy Spark-feladatdefiníciós elemet valamilyen kezdeti állapottal, feltöltöttük a fődefiníciós fájlt és más libfájlokat. Az utolsó lépés a Spark-feladatdefiníció elem frissítése a fő definíciós fájl és más lib fájlok URL-tulajdonságainak beállításához. A Spark-feladatdefiníció elem frissítésének végpontja a következő{workspaceId}/items/{sjdartifactid}. Ugyanazt a "workspaceId" és "sjdartifactid" értéket kell használni az előző lépésekben. A Spark-feladatdefiníció elem frissítéséhez a kódrészlet a következő:

mainAbfssPath = f"abfss://{workspaceId}{sjdartifactid}/Main/{mainExecutableFile}" # the workspaceId and sjdartifactid are the same as previous steps, the mainExecutableFile is the name of the main executable file
libsAbfssPath = f"abfss://{workspaceId}{sjdartifactid}/Libs/{libsFile}"  # the workspaceId and sjdartifactid are the same as previous steps, the libsFile is the name of the libs file
defaultLakehouseId = 'defaultLakehouseid'; # replace this with the real default lakehouse id

updateRequestBodyJson = {

# Encode the bytes as a Base64-encoded string
base64EncodedUpdateSJDPayload = json_to_base64(updateRequestBodyJson)

# Print the Base64-encoded string
print("Base64-encoded JSON payload for SJD Update:")

# Define the API URL
updateSjdUrl = f"{workspaceId}/items/{sjdartifactid}/updateDefinition"

updatePayload = base64EncodedUpdateSJDPayload
payloadType = "InlineBase64"
path = "SparkJobDefinitionV1.json"
format = "SparkJobDefinitionV1"
Type = "SparkJobDefinition"

# Define the headers with Bearer authentication
bearerToken = "breadcrumb"; # replace this token with the real AAD token

headers = {
    "Authorization": f"Bearer {bearerToken}", 
    "Content-Type": "application/json"  # Set the content type based on your request

# Define the payload data for the POST request
payload_data = {
    "displayName": "sjdCreateTest11",
    "Type": Type,
    "definition": {
        "format": format,
        "parts": [
                "path": path,
                "payload": updatePayload,
                "payloadType": payloadType

# Make the POST request with Bearer authentication
response =, json=payload_data, headers=headers)
if response.status_code == 200:
    print("Successfully updated SJD.")

A teljes folyamat áttekintéséhez a Fabric REST API-ra és a OneLake API-ra is szükség van egy Spark-feladatdefiníciós elem létrehozásához és frissítéséhez. A Fabric REST API a Spark-feladatdefiníció elem létrehozásához és frissítéséhez használható, a OneLake API a fő definíciós fájl és más libfájlok feltöltésére szolgál. A fő definíciós fájl és az egyéb libfájlok először a OneLake-be lesznek feltöltve. Ezután a fő definíciós fájl és az egyéb lib fájlok URL-tulajdonságai a Spark-feladatdefiníció elemben lesznek beállítva.