Data Factory allows parameterization in many parts of our solutions. We can parameterize things such as connection information in linked services as well as blob storage containers and files in datasets. We can also parameterize certain properties in activities. For instance, we can write an expression to determine the stored procedure to be executed in a Stored Procedure Activity or the filename in the sink (destination) of a Copy Activity.
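For example, the file name in the sink of a Copy Activity could be driven by an expression along these lines (a made-up illustration; the TableName parameter and the naming convention are not from any particular solution):

@concat(pipeline().parameters.TableName, '_', formatDateTime(utcnow(), 'yyyyMMdd'), '.csv')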
But we cannot parameterize the invoked pipeline in an Execute Pipeline Activity, so we need a workaround in order to have a metadata-driven execution framework. By metadata-driven execution framework, I mean that metadata stored in a data store (in my case, a SQL Database) determines which pipelines and activities get executed. With this type of framework, if I don’t want a specific pipeline to execute, I just update the metadata in the data store rather than remove that pipeline’s execution from the parent pipeline. We’ve been doing this type of development in SSIS for years, and Biml has played a big part in that. But SSIS allows us to parameterize the Execute Package Task.
Since we can’t implement this parameterized execution of pipelines natively, we need to look for something that Data Factory can call to accomplish the task. Paul Andrew has a nice framework that uses Azure Functions. I was working on a Data Factory solution for a client who doesn’t have C# or PowerShell developers on hand to help with the ELT process, so we needed to explore a low-code solution.
While there is no Logic App activity in Data Factory, we can use a Web Activity to call a Logic App. I might have a pipeline that looks something like what is pictured below.
Within the ForEach loop is a single Web Activity.
I used variables and parameters in an expression to populate the URL so it would be dynamic, and I made the call with the GET method.
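The expression looked roughly like the sketch below. The workflow host, the signature query string, and the item property names (PipelineName, DataSourceID, Country) are placeholders rather than my actual values; the real URL comes from the Logic App’s HTTP trigger, and the relative path segments must match that trigger, which I cover next.

@concat('https://prod-00.eastus.logic.azure.com/workflows/<workflow-id>/triggers/manual/paths/invoke/pipeline/', item().PipelineName, '/datasource/', string(item().DataSourceID), '/country/', item().Country, '?api-version=2016-10-01&sp=<sp>&sv=<sv>&sig=<signature>')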
The initial version of my Logic App is shown below.
I added path parameters to my HTTP request trigger so I could capture the information needed to execute the appropriate pipeline. For me, this included the pipeline name, a data source ID, and a country. Your parameters would vary according to your requirements.
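My relative path on the trigger looked something like this (the parameter names are just the ones I chose):

/pipeline/{PipelineName}/datasource/{DataSourceID}/country/{Country}

Each segment in curly braces becomes a trigger output that later actions in the Logic App can reference.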
Logic Apps has an Azure Data Factory connector with an action called “Create a pipeline run”. You tell it which data factory, which pipeline, and any parameter values needed for the pipeline execution.
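So the action isn’t tied to a single pipeline, the pipeline name field can be populated from the trigger’s path parameter instead of being hard-coded, with an expression like this (assuming the PipelineName path parameter from the relative path above):

@triggerOutputs()['relativePathParameters']['PipelineName']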
At this point in the workflow, our pipeline would be executing. But now we need to know when it has finished. That’s what the Initialize Variable and Until loop actions are handling. I created a string variable called PipelineStatus and set the default value to “InProgress”. My Until loop checks the pipeline execution status. If the pipeline is still running, the loop waits 5 seconds, gets the new status, and assigns that status to the variable. This repeats until the pipeline execution is no longer in progress.
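Inside the Until loop, the Set variable action assigns the status returned by the Data Factory connector’s “Get a pipeline run” action, roughly like this (the action name in the expression has to match whatever you named that action in your Logic App):

@body('Get_a_pipeline_run')?['status']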
Here’s the Until loop condition I used; the loop exits once the pipeline execution is no longer running (neither InProgress nor Queued):
@and(not(equals(variables('PipelineStatus'), 'InProgress')),not(equals(variables('PipelineStatus'), 'Queued')))
Once the pipeline execution is complete, an HTTP response with the pipeline status is sent back to the caller.
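My Response action returns a small JSON body, something like the sketch below (the exact shape is up to you; this one just echoes the pipeline name and its final status):

{
  "pipelineName": "@{triggerOutputs()['relativePathParameters']['PipelineName']}",
  "status": "@{variables('PipelineStatus')}"
}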
This is all great until you find out that Logic Apps will experience an HTTP timeout if the request takes more than 2 minutes.
Do you have any pipelines that take longer than two minutes to execute? If so, you need to change your solution to handle this. Note that you would have the same issue with Azure Functions, although it would give you 230 seconds instead of 120 seconds before it timed out. We need to switch to an asynchronous call to support long-running pipelines. Paul has already done this in his framework using Azure Functions. In Logic Apps, we can change our response to an asynchronous response and then implement a polling pattern to check the status. Alternatively, we could implement a webhook action. I’ll write about updating the solution to handle long-running pipelines in a future post.