Microsoft Fabric Rest API를 사용하여 Spark 작업 정의를 만들고 업데이트하는 방법
Microsoft Fabric Rest API는 Fabric 항목의 CRUD 작업에 대한 서비스 엔드포인트를 제공합니다. 이 자습서에서는 Spark 작업 정의 아티팩트를 만들고 업데이트하는 방법에 대한 엔드투엔드 시나리오를 안내합니다. 세 가지 고급 단계가 포함됩니다.
- 초기 상태의 Spark 작업 정의 항목 만들기
- 기본 정의 파일 및 기타 lib 파일 업로드
- 기본 정의 파일 및 기타 lib 파일의 OneLake URL로 Spark 작업 정의 항목 업데이트
필수 조건
- Fabric Rest API에 액세스하려면 Microsoft Entra 토큰이 필요합니다. MSAL 라이브러리는 토큰을 가져오는 것이 좋습니다. 자세한 내용은 MSAL의 인증 흐름 지원을 참조하세요.
- OneLake API에 액세스하려면 스토리지 토큰이 필요합니다. 자세한 내용은 Python용 MSAL를 참조하세요.
초기 상태로 Spark 작업 정의 항목 만들기
Microsoft Fabric Rest API는 Fabric 항목의 CRUD 작업을 위한 통합 엔드포인트를 정의합니다. 엔드포인트가 https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items
인 경우
항목 세부 정보는 요청 본문 내에 지정됩니다. Spark 작업 정의 항목을 만들기 위한 요청 본문의 예는 다음과 같습니다.
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV1",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload":"eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9",
"payloadType": "InlineBase64"
}
]
}
}
이 예제에서는 Spark 작업 정의 항목의 이름을 SJDHelloWorld
로 지정합니다. payload
필드는 디코딩 후 세부 설정의 base64로 인코딩된 콘텐츠입니다. 콘텐츠는 다음과 같습니다.
{
"executableFile":null,
"defaultLakehouseArtifactId":"",
"mainClass":"",
"additionalLakehouseIds":[],
"retryPolicy":null,
"commandLineArguments":"",
"additionalLibraryUris":[],
"language":"",
"environmentArtifactId":null
}
다음은 자세한 설정을 인코딩하고 디코딩하는 두 가지 도우미 함수입니다.
import base64
def json_to_base64(json_data):
# Serialize the JSON data to a string
json_string = json.dumps(json_data)
# Encode the JSON string as bytes
json_bytes = json_string.encode('utf-8')
# Encode the bytes as Base64
base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
return base64_encoded
def base64_to_json(base64_data):
# Decode the Base64-encoded string to bytes
base64_bytes = base64_data.encode('utf-8')
# Decode the bytes to a JSON string
json_string = base64.b64decode(base64_bytes).decode('utf-8')
# Deserialize the JSON string to a Python dictionary
json_data = json.loads(json_string)
return json_data
Spark 작업 정의 항목을 만드는 코드 조각은 다음과 같습니다.
import requests
bearerToken = "breadcrumb"; # replace this token with the real AAD token
headers = {
"Authorization": f"Bearer {bearerToken}",
"Content-Type": "application/json" # Set the content type based on your request
}
payload = "eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9"
# Define the payload data for the POST request
payload_data = {
"displayName": "SJDHelloWorld",
"Type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV1",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": payload,
"payloadType": "InlineBase64"
}
]
}
}
# Make the POST request with Bearer authentication
sjdCreateUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items"
response = requests.post(sjdCreateUrl, json=payload_data, headers=headers)
기본 정의 파일 및 기타 lib 파일 업로드
OneLake에 파일을 업로드하려면 스토리지 토큰이 필요합니다. 스토리지 토큰을 가져오는 도우미 함수는 다음과 같습니다.
import msal
def getOnelakeStorageToken():
app = msal.PublicClientApplication(
"{client id}", # this filed should be the client id
authority="https://login.microsoftonline.com/microsoft.com")
result = app.acquire_token_interactive(scopes=["https://storage.azure.com/.default"])
print(f"Successfully acquired AAD token with storage audience:{result['access_token']}")
return result['access_token']
이제 Spark 작업 정의 항목을 만들어 실행 가능하도록 하려면 기본 정의 파일 및 필수 속성을 설정해야 합니다. 이 SJD 항목에 대한 파일을 업로드하기 위한 엔드포인트는 https://onelake.dfs.fabric.microsoft.com/{workspaceId}/{sjdartifactid}
과 같습니다. 이전 단계와 동일한 "workspaceId"를 사용해야 하며, 이전 단계의 응답 본문에서 "sjdartifactid" 값을 찾을 수 있습니다. 기본 정의 파일을 설정하는 코드 조각은 다음과 같습니다.
import requests
# three steps are required: create file, append file, flush file
onelakeEndPoint = "https://onelake.dfs.fabric.microsoft.com/workspaceId/sjdartifactid"; # replace the id of workspace and artifact with the right one
mainExecutableFile = "main.py"; # the name of the main executable file
mainSubFolder = "Main"; # the sub folder name of the main executable file. Don't change this value
onelakeRequestMainFileCreateUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?resource=file" # the url for creating the main executable file via the 'file' resource type
onelakePutRequestHeaders = {
"Authorization": f"Bearer {onelakeStorageToken}", # the storage token can be achieved from the helper function above
}
onelakeCreateMainFileResponse = requests.put(onelakeRequestMainFileCreateUrl, headers=onelakePutRequestHeaders)
if onelakeCreateMainFileResponse.status_code == 201:
# Request was successful
print(f"Main File '{mainExecutableFile}' was successfully created in onelake.")
# with previous step, the main executable file is created in OneLake, now we need to append the content of the main executable file
appendPosition = 0;
appendAction = "append";
### Main File Append.
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes
onelakeRequestMainFileAppendUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={appendPosition}&action={appendAction}";
mainFileContents = "filename = 'Files/' + Constant.filename; tablename = 'Tables/' + Constant.tablename"; # the content of the main executable file, please replace this with the real content of the main executable file
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes, this value should match the size of the mainFileContents
onelakePatchRequestHeaders = {
"Authorization": f"Bearer {onelakeStorageToken}",
"Content-Type" : "text/plain"
}
onelakeAppendMainFileResponse = requests.patch(onelakeRequestMainFileAppendUrl, data = mainFileContents, headers=onelakePatchRequestHeaders)
if onelakeAppendMainFileResponse.status_code == 202:
# Request was successful
print(f"Successfully Accepted Main File '{mainExecutableFile}' append data.")
# with previous step, the content of the main executable file is appended to the file in OneLake, now we need to flush the file
flushAction = "flush";
### Main File flush
onelakeRequestMainFileFlushUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={mainExecutableFileSizeInBytes}&action={flushAction}"
print(onelakeRequestMainFileFlushUrl)
onelakeFlushMainFileResponse = requests.patch(onelakeRequestMainFileFlushUrl, headers=onelakePatchRequestHeaders)
if onelakeFlushMainFileResponse.status_code == 200:
print(f"Successfully Flushed Main File '{mainExecutableFile}' contents.")
else:
print(onelakeFlushMainFileResponse.json())
필요한 경우 동일한 프로세스에 따라 다른 lib 파일을 업로드합니다.
기본 정의 파일 및 기타 lib 파일의 OneLake URL로 Spark 작업 정의 항목 업데이트
지금까지 몇 가지 초기 상태의 Spark 작업 정의 항목을 만들고 기본 정의 파일 및 기타 lib 파일을 업로드했습니다. 마지막 단계는 Spark 작업 정의 항목을 업데이트하여 기본 정의 파일 및 기타 lib 파일의 URL 속성을 설정하는 것입니다. Spark 작업 정의 항목을 업데이트하기 위한 엔드포인트는 https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}
입니다. 이전 단계와 동일한 "workspaceId" 및 "sjdartifactid"를 사용해야 합니다. Spark 작업 정의 항목을 업데이트하는 코드 조각은 다음과 같습니다.
mainAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Main/{mainExecutableFile}" # the workspaceId and sjdartifactid are the same as previous steps, the mainExecutableFile is the name of the main executable file
libsAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Libs/{libsFile}" # the workspaceId and sjdartifactid are the same as previous steps, the libsFile is the name of the libs file
defaultLakehouseId = 'defaultLakehouseid'; # replace this with the real default lakehouse id
updateRequestBodyJson = {
"executableFile":mainAbfssPath,
"defaultLakehouseArtifactId":defaultLakehouseId,
"mainClass":"",
"additionalLakehouseIds":[],
"retryPolicy":None,
"commandLineArguments":"",
"additionalLibraryUris":[libsAbfssPath],
"language":"Python",
"environmentArtifactId":None}
# Encode the bytes as a Base64-encoded string
base64EncodedUpdateSJDPayload = json_to_base64(updateRequestBodyJson)
# Print the Base64-encoded string
print("Base64-encoded JSON payload for SJD Update:")
print(base64EncodedUpdateSJDPayload)
# Define the API URL
updateSjdUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items/{sjdartifactid}/updateDefinition"
updatePayload = base64EncodedUpdateSJDPayload
payloadType = "InlineBase64"
path = "SparkJobDefinitionV1.json"
format = "SparkJobDefinitionV1"
Type = "SparkJobDefinition"
# Define the headers with Bearer authentication
bearerToken = "breadcrumb"; # replace this token with the real AAD token
headers = {
"Authorization": f"Bearer {bearerToken}",
"Content-Type": "application/json" # Set the content type based on your request
}
# Define the payload data for the POST request
payload_data = {
"displayName": "sjdCreateTest11",
"Type": Type,
"definition": {
"format": format,
"parts": [
{
"path": path,
"payload": updatePayload,
"payloadType": payloadType
}
]
}
}
# Make the POST request with Bearer authentication
response = requests.post(updateSjdUrl, json=payload_data, headers=headers)
if response.status_code == 200:
print("Successfully updated SJD.")
else:
print(response.json())
print(response.status_code)
전체 프로세스를 요약하려면 Spark 작업 정의 항목을 만들고 업데이트하려면 Fabric REST API와 OneLake API가 모두 필요합니다. Fabric REST API는 Spark 작업 정의 항목을 만들고 업데이트하는 데 사용되며, OneLake API는 기본 정의 파일 및 기타 lib 파일을 업로드하는 데 사용됩니다. 주 정의 파일 및 기타 lib 파일은 먼저 OneLake에 업로드됩니다. 그런 다음 기본 정의 파일 및 기타 lib 파일의 URL 속성이 Spark 작업 정의 항목에 설정됩니다.