ForEach-tevékenység az Azure Data Factoryben és az Azure Synapse Analyticsben
A következőkre vonatkozik: Azure Data Factory
Azure Synapse Analytics
Tipp.
Próbálja ki a Data Factoryt a Microsoft Fabricben, amely egy teljes körű elemzési megoldás a nagyvállalatok számára. A Microsoft Fabric az adattovábbítástól az adatelemzésig, a valós idejű elemzésig, az üzleti intelligenciáig és a jelentéskészítésig mindent lefed. Ismerje meg, hogyan indíthat új próbaverziót ingyenesen!
A ForEach-tevékenység ismétlődő vezérlési folyamatot határoz meg egy Azure Data Factory- vagy Synapse-folyamatban. Ez a tevékenység egy gyűjtemény megismétlésére, valamint egy megadott ciklustevékenység végrehajtására szolgál. E tevékenység ciklusos megvalósítása hasonló a Foreach ciklusos szerkezetéhez a programozási nyelvek esetében.
ForEach-tevékenység létrehozása felhasználói felülettel
Ha ForEach-tevékenységet szeretne használni egy folyamatban, hajtsa végre a következő lépéseket:
A ForEach-tevékenység bemeneteként bármilyen tömbtípus-változót vagy más tevékenység kimenetét használhatja. Tömbváltozó létrehozásához válassza ki a folyamatvászon hátterét, majd a Változók lapon adjon hozzá egy tömbtípus változót az alább látható módon.
Keresse meg a ForEachot a folyamattevékenységek panelen, és húzzon egy ForEach-tevékenységet a folyamatvászonra.
Válassza ki az új ForEach-tevékenységet a vásznon, ha még nincs kijelölve, és a Beállítások fület a részletek szerkesztéséhez.
Jelölje ki az Elemek mezőt, majd a Dinamikus tartalom hozzáadása hivatkozásra kattintva nyissa meg a dinamikus tartalomszerkesztő panelt.
Válassza ki a dinamikus tartalomszerkesztőben szűrni kívánt bemeneti tömböt. Ebben a példában az első lépésben létrehozott változót választjuk ki.
Válassza a ForEach-tevékenység Tevékenységszerkesztőjét egy vagy több végrehajtandó tevékenység hozzáadásához a bemeneti elemek tömb minden eleméhez.
A ForEach-tevékenységen belül létrehozott tevékenységekben hivatkozhat arra az aktuális elemre, amelyet a ForEach-tevékenység iterál az Elemek listából. Az aktuális elemre bárhol hivatkozhat, ahol egy dinamikus kifejezéssel megadhat egy tulajdonságértéket. A dinamikus tartalomszerkesztőben válassza a ForEach iterátort az aktuális elem visszaadásához.
Syntax
A tulajdonságokat a cikk későbbi részében ismertetjük. Az Elemek tulajdonság a gyűjtemény, és a gyűjtemény minden elemére az @item()
alábbi szintaxisban látható módon hivatkozik:
{
"name":"MyForEachActivityName",
"type":"ForEach",
"typeProperties":{
"isSequential":"true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
"type": "Expression"
},
"activities":[
{
"name":"MyCopyActivity",
"type":"Copy",
"typeProperties":{
...
},
"inputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@item()"
}
}
]
}
]
}
}
Típustulajdonságok
Tulajdonság | Leírás | Megengedett értékek | Szükséges |
---|---|---|---|
név | Az egyes tevékenységek neve. | Sztring | Igen |
típus | ForEach értékre kell állítani | Sztring | Igen |
isSequential | Megadja, hogy a hurkot egymás után vagy párhuzamosan kell-e végrehajtani. Egyszerre legfeljebb 50 ciklusos iteráció hajtható végre. Ha például egy ForEach-tevékenység 10 különböző forrás- és fogadóadatkészlettel rendelkező másolási tevékenységen iterál, és az isSequential értéke Hamis, a rendszer egyszerre hajtja végre az összes másolatot. Az alapértelmezett érték Hamis. Ha az "isSequential" értéke Hamis, győződjön meg arról, hogy megfelelő konfiguráció van több végrehajtható fájl futtatásához. Ellenkező esetben ezt a tulajdonságot óvatosan kell használni az írási ütközések elkerülése érdekében. További információ: Párhuzamos végrehajtás szakasz. |
Logikai | Szám Az alapértelmezett érték Hamis. |
batchCount | A párhuzamos végrehajtások számának szabályozásához használandó kötegszám (ha az isSequential értéke hamis). Ez a felső egyidejűségi korlát, de az egyes tevékenységek nem mindig ezen a számon lesznek végrehajtva | Egész szám (legfeljebb 50) | Szám Az alapértelmezett érték 20. |
Elemek | Olyan kifejezés, amely egy JSON-tömböt ad vissza, hogy át lehessen iterálni. | Kifejezés (amely JSON-tömböt ad vissza) | Igen |
Tevékenységek | A végrehajtandó tevékenységek. | Tevékenységek listája | Igen |
Párhuzamos végrehajtás
Ha az isSequential értéke hamis, a tevékenység párhuzamosan, legfeljebb 50 egyidejű iterációval halad. Ezt a beállítást körültekintően kell használni. Ha az egyidejű iterációk ugyanarra a mappára, de különböző fájlokra írnak, ez a megközelítés rendben van. Ha az egyidejű iterációk ugyanabba a fájlba írnak egyidejűleg, ez a megközelítés valószínűleg hibát okoz.
Iterációs kifejezés nyelve
A ForEach-tevékenységben adjon meg egy tömböt, amely át lesz írva a tulajdonságelemekhez." A ForEach-tevékenység egyetlen enumerálásán keresztüli iterálására használható@item()
. Ha például az elemek tömbök: [1, 2, 3], az első iterációban 1, @item()
a második iterációban 2, a harmadik iterációban pedig 3 értéket ad vissza. A hasonló kifejezéssel tízszer is @range(0,10)
iterálhat, kezdve a 0 és a 9 végződés között.
Iterálás egyetlen tevékenységen keresztül
Forgatókönyv: Másolás ugyanabból a forrásfájlból az Azure Blobban több célfájlba az Azure Blobban.
Folyamatdefiníció
{
"name": "<MyForEachPipeline>",
"properties": {
"activities": [
{
"name": "<MyForEachActivity>",
"type": "ForEach",
"typeProperties": {
"isSequential": "true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPath",
"type": "Expression"
},
"activities": [
{
"name": "MyCopyActivity",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": "false"
},
"sink": {
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
}
},
"inputs": [
{
"referenceName": "<MyDataset>",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs": [
{
"referenceName": "MyDataset",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@item()"
}
}
]
}
]
}
}
],
"parameters": {
"mySourceDatasetFolderPath": {
"type": "String"
},
"mySinkDatasetFolderPath": {
"type": "String"
}
}
}
}
Blob-adatkészlet definíciója
{
"name":"<MyDataset>",
"properties":{
"type":"AzureBlob",
"typeProperties":{
"folderPath":{
"value":"@dataset().MyFolderPath",
"type":"Expression"
}
},
"linkedServiceName":{
"referenceName":"StorageLinkedService",
"type":"LinkedServiceReference"
},
"parameters":{
"MyFolderPath":{
"type":"String"
}
}
}
}
Paraméterértékek futtatása
{
"mySourceDatasetFolderPath": "input/",
"mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}
Több tevékenység iterálása
Egy ForEach-tevékenységben több tevékenység (például másolási és webes tevékenységek) iterálása is lehetséges. Ebben a forgatókönyvben azt javasoljuk, hogy több tevékenységet külön folyamatba bontsa ki. Ezután a folyamat ExecutePipeline tevékenységével ForEach-tevékenységgel meghívhatja a különálló folyamatot több tevékenységgel.
Syntax
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "<MyForEachMultipleActivities>"
"typeProperties": {
"isSequential": true,
"items": {
...
},
"activities": [
{
"type": "ExecutePipeline",
"name": "<MyInnerPipeline>"
"typeProperties": {
"pipeline": {
"referenceName": "<copyHttpPipeline>",
"type": "PipelineReference"
},
"parameters": {
...
},
"waitOnCompletion": true
}
}
]
}
}
],
"parameters": {
...
}
}
}
Példa
Forgatókönyv: Egy ForEach-tevékenységen belüli InnerPipeline iterálása folyamattevékenység végrehajtásával. A belső folyamat sémadefiníciókat paraméteresen tartalmazó másolatai.
Főfolyamat-definíció
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "MyForEachActivity",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.inputtables",
"type": "Expression"
},
"activities": [
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "InnerCopyPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceTableName": {
"value": "@item().SourceTable",
"type": "Expression"
},
"sourceTableStructure": {
"value": "@item().SourceTableStructure",
"type": "Expression"
},
"sinkTableName": {
"value": "@item().DestTable",
"type": "Expression"
},
"sinkTableStructure": {
"value": "@item().DestTableStructure",
"type": "Expression"
}
},
"waitOnCompletion": true
},
"name": "ExecuteCopyPipeline"
}
]
}
}
],
"parameters": {
"inputtables": {
"type": "Array"
}
}
}
}
Belső folyamat definíciója
{
"name": "InnerCopyPipeline",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource",
}
},
"sink": {
"type": "SqlSink"
}
},
"name": "CopyActivity",
"inputs": [
{
"referenceName": "sqlSourceDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sourceTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sourceTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "sqlSinkDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sinkTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sinkTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
]
}
],
"parameters": {
"sourceTableName": {
"type": "String"
},
"sourceTableStructure": {
"type": "String"
},
"sinkTableName": {
"type": "String"
},
"sinkTableStructure": {
"type": "String"
}
}
}
}
Forrásadatkészlet definíciója
{
"name": "sqlSourceDataset",
"properties": {
"type": "SqlServerTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "sqlserverLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Fogadó adatkészlet definíciója
{
"name": "sqlSinkDataSet",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "azureSqlLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Főfolyamat-paraméterek
{
"inputtables": [
{
"SourceTable": "department",
"SourceTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
],
"DestTable": "department2",
"DestTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
]
}
]
}
Kimenetek összesítése
A foreach-tevékenység kimeneteinek összesítéséhez használja a Változók és a Hozzáfűző változók tevékenységet.
Először deklarál egy változót array
a folyamatban. Ezután meghívja a Változó hozzáfűzése tevékenységet az egyes foreach-hurkokon belül. Ezt követően lekérheti az aggregációt a tömbből.
Korlátozások és kerülő megoldások
Íme néhány korlátozás a ForEach-tevékenységre és a javasolt kerülő megoldásokra.
Korlátozás | Áthidaló megoldás |
---|---|
Egy ForEach-hurkot nem ágyazhat be egy másik ForEach-hurokba (vagy egy Until hurokba). | Tervezzen meg egy kétszintű folyamatot, amelyben a külső ForEach hurokkal rendelkező külső folyamat egy belső folyamat fölé viszi a beágyazott hurkot. |
A ForEach-tevékenység legfeljebb batchCount 50-et tartalmazhat a párhuzamos feldolgozáshoz, és legfeljebb 100 000 elemet. |
Tervezzen meg egy kétszintű folyamatot, amelyben a ForEach-tevékenységgel rendelkező külső folyamat egy belső folyamaton keresztül iterál. |
A SetVariable nem használható párhuzamosan futó ForEach-tevékenységen belül, mivel a változók globálisak a teljes folyamatra vonatkozóan, és nem tartoznak a ForEach vagy más tevékenységek hatókörébe. | Fontolja meg a szekvenciális ForEach használatát, vagy használja a Folyamat végrehajtása a ForEachban (a gyermekfolyamatban kezelt változó/paraméter). |
Kapcsolódó tartalom
Lásd az egyéb támogatott vezérlési folyamatokat: