Azure Data Factory – Mapping Data Flows

In my previous post, I walked through connecting to a web service and retrieving updated records to be inserted into a database using an Azure Data Factory pipeline. In this post, I’ll continue the process by using Azure Data Factory (ADF) Mapping Data Flows to transform the data and integrate the Data Flow with the pipeline that was created in the previous post.

Introduction to ADF Mapping Data Flows

One of the challenges that data engineers face in today’s “Big Data” world is the need for a scalable process to handle the data transformation workload. The tools typically used for ETL (Extract, Transform, Load) and ELT (Extract, Load, Transform) processes (think SQL Server Integration Services or InfoSphere DataStage) weren’t really designed to scale out to meet the volume and velocity requirements imposed today. To meet this need, data engineers have turned to systems such as Apache Spark and Hadoop, which require specialized knowledge and coding skills.

ADF Mapping Data Flows has been designed to give developers a fully visual, drag-and-drop design experience, without the need to write code. The resulting data flow is integrated into an ADF pipeline and executed within the context of a Spark cluster (implemented by Azure Databricks), which can be scaled as necessary to support the workload.

ADF Mapping Data Flows is currently in limited preview; you can request access here: https://aka.ms/dataflowpreview (at some point in the near future, the feature will move out of preview and there will no longer be a need to request access).

Getting Started with ADF Mapping Data Flows

For the purposes of this post, I will continue where the previous post left off. We currently have an Azure Data Factory pipeline that connects to the City of Chicago Crime dataset and returns new records that we do not have in our ChicagoCrime database. (If you haven’t yet built the ChicagoCrime database, read the series of posts here)

The process that we will use looks like this:

  1. Connect to the CSV file that was created by the pipeline
  2. Transform the data (add appropriate data type conversions)
  3. Load the data into the Crimes_New table in the database

This is a very simple example and will not effectively demonstrate the power of Mapping Data Flows, but it will give you a good idea of how to integrate them into your data pipelines.

To build the data flow, open the Azure Portal, browse to your Data Factory instance, and click the Author & Monitor link. Under Factory Resources, click the ellipsis (…) next to Data Flows, and add a New Data Flow. This will activate the Mapping Data Flow wizard:

Click the Finish button and name the Data Flow Transform New Reports. Click the Add Source box and step through the informational dialog explaining the layout of the Data Flow Source object.

Name the output stream NewCrimesFile and then click New to the right of the Source dataset drop-down:

Select DelimitedText and click Finish to open the New Dataset dialog. Name the Dataset NewCrimes, choose the BlobStore linked service that you previously created, then click Browse and navigate to the NewCrimeReports.csv file that was created earlier. Select First row as header, and set Import schema to From connection/store.

Click Finish to create the Dataset. At the top of the UI, slide the Data Flow Debug switch to the right to enable debug. This will take some time as a new Azure Databricks cluster will be provisioned and started.

Once the cluster is started (you will know this by the green dot to the right of the debug switch), click the Source Options tab and choose Delete source files as the After completion option. (This will delete the file after it is processed by the data flow.)

Click the Projection tab and view the schema that was inferred from the file. Note that every field is of type String. This is normal, since the CSV file is all text. This is where you would perform any data type conversions that might be necessary for other ETL operations. For the purposes of this exercise, we will leave everything as String types, since the actual data conversions will be done in the database. (If we wanted to perform aggregations or other ETL functions, we’d want to convert the data types here to make sure that the correct types were presented downstream in the data flow.)
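
For reference, when those string values are eventually converted in the database, the logic will look something along these lines. This is only a minimal T-SQL sketch: the column names are illustrative, and Crimes_New is the staging table that the data flow loads later in this post.

  -- Sketch: converting string values from the staging table into proper types
  -- (illustrative column names; TRY_CONVERT returns NULL for values that do not parse)
  SELECT TRY_CONVERT(datetime2, [Date])  AS ReportDate,
         TRY_CONVERT(float, [Latitude])  AS Latitude,
         TRY_CONVERT(float, [Longitude]) AS Longitude
  FROM dbo.Crimes_New;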

Since this is a relatively small dataset, there is no need to tune the partitioning, which is what you would do under the Optimize tab. Select the Data Preview tab, and then click Fetch latest preview data to load a sample of the data.

Now that the source is properly configured for the data flow, click the + in the lower right of the NewCrimesFile source and select the Select object under Row modifier to add a column select object.

Enter RemoveColumns in the Output stream name dialog. Scroll to the bottom of the list and remove the last 4 columns (location.latitude, location.human-address, location.needs_recoding, and location.longitude) by clicking the trash can icon next to each column. (These columns are not used in the database, but are included in the JSON results returned by the web service.) The result should look like this:

Select the Data Preview tab and then click Fetch latest preview data to view the resulting dataset:

In the ChicagoCrime database, there is a column named Location, which simply contains the Latitude and Longitude of the report in the format “(lat,lon)”. This column is used by some reporting tools to provide map visuals. Since this column is not present in the result returned by the web service, we will need to use Mapping Data Flows to construct it.

To add the column, click the + to the right of the RemoveColumns object and select Derived column under Schema modifier:

Enter LocationColumn in the Output stream name dialog, and then type location in the Columns dialog. Add the expression concat('(',latitude,',',longitude,')') in the expression editor (this will build a string matching the correct format), and then click Save and exit.

Select the Data Preview tab, click Fetch latest preview data, and then scroll to the right to verify that the column has been added and populated correctly.

The final step for the data flow is to write the transformed dataset to the Crimes_New table in the database.

Click the + next to the LocationColumn activity and add a new Sink (you may have to scroll the dialog down).

Click Finish to dismiss the text next to the Sink activity, and enter NewCrimesTable in the Output stream name dialog. Click New to the right of the Sink dataset drop-down to create a new Dataset, select Azure SQL Database, and select Finish. Name the new dataset Crimes_NewTable, select the ChicagoCrimeDB linked service (that was created previously), then select the Crimes_New table in the Table drop-down and click Finish.

Select the Settings tab, and select Truncate table as the Table action. This will ensure that the table is empty before loading any data.

Select the Mapping tab, turn off the Auto Mapping function, and then ensure that each of the columns is appropriately mapped to the destination columns. (The UI does not recognize the column names that have spaces in the destination table.)

Now that the data flow is complete (you can verify by selecting the Data Preview tab and then clicking Fetch latest preview data), you can add the data flow to the pipeline that was created earlier. On the left side, under Factory Resources, select the New Crime Reports pipeline. Under Move & Transform, select the Data Flow activity and drag it to the canvas to the right of the Copy Data activity. Select the Transform New Reports data flow.

Select Finish, and connect the output of the GetLatestReports Copy Data activity to the input of the Transform New Reports data flow. The resulting pipeline will look like this:

 

To verify that the pipeline is fully functional, choose the Debug option to start the pipeline execution. Note that this will take several minutes to complete, as the Azure Databricks cluster that executes the data flow will need to be provisioned and started. You will likely need to refresh the status, as the auto refresh stops after 5 minutes.

Once the execution is complete, click on the status (the eyeglasses) icon to view the results:

The final step in the update process is to copy the data from the Crimes_New table to the ChicagoCrimes table. This is accomplished via the prc_AddNewReports stored procedure that was created when you built the database.
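
I won’t reproduce the full procedure here (it was created as part of the original database build), but conceptually it does something along the lines of the following sketch. This is a simplification: the [ID] column name is an assumption used for illustration, and the real procedure also handles the data type conversions mentioned earlier.

  -- Conceptual sketch of prc_AddNewReports: copy staged rows into the main
  -- table, skipping any report that is already present.
  CREATE OR ALTER PROCEDURE dbo.prc_AddNewReports
  AS
  BEGIN
      INSERT INTO dbo.ChicagoCrimes
      SELECT n.*
      FROM dbo.Crimes_New AS n
      WHERE NOT EXISTS (SELECT 1
                        FROM dbo.ChicagoCrimes AS c
                        WHERE c.ID = n.ID);
  END;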

In the Data Factory UI, drag a Stored Procedure activity from the General section to the canvas. Connect the output of the Transform New Reports activity to its input, and name the activity AddNewReports.

Select the SQL Account tab and select the ChicagoCrimeDB linked service.

Select the Stored Procedure tab and select the prc_AddNewReports stored procedure.

Conclusion

Azure Data Factory and ADF Mapping Data Flows combine to create a very powerful ETL/ELT system that allows data engineers to develop complex data pipelines with little or no code development.

This post extended the previous post in this series by adding a simple Mapping Data Flows process that transforms the output of a web service and loads it into a database table.

Additional posts in this series will explore some of the more advanced features of Mapping Data Flows.

Update to Creating a Real-World Database in Azure

Almost 4 years ago (amazing how time flies!) I wrote a series of posts on creating a large-scale, real-world database in Azure that could be used for demos and testing. It’s amazing to me how much mileage I’ve gotten out of this database over the years. It has become my go-to dataset for a number of scenarios, and I’ve used it as the basis to discuss many new Azure-based technologies with customers and partners. One thing that occurred to me recently, though, is that I hadn’t really updated the posts to take advantage of new Azure data integration tools such as Azure Data Factory and, specifically, Mapping Data Flows, so this post will begin to do just that.

Updating the Database

The first step in ensuring that your database is up to date with the latest available information is to understand how new/updated data is made available. In the case of the demo database discussed here, the City of Chicago posts the crime dataset to the data portal every day. This dataset usually contains all police reports that have been filed and lags by approximately 7 days (i.e., the most recent report is about a week old). The dataset is available for manual export to CSV (as I detailed in the last post in this series), but it is also available via an HTTP web service that uses the SODA protocol, which is very easy to work with. The basic process to update the database will be:

  1. Identify the last date that is currently in the database
  2. Connect to the City of Chicago Data Portal and retrieve records after the last date
  3. Transform the data returned to fit the schema of the database
  4. Insert new records into the database

Fortunately, Azure Data Factory gives us a very good platform for executing each of the steps above in a reusable data pipeline. In this post, we’ll focus on the first two steps, using a data pipeline in Azure Data Factory to accomplish the tasks. The result will be a pipeline that looks like this:

Building a Data Pipeline

For the purposes of this post, I will assume that you already have an Azure account and have created an Azure Data Factory. If not, you can follow the steps in this article to create a free Azure account and deploy a Data Factory instance. Once you’ve gone through those steps and have created a Data Factory instance, choose the Author & Monitor link to load the design canvas. The result will be a blank canvas that looks something like this:

Note: I am using GitHub integration with my Data Factory which is described here: https://azure.microsoft.com/en-us/blog/azure-data-factory-visual-tools-now-supports-github-integration/

Create a new Pipeline named “New Crime Reports” by clicking the ellipsis next to Pipelines, selecting Add New Pipeline, and then typing the name in the Name dialog. Next, drag a Lookup activity from the General section of the toolbox on the left onto the canvas and name it GetLatestDate. The canvas will look like this:

For this step, we will be connecting to the existing ChicagoCrime database (which was described in the earlier set of posts mentioned above) and looking up the latest date/time stored in the ChicagoCrimes table. To accomplish this, we will need to create a dataset that queries the database and returns the result. Click on the Settings tab, select New to the right of the Source dataset drop-down, and then select Azure SQL Database as the source:

When you click Finish to add the dataset, you will be prompted to enter additional information about the dataset. Enter ChicagoCrimeLastDate as the name, and then select the Connection tab. Click New to the right of the Linked service dialog and enter the appropriate information for your Azure Database connection:

Once you have entered the correct information to connect to your database, select Finish, and then in the Table drop-down, select the v_LastDate view. (If you do not have this view in your database, the DDL can be found at the git repository linked at the end of this post)

Note: You don’t need to create this view; you can leave the dataset more generic by selecting a different table and then using a Query in the Lookup activity. I chose to use a view here for simplicity.
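
For reference, the view (or an equivalent query typed directly into the Lookup activity) only needs to return a single row containing the latest report date. Here is a minimal sketch, assuming the date column in ChicagoCrimes is named Date and aliasing the result as MaxDate to match the expression used later in the pipeline (the actual DDL is in the repository mentioned earlier):

  -- Minimal sketch of v_LastDate; the [Date] column name is an assumption
  CREATE VIEW dbo.v_LastDate
  AS
  SELECT MAX([Date]) AS MaxDate
  FROM dbo.ChicagoCrimes;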

Now that the required information has been entered for the connection and dataset, you can return to the pipeline tab and select Preview data to verify that the correct information is returned.

At this point your canvas will look like this:

If you want to verify the functionality of the Lookup activity, select Debug and note that the pipeline will deploy and execute:

It will run for a few seconds and then succeed. To view the output, click the Output icon (the arrow pointing to the right, away from the box) under Actions, and the JSON results from the activity will be displayed:
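
The exact output varies, but the piece that matters for the next step is the firstRow property, which contains the MaxDate value returned by the view. It looks roughly like this (the date value shown is illustrative):

  {
      "firstRow": {
          "MaxDate": "2019-02-28T23:55:00"
      }
  }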

In order to use the results from the Lookup activity elsewhere in the pipeline, we will store them in a pipeline variable. (This is not strictly necessary; you can reference the output of a previous step directly. However, I prefer to use variables in my pipelines.) To create a variable, select an open area on the canvas (make sure the Lookup activity is NOT selected) and then select the Variables tab. Click New, and then enter the name, type, and default value for the variable. For this pipeline, the name is LastDate, the type is String, and the default value is 01/01/2001 (you can choose any arbitrary value here).

Once the LastDate variable is created, drag a Set Variable activity onto the canvas to the right of the Lookup activity. Drag the output (the green box) of the Lookup activity to the Set Variable activity (this will configure the Set Variable activity to execute on success of the lookup). Name the activity PopulateLastDate, then select the Variables tab, select the LastDate variable in the drop-down, and then click in the Value box to add dynamic content. In the expression editor, use @{activity('GetLatestDate').output.firstRow.MaxDate} as the expression.

Click Finish to complete the Value dialog. To test the pipeline and verify that you properly set the variable, choose Debug to start the pipeline, and then click the output (under Actions) for the PopulateLastDate activity and verify that the last date in your database table is returned.

Now that you have retrieved the last date and populated the LastDate variable, you can connect to the City of Chicago web service and query the dataset. To do this, drag a Copy Data activity from the Move & Transform section of the toolbox on the left, and enter the name GetLatestReports in the Name field. Drag the green box on the Set Variable activity to the Copy Data activity to connect them. Select the Source tab, and click New to the right of the Source dataset drop-down. Search for, and select, HTTP in the New Dataset dialog and press Finish.

Name the dataset ChicagoCrimesSODA and select the Connection tab. Select New to the right of the Linked service dialog. Enter CityOfChicagoCrimes for the Name, https://data.cityofchicago.org/resource/crimes.json for the URL, and Anonymous for the Authentication.

Select Test connection to verify that the URL was entered correctly and then click Finish to create the Linked Service. In the Relative Url dialog, click to Add Dynamic Content, then enter @concat('?$limit=50000&$order=date&$where=date>''',dataset().LastDate,'''') as the expression. Pay close attention to the number of single quotes within the concat function. This will construct an appropriate query string (see the SODA API documentation for more information) that orders the dataset by date, returns only rows after the last date available in the database, and limits the result to 50,000 rows (a hard limit imposed by the software that the City of Chicago uses to provide the interface). Click Finish to save the Dynamic Content, and then click Test connection to verify that it is correct.
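
To make the quoting easier to reason about, at runtime the expression produces a relative URL along these lines (the date value is illustrative and comes from the LastDate value passed to the dataset):

  ?$limit=50000&$order=date&$where=date>'2019-02-28T23:55:00'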

Verify that the Request Method is GET, select JSON format for the File format, and select Array of objects for the File pattern.

Note: For the purposes of this exercise, you will leave the schema unbound. In most cases, you would want to bind the schema, but since we are using dynamically-constructed query strings to query the web service, we won’t be able to directly import the schema. If you want to bind the schema, you can provide a static relative URL that will enable you to import the schema, and then replace it with dynamic content after the schema is imported. For this exercise, Data Factory will properly handle the schema mapping without being directly bound.
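
For context, the body returned by the web service is an array of JSON objects, one per report, roughly along the lines of the sample below. Only a few illustrative fields are shown, and the values are made up; the nested location object is the source of the location.* columns that will be removed in the Mapping Data Flow covered in the next post.

  [
      {
          "date": "2019-02-28T23:55:00",
          "latitude": "41.8781",
          "longitude": "-87.6298",
          "location": {
              "latitude": "41.8781",
              "longitude": "-87.6298",
              "needs_recoding": false
          }
      }
  ]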

Click on the Parameters tab, and add a new parameter named LastDate of type String with a default value of a valid date. (I use 01/01/2001 since that is the first available date in the dataset.)

Now that the source connection is configured, click on the New Crime Reports pipeline tab and, in the Dataset parameters section, set the LastDate parameter to the value @{variables('LastDate')}. This populates the parameter with the value of the LastDate variable that is assigned by the Set Variable activity; the parameter is then used in the Relative URL expression to complete the query string with the appropriate date. (You cannot reference the variable directly in the Dataset configuration, which is why the two-step process is needed.)

Now that the Source is configured properly, click on the Sink tab and select New to the right of the Sink dataset drop-down. Name the new Dataset ChicagoCrimesFile.

Before continuing, switch back to the Azure Portal, add a new Storage account, and then create a Blob container in that account called newcrimereports.

Switch back to the Data Factory canvas, and in the Connection tab for the ChicagoCrimesFile sink, click New to the right of the Linked service drop-down and enter the appropriate information for the Blob Storage account that you just created.

Click Finish to create the Connection, then click Browse in the File path and select the newcrimereports container that you created earlier.

In the name section of the File path dialog, enter NewCrimeReports.csv, select Text format in the File format drop-down, select Comma in the Column delimiter drop-down, and select Carriage Return + Line feed in the Row delimiter drop-down.

Select Column names in the first row, and enter a double quote (") in the Quote character dialog.
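
With those settings, the first couple of lines of the output file will look something like this (the columns shown are a small illustrative subset of what is actually written):

  "date","latitude","longitude"
  "2019-02-28T23:55:00","41.8781","-87.6298"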

Like the source, you will not bind the schema to the sink for the purposes of this exercise. Data Factory will infer the schema at runtime and will properly create the output file. The web service will return records in an Array of JSON objects, and the sink will copy the JSON to a standard CSV text file. At this point, your Pipeline should look like this:

To validate that everything is configured correctly, select Debug to deploy and execute the pipeline. This should take a couple of minutes to fully run. Once complete, check the output of each of the activities. Note that the Copy Data activity has a pair of eyeglasses next to it that will allow you to view specifics such as the number of records read and written to the output file.

You can verify that the output file was created by switching to the Azure portal, browsing to the storage account that you created earlier, clicking on Blobs, and then selecting the newcrimereports container. Inside the container you should see the NewCrimeReports.csv file. You can click the ellipsis next to the file and select View/Edit blob to see the contents of the file.

Conclusion

In this post, I discussed how to use an Azure Data Factory pipeline to connect to a web service and return records that are written to a file stored in an Azure storage blob container. In the next post in this series, I will detail how to use Azure Data Factory Mapping Data Flows to transform the data and insert it into the ChicagoCrime database.