Using Logic Apps in a Data Factory Execution Framework – Part 1

Data Factory allows parameterization in many parts of our solutions. We can parameterize things such as connection information in linked services as well as blob storage containers and files in datasets. We can also parameterize certain properties in activities. For instance, we can write an expression to determine the stored procedure to be executed in a Stored Procedure Activity or the filename in the sink (destination) of a Copy Activity.
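For example, a sink filename in a Copy Activity could be set with an expression like the one below (the Country parameter here is just for illustration).

@concat('sales_', pipeline().parameters.Country, '_', formatDateTime(utcnow(), 'yyyyMMdd'), '.csv')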

But we cannot parameterize the invoked pipeline in an Execute Pipeline Activity. This means we need to find workarounds in order to have a metadata-driven execution framework. By metadata-driven execution framework, I mean that data stored in a data store (in my case, a SQL Database) determines which pipelines and activities get executed. With this type of framework, if I don’t want a specific pipeline to execute, I just update the data in the data store rather than delete the pipeline execution from the parent pipeline. We’ve been doing this type of development in SSIS for years, and Biml has played a big part in that. But SSIS allows us to parameterize the Execute Package Task.
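As a hypothetical illustration, a Lookup against the metadata database might return rows shaped like the JSON below (the column names are invented for this example). Toggling a flag like IsEnabled on or off controls whether that pipeline gets executed, with no change to the parent pipeline.

[
  { "PipelineName": "StageSalesFiles", "DataSourceID": 1, "Country": "US", "IsEnabled": true },
  { "PipelineName": "StageInventoryFiles", "DataSourceID": 2, "Country": "CA", "IsEnabled": false }
]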

Since we can’t implement this parameterized execution of pipelines natively, we need to look for something that Data Factory can call to accomplish the task. Paul Andrew has a nice framework that uses Azure Functions. I was working on a Data Factory solution for a client who doesn’t have C# or PowerShell developers on hand to help with the ELT process, so we needed to explore a low-code solution.

While there is no Logic App activity in Data Factory, we can use a Web Activity to call the Logic App. I might have a pipeline that looks something like the one pictured below.

Data Factory pipeline that uses a Stored Procedure to capture the start of the pipeline, a Lookup to get the list of files to be copied, a ForEach loop to copy each of the files, and a Stored Procedure to mark the end of the pipeline.
Staging pipeline that copies files from Azure Data Lake Storage to Azure SQL Database

Within the ForEach loop is a single Web Activity.

Data Factory pipeline Web Activity calling a Logic App. An expression populates the URL, and a GET method is used.
Web Activity that calls a Logic App

I used some variables and parameters in an expression to populate the URL so it would be dynamic. I used a GET method in the call.
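The exact expression depends on your metadata and on the relative path defined in the Logic App trigger (shown further down), but a sketch of such an expression, with placeholder variable and column names, might look like this:

@concat(variables('LogicAppBaseURL'), '/pipeline/', item().PipelineName, '/datasourceid/', string(item().DataSourceID), '/country/', item().Country, variables('LogicAppQueryString'))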

My initial version of my Logic App is shown below.

Logic App workflow with an HTTP request trigger. 1) Create a pipeline run. 2) Initialize Variable. 3) Until loop. 4) HTTP Response.
Logic App that executes a Data Factory pipeline and waits for it to complete before returning a response

I added path parameters in my HTTP request trigger to allow me to capture the information I need to execute the appropriate pipeline. For me this included the pipeline name, a data source ID, and a country. Your parameters would vary according to your requirements.

HTTP Request trigger in a logic app with 3 path parameters: pipeline, country, Data Source ID
HTTP Request trigger in my Logic App
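Under the covers, path parameters are defined by giving the Request trigger a relative path with tokens in curly braces and setting the method to GET. In code view, the trigger looks roughly like the simplified sketch below (the parameter names match mine, but yours would differ).

"triggers": {
  "manual": {
    "type": "Request",
    "kind": "Http",
    "inputs": {
      "method": "GET",
      "relativePath": "/pipeline/{pipeline}/datasourceid/{dataSourceID}/country/{country}"
    }
  }
}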

Logic Apps has an action called “Create a pipeline run”. You tell it which data factory, which pipeline, and any parameter values needed for the pipeline execution.

Create a pipeline run action in a logic app. Data Factory Pipeline Name is populated by a parameter. The pipeline parameters are populated by a mix of static JSON and parameters.
Create a pipeline run action in my Logic App
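The pipeline parameters box accepts a JSON object, so you can mix static values with the path parameters captured by the trigger. A sketch with made-up parameter names is below; the path parameters are referenced with the triggerOutputs() function.

{
  "LoadType": "Incremental",
  "DataSourceID": "@{triggerOutputs()['relativePathParameters']['dataSourceID']}",
  "Country": "@{triggerOutputs()['relativePathParameters']['country']}"
}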

At this point in the workflow, our pipeline would be executing. But now we need to know when it has finished. That’s what the Initialize Variable and Until loop actions handle. I created a string variable called PipelineStatus and set the default value to “InProgress”. My Until loop checks the pipeline run status. If the pipeline is still running, the loop waits 5 seconds, gets the new status, and assigns it to the variable. This repeats until the pipeline run is no longer in progress or queued.

Here’s the condition I used on the Until loop; it evaluates to true (and ends the loop) once the pipeline run is no longer in progress or queued:

@and(not(equals(variables('PipelineStatus'), 'InProgress')),not(equals(variables('PipelineStatus'), 'Queued')))
Until loop in a logic app. Checks status of pipeline run. 1) Delay action. 2) Get a pipeline run. 3) Set variable.
Until loop in my Logic App to dynamically execute a Data Factory pipeline
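Inside the loop, the Set variable action assigns the status returned by the Get a pipeline run action back to PipelineStatus. Assuming the action is named “Get a pipeline run”, the value expression would be something like:

@body('Get_a_pipeline_run')?['status']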

Once the pipeline execution is complete, an HTTP response with the pipeline status is sent back to the caller.

HTTP Response action with status code 200 and pipeline status value in the body.
HTTP Response action in my Logic App
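The body of the Response action is just a small JSON object that surfaces the final value of the PipelineStatus variable, something like:

{
  "PipelineStatus": "@{variables('PipelineStatus')}"
}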

This is all great until you find out that the HTTP call to the Logic App will time out if it takes more than 2 minutes to return a response.

Do you have any pipelines that take longer than two minutes to execute? If so, you need to change your solution to handle this. Note that you would have the same issue with Azure Functions, although it would give you 230 seconds instead of 120 seconds before it timed out. We need to switch to an asynchronous call to support long-running pipelines. Paul has already done this in his framework using Azure Functions. In Logic Apps, we can change our response to an asynchronous response and then implement a polling pattern to check the status, or we could implement a webhook action. I’ll write about updating the solution to handle long-running pipelines in a future post.
