In an Azure Data Factory pipeline, I am trying to make two Copy activities run in sequence: the first copies data from a blob to a SQL table, and the second then copies that SQL table to another database.
I tried the code below, but the resulting pipeline has no dependency between the activities (checked both in the workflow diagram and in the JSON in the Azure UI). When I run the pipeline, I get an error message like: "ErrorResponseException: The template validation failed: The 'runAfter' property of template action 'my second activity nameScope' at line '1' and column '22521' contains non-existent action. Blah blah blah…"
Once I manually add the dependency in the Azure UI, the pipeline runs successfully.
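For reference, the dependency I add manually shows up in the pipeline JSON as a dependsOn block on the second activity, roughly like this (the activity name here is a placeholder):

"dependsOn": [
    {
        "activity": "MyFirstActivity",
        "dependencyConditions": [ "Succeeded" ]
    }
]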
I would appreciate it if anyone could point me to sample code (Python/C#/PowerShell) or documentation. My Python code:
def createDataFactoryRectStage(self,
                               aPipelineName, aActivityStageName, aActivityAcquireName,
                               aRectFileName, aRectDSName,
                               aStageTableName, aStageDSName,
                               aAcquireTableName, aAcquireDSName):
    # Model classes come from azure.mgmt.datafactory.models
    adf_client = self.__getAdfClient()

    # Source dataset: delimited text file in blob storage
    ds_blob = AzureBlobDataset(linked_service_name=LinkedServiceReference(AZURE_DATAFACTORY_LS_BLOB_RECT),
                               folder_path=PRJ_AZURE_BLOB_PATH_RECT,
                               file_name=aRectFileName,
                               format={"type": "TextFormat",
                                       "columnDelimiter": ",",
                                       "rowDelimiter": "",
                                       "nullValue": "",
                                       "nullValue": "\\N",
                                       "treatEmptyAsNull": "true",
                                       "firstRowAsHeader": "true",
                                       "quoteChar": "\""})
    adf_client.datasets.create_or_update(AZURE_RESOURCE_GROUP, AZURE_DATAFACTORY, aRectDSName, ds_blob)

    # Staging dataset: SQL table the blob is copied into
    ds_stage = AzureSqlTableDataset(linked_service_name=LinkedServiceReference(AZURE_DATAFACTORY_LS_SQLDB_STAGE),
                                    table_name='[dbo].[' + aStageTableName + ']')
    adf_client.datasets.create_or_update(AZURE_RESOURCE_GROUP, AZURE_DATAFACTORY, aStageDSName, ds_stage)

    # First activity: copy blob -> staging table
    ca_blob_to_stage = CopyActivity(aActivityStageName,
                                    inputs=[DatasetReference(aRectDSName)],
                                    outputs=[DatasetReference(aStageDSName)],
                                    source=BlobSource(),
                                    sink=SqlSink(write_batch_size=AZURE_SQL_WRITE_BATCH_SIZE))

    # Acquire dataset: SQL table in the second database
    ds_acquire = AzureSqlTableDataset(linked_service_name=LinkedServiceReference(AZURE_DATAFACTORY_LS_SQLDB_ACQUIRE),
                                      table_name='[dbo].[' + aAcquireTableName + ']')
    adf_client.datasets.create_or_update(AZURE_RESOURCE_GROUP, AZURE_DATAFACTORY, aAcquireDSName, ds_acquire)

    # Second activity: copy staging -> acquire, intended to run after the first succeeds
    dep = ActivityDependency(ca_blob_to_stage, dependency_conditions=[DependencyCondition('Succeeded')])
    ca_stage_to_acquire = CopyActivity(aActivityAcquireName,
                                       inputs=[DatasetReference(aStageDSName)],
                                       outputs=[DatasetReference(aAcquireDSName)],
                                       source=SqlSource(),
                                       sink=SqlSink(write_batch_size=AZURE_SQL_WRITE_BATCH_SIZE),
                                       depends_on=[dep])

    p_obj = PipelineResource(activities=[ca_blob_to_stage, ca_stage_to_acquire], parameters={})
    return adf_client.pipelines.create_or_update(AZURE_RESOURCE_GROUP, AZURE_DATAFACTORY, aPipelineName, p_obj)
Here is an example in C# that chains activities to execute in sequence within a pipeline. Remember that in ADF v1 we had to configure the output of one activity as the input of another in order to chain them and make them depend on each other. In ADF v2, note the dependsOn property in the tutorial's pipeline snippet, which ensures that the second activity runs only after the first activity has run successfully.
Check out the ADF v2 tutorial here for a thorough explanation and more scenarios.
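The same pattern in the Python SDK comes down to one detail: in azure.mgmt.datafactory.models, ActivityDependency takes the name of the upstream activity (a string), not the activity object itself. Passing the CopyActivity object, as in the question's code, is the likely cause of the 'non-existent action' validation error. Here is a minimal sketch of the corrected chaining, assuming the same azure.mgmt.datafactory models as the question; the dataset and activity names are illustrative placeholders:

from azure.mgmt.datafactory.models import (
    ActivityDependency, BlobSource, CopyActivity, DatasetReference,
    PipelineResource, SqlSink, SqlSource,
)

# First activity: blob -> staging SQL table
first = CopyActivity('CopyBlobToStage',
                     inputs=[DatasetReference('RectBlobDS')],
                     outputs=[DatasetReference('StageSqlDS')],
                     source=BlobSource(),
                     sink=SqlSink())

# The dependency references the upstream activity by *name*;
# this is what becomes the second activity's runAfter entry.
dep = ActivityDependency(activity=first.name,
                         dependency_conditions=['Succeeded'])

# Second activity: staging table -> acquire table, gated on the first
second = CopyActivity('CopyStageToAcquire',
                      inputs=[DatasetReference('StageSqlDS')],
                      outputs=[DatasetReference('AcquireSqlDS')],
                      source=SqlSource(),
                      sink=SqlSink(),
                      depends_on=[dep])

pipeline = PipelineResource(activities=[first, second], parameters={})

With the dependency expressed by name, pipelines.create_or_update should produce a pipeline whose second activity carries a valid runAfter entry, so no manual fix-up in the UI is needed.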