Azure Data Factory ve Azure Synapse Analytics'te ForEach etkinliği
UYGULANANLAR: Azure Data Factory
Azure Synapse Analytics
İpucu
Kuruluşlar için hepsi bir arada analiz çözümü olan Microsoft Fabric'te Data Factory'yi deneyin. Microsoft Fabric , veri taşımadan veri bilimine, gerçek zamanlı analize, iş zekasına ve raporlamaya kadar her şeyi kapsar. Yeni bir deneme sürümünü ücretsiz olarak başlatmayı öğrenin!
ForEach Etkinliği, Azure Data Factory veya Synapse işlem hattında yinelenen bir denetim akışı tanımlar. Bu etkinlik bir koleksiyon üzerinde yinelemek için kullanılır ve bir döngüde belirtilen etkinlikleri yürütür. Bu etkinliğin döngü uygulaması, programlama dillerindeki Foreach döngü yapısına benzer.
Kullanıcı arabirimiyle ForEach etkinliği oluşturma
İşlem hattında ForEach etkinliği kullanmak için aşağıdaki adımları tamamlayın:
ForEach etkinliğinizin girişi olarak herhangi bir dizi türü değişkenini veya diğer etkinliklerden gelen çıkışları kullanabilirsiniz. Dizi değişkeni oluşturmak için işlem hattı tuvalinin arka planını seçin ve ardından Değişkenler sekmesini seçerek aşağıda gösterildiği gibi bir dizi türü değişkeni ekleyin.
İşlem hattı Etkinlikleri bölmesinde ForEach'i arayın ve bir ForEach etkinliğini işlem hattı tuvaline sürükleyin.
Tuvalde henüz seçili değilse yeni ForEach etkinliğini ve ayrıntılarını düzenlemek için Ayarlar sekmesini seçin.
Öğeler alanını seçin ve dinamik içerik düzenleyicisi bölmesini açmak için Dinamik içerik ekle bağlantısını seçin.
Dinamik içerik düzenleyicisinde filtrelenecek giriş dizinizi seçin. Bu örnekte, ilk adımda oluşturulan değişkeni seçiyoruz.
Giriş Öğeleri dizisindeki her öğe için yürütülecek bir veya daha fazla etkinlik eklemek için ForEach etkinliğindeki Etkinlikler düzenleyicisini seçin.
ForEach etkinliği içinde oluşturduğunuz tüm etkinliklerde, Öğeler listesinden ForEach etkinliğinin yinelemede olduğu geçerli öğeye başvurabilirsiniz. Bir özellik değeri belirtmek için dinamik ifadeyi kullanabileceğiniz her yerde geçerli öğeye başvurabilirsiniz. Geçerli öğeyi döndürmek için dinamik içerik düzenleyicisinde ForEach yineleyicisini seçin.
Sözdizimi
Özellikler bu makalenin devamında açıklanmıştır. items özelliği koleksiyondur ve koleksiyondaki her öğeye aşağıdaki söz diziminde gösterildiği gibi kullanılarak @item()
başvurulur:
{
"name":"MyForEachActivityName",
"type":"ForEach",
"typeProperties":{
"isSequential":"true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
"type": "Expression"
},
"activities":[
{
"name":"MyCopyActivity",
"type":"Copy",
"typeProperties":{
...
},
"inputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@item()"
}
}
]
}
]
}
}
Tür özellikleri
Özellik | Açıklama | İzin verilen değerler | Zorunlu |
---|---|---|---|
Adı | Her etkinlik için öğesinin adı. | String | Yes |
Tür | ForEach olarak ayarlanmalıdır | String | Yes |
isSequential | Döngünün sırayla mı yoksa paralel olarak mı yürütülmesi gerektiğini belirtir. Paralel olarak aynı anda en fazla 50 döngü yinelemesi yürütülebilir). Örneğin, 10 farklı kaynak ve isSequential değeri False olan havuz veri kümelerine sahip bir kopyalama etkinliği üzerinde yinelenen bir ForEach etkinliğiniz varsa, tüm kopyalar aynı anda yürütülür. Varsayılan değer False'tur. "isSequential" False olarak ayarlandıysa, birden çok yürütülebilir dosyayı çalıştırmak için doğru bir yapılandırma olduğundan emin olun. Aksi takdirde, yazma çakışmalarının oluşmasını önlemek için bu özellik dikkatli kullanılmalıdır. Daha fazla bilgi için bkz . Paralel yürütme bölümü. |
Boolean | Hayır Varsayılan değer False'tur. |
batchCount | Paralel yürütme sayısını denetlemek için kullanılacak toplu iş sayısı (isSequential false olarak ayarlandığında). Bu üst eşzamanlılık sınırıdır, ancak her etkinlik için her zaman bu sayıda yürütülmeyecek | Tamsayı (en fazla 50) | Hayır Varsayılan değer 20'dir. |
Items | Yinelenecek bir JSON Dizisi döndüren ifade. | İfade (JSON Dizisi döndüren) | Yes |
Aktiviteler | Yürütülecek etkinlikler. | Etkinlikler Listesi | Yes |
Paralel yürütme
isSequential false olarak ayarlanırsa, etkinlik en fazla 50 eşzamanlı yinelemeyle paralel olarak yinelenir. Bu ayar dikkatli kullanılmalıdır. Eşzamanlı yinelemeler aynı klasöre ancak farklı dosyalara yazıyorsa, bu yaklaşım uygundur. Eşzamanlı yinelemeler aynı dosyaya eşzamanlı olarak yazıyorsa, bu yaklaşım büyük olasılıkla bir hataya neden olur.
Yineleme ifade dili
ForEach etkinliğinde, özellik öğeleri için yinelenecek bir dizi sağlayın." ForEach etkinliğindeki tek bir numaralandırmayı yinelemek için kullanın @item()
. Örneğin, öğeler bir diziyse: [1, 2, 3], ilk yinelemede 1, @item()
ikinci yinelemede 2 ve üçüncü yinelemede 3 döndürür. 0 ile başlayan ve 9 ile biten on kez yinelemek için like ifadesini de kullanabilirsiniz @range(0,10)
.
Tek bir etkinlik üzerinde yineleme
Senaryo: Azure Blob'daki aynı kaynak dosyadan Azure Blob'daki birden çok hedef dosyaya kopyalayın.
İşlem hattı tanımı
{
"name": "<MyForEachPipeline>",
"properties": {
"activities": [
{
"name": "<MyForEachActivity>",
"type": "ForEach",
"typeProperties": {
"isSequential": "true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPath",
"type": "Expression"
},
"activities": [
{
"name": "MyCopyActivity",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": "false"
},
"sink": {
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
}
},
"inputs": [
{
"referenceName": "<MyDataset>",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs": [
{
"referenceName": "MyDataset",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@item()"
}
}
]
}
]
}
}
],
"parameters": {
"mySourceDatasetFolderPath": {
"type": "String"
},
"mySinkDatasetFolderPath": {
"type": "String"
}
}
}
}
Blob veri kümesi tanımı
{
"name":"<MyDataset>",
"properties":{
"type":"AzureBlob",
"typeProperties":{
"folderPath":{
"value":"@dataset().MyFolderPath",
"type":"Expression"
}
},
"linkedServiceName":{
"referenceName":"StorageLinkedService",
"type":"LinkedServiceReference"
},
"parameters":{
"MyFolderPath":{
"type":"String"
}
}
}
}
Parametre değerlerini çalıştırma
{
"mySourceDatasetFolderPath": "input/",
"mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}
Birden çok etkinlikte yineleme
Bir ForEach etkinliğindeki birden çok etkinlik (örneğin, kopyalama ve web etkinlikleri) üzerinde yineleme yapmak mümkündür. Bu senaryoda, birden çok etkinliği ayrı bir işlem hattında soyutlamanızı öneririz. Ardından, birden çok etkinliğe sahip ayrı işlem hattını çağırmak için ForEach etkinliğiyle işlem hattındaki ExecutePipeline etkinliğini kullanabilirsiniz.
Sözdizimi
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "<MyForEachMultipleActivities>"
"typeProperties": {
"isSequential": true,
"items": {
...
},
"activities": [
{
"type": "ExecutePipeline",
"name": "<MyInnerPipeline>"
"typeProperties": {
"pipeline": {
"referenceName": "<copyHttpPipeline>",
"type": "PipelineReference"
},
"parameters": {
...
},
"waitOnCompletion": true
}
}
]
}
}
],
"parameters": {
...
}
}
}
Örnek
Senaryo: İşlem Hattı Yürütme etkinliğiyle bir ForEach etkinliği içindeki bir InnerPipeline üzerinde yineleme yapın. İç işlem hattı, şema tanımları parametreli olarak kopyalanır.
Ana İşlem Hattı tanımı
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "MyForEachActivity",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.inputtables",
"type": "Expression"
},
"activities": [
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "InnerCopyPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceTableName": {
"value": "@item().SourceTable",
"type": "Expression"
},
"sourceTableStructure": {
"value": "@item().SourceTableStructure",
"type": "Expression"
},
"sinkTableName": {
"value": "@item().DestTable",
"type": "Expression"
},
"sinkTableStructure": {
"value": "@item().DestTableStructure",
"type": "Expression"
}
},
"waitOnCompletion": true
},
"name": "ExecuteCopyPipeline"
}
]
}
}
],
"parameters": {
"inputtables": {
"type": "Array"
}
}
}
}
İç işlem hattı tanımı
{
"name": "InnerCopyPipeline",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource",
}
},
"sink": {
"type": "SqlSink"
}
},
"name": "CopyActivity",
"inputs": [
{
"referenceName": "sqlSourceDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sourceTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sourceTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "sqlSinkDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sinkTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sinkTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
]
}
],
"parameters": {
"sourceTableName": {
"type": "String"
},
"sourceTableStructure": {
"type": "String"
},
"sinkTableName": {
"type": "String"
},
"sinkTableStructure": {
"type": "String"
}
}
}
}
Kaynak veri kümesi tanımı
{
"name": "sqlSourceDataset",
"properties": {
"type": "SqlServerTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "sqlserverLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Havuz veri kümesi tanımı
{
"name": "sqlSinkDataSet",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "azureSqlLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Ana işlem hattı parametreleri
{
"inputtables": [
{
"SourceTable": "department",
"SourceTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
],
"DestTable": "department2",
"DestTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
]
}
]
}
Çıkışları toplama
Foreach etkinliğinin çıkışlarını toplamak için değişkenleri ve Değişken Ekle etkinliğini kullanabilirsiniz.
İlk olarak, işlem hattında bir array
değişken bildirin. Ardından, her foreach döngüsünün içinde Değişken Ekle etkinliğini çağırabilirsiniz. Daha sonra, toplamayı dizinizden alabilirsiniz.
Sınırlamalar ve geçici çözümler
ForEach etkinliğinin bazı sınırlamaları ve önerilen geçici çözümler aşağıdadır.
Sınırlama | Geçici çözüm |
---|---|
ForEach döngüsünü başka bir ForEach döngüsüne (veya Until döngüsüne) iç içe yerleştiremezsiniz. | Dış ForEach döngüsüne sahip dış işlem hattının iç içe döngüye sahip bir iç işlem hattı üzerinde yinelendiği iki düzeyli bir işlem hattı tasarlayın. |
ForEach etkinliğinin paralel işleme için en fazla batchCount 50, en fazla 100.000 öğesi vardır. |
ForEach etkinliğiyle dış işlem hattının bir iç işlem hattı üzerinde yinelendiği iki düzeyli bir işlem hattı tasarlayın. |
SetVariable, paralel olarak çalışan bir ForEach etkinliğinin içinde kullanılamaz çünkü değişkenler tüm işlem hattının genelidir, bunların kapsamı forEach veya başka bir etkinlik olarak belirlenemez. | Sıralı ForEach kullanmayı göz önünde bulundurun veya ForEach içinde İşlem Hattını Yürüt 'i kullanın (Alt İşlem Hattında işlenen Değişken/Parametre). |
İlgili içerik
Desteklenen diğer denetim akışı etkinliklerine bakın: