In my previous post, I walked through connecting to a web service and retrieving updated records to be inserted into a database using an Azure Data Factory pipeline. In this post, I'll continue the process by using Azure Data Factory (ADF) Mapping Data Flows to transform the data and integrate the data flow with the pipeline that was created in the previous post.
Introduction to ADF Mapping Data Flows
One of the challenges data engineers face in today’s “Big Data” world is building data transformation processes that scale. The typical ETL (Extract, Transform, Load) and ELT (Extract, Load, Transform) tools (think SQL Server Integration Services or InfoSphere DataStage) weren’t really designed to scale out to meet the volume and velocity requirements imposed today. To meet this need, data engineers have turned to systems such as Apache Spark and Hadoop, which require specialized knowledge and coding skills.
ADF Mapping Data Flows is designed to give developers a fully visual, drag-and-drop design experience, with no code to write. The resulting data flow is integrated into an ADF pipeline and executed within the context of a Spark cluster (implemented by Azure Databricks) that can be scaled as necessary to support the workload.
At the time of writing, ADF Mapping Data Flows is in limited preview; you can request access here: https://aka.ms/dataflowpreview. (At some point in the near future the feature will move out of preview and there will no longer be a need to request access.)
Getting Started with ADF Mapping Data Flows
For the purposes of this post, I will continue where the previous post left off. We currently have an Azure Data Factory pipeline that connects to the City of Chicago Crime dataset and returns new records that we do not have in our ChicagoCrime database. (If you haven’t yet built the ChicagoCrime database, read the series of posts here)
The process that we will use looks like this:
- Connect to the CSV file that was created by the pipeline
- Transform the data (add appropriate data type conversions)
- Load the data into the Crimes_New table in the database
This is a very simple example and won’t fully demonstrate the power of Mapping Data Flows, but it will give you a good idea of how to integrate a data flow into your pipelines.
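For reference, the Crimes_New staging table we’ll load into was created back in the database-build series. If you just want a feel for its shape, a string-typed staging table along these lines would work; this is purely an illustrative sketch based on the public Chicago crime dataset fields, not the exact DDL from those posts:

```sql
-- Illustrative sketch only: the real Crimes_New table was created in the
-- earlier ChicagoCrime database posts and may differ. Everything is kept
-- as text here because the data flow in this post writes string columns.
CREATE TABLE dbo.Crimes_New (
    [ID]                   NVARCHAR(20),
    [Case Number]          NVARCHAR(20),
    [Date]                 NVARCHAR(50),
    [Block]                NVARCHAR(100),
    [IUCR]                 NVARCHAR(10),
    [Primary Type]         NVARCHAR(100),
    [Description]          NVARCHAR(200),
    [Location Description] NVARCHAR(200),
    [Arrest]               NVARCHAR(10),
    [Domestic]             NVARCHAR(10),
    [Beat]                 NVARCHAR(10),
    [District]             NVARCHAR(10),
    [Ward]                 NVARCHAR(10),
    [Community Area]       NVARCHAR(10),
    [FBI Code]             NVARCHAR(10),
    [X Coordinate]         NVARCHAR(20),
    [Y Coordinate]         NVARCHAR(20),
    [Year]                 NVARCHAR(10),
    [Updated On]           NVARCHAR(50),
    [Latitude]             NVARCHAR(30),
    [Longitude]            NVARCHAR(30),
    [Location]             NVARCHAR(80)
);
```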
To build the data flow, open the Azure Portal, browse to your Data Factory instance, and click the Author & Monitor link. Under Factory Resources, click the ellipsis (…) next to Data Flows, and add a New Data Flow. This will activate the Mapping Data Flow wizard:
Click the Finish button and name the Data Flow Transform New Reports. Click the Add Source box and step through the informational dialog explaining the layout of the Data Flow Source object.
Name the output stream NewCrimesFile and then click New to the right of the Source dataset drop-down:
Select DelimitedText and click Finish to open the New Dataset dialog. Name the dataset NewCrimes, choose the BlobStore linked service that you previously created, then click Browse and navigate to the NewCrimeReports.csv file that was created earlier. Select First row as header, and set Import schema to From connection/store.
Click Finish to create the Dataset. At the top of the UI, slide the Data Flow Debug switch to the right to enable debug. This will take some time as a new Azure Databricks cluster will be provisioned and started.
Once the cluster is started (you will know this by the green dot to the right of the debug switch), click the Source Options tab and choose the Delete source files radio button under After completion. (This will delete the file after it has been processed by the data flow.)
Click the Projection tab and view the schema that was inferred from the file. Note that every field is of type String; this is expected, since the CSV file is all text. This is where you would perform any data type conversions needed for other ETL operations. For the purposes of this exercise, we will leave everything as String, since the actual data conversions will be done in the database. (If we wanted to perform aggregations or other ETL functions, we’d convert the data types here so that the correct types are presented downstream in the data flow.)
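Because everything stays a String in the flow, the type conversions land in the database instead. As a quick illustration (assuming the feed’s dates arrive as ISO 8601 text, which is how the Socrata API formats them), a conversion later in the database might look like this:

```sql
-- TRY_CONVERT returns NULL rather than failing when a value doesn't parse,
-- which is handy for text arriving from a CSV. The sample literal is
-- illustrative, not an actual record from the feed.
SELECT TRY_CONVERT(datetime2, '2019-03-01T08:30:00.000') AS ReportDate;
```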
Since this is a relatively small dataset, there is no need to optimize partitioning, which is what you would select under the Optimize tab. Select the Data Preview tab, and then click Fetch latest preview data to load a sample of the data.
Now that the source is properly configured for the data flow, click the + in the lower right of the NewCrimesFile source and choose Select under Row modifier to add a column-selection object.
Enter RemoveColumns in the Output stream name dialog. Scroll to the bottom of the list and remove the last 4 columns (location.latitude, location.human-address, location.needs_recoding, and location.longitude) by clicking the trash can icon next to each column. (These columns are not used in the database, but have been included in the JSON results from the web service) The result should look like this:
Select the Data Preview tab and then click Fetch latest preview data to view the resulting dataset:
In the ChicagoCrime database, there is a column named Location, which simply contains the Latitude and Longitude of the report in the format “(lat,lon)”. This column is used by some reporting tools to provide map visuals. Since this column is not present in the result returned by the web service, we will need to use Mapping Data Flows to construct it.
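Purely to illustrate the target format, the same “(lat,lon)” string could be produced in T-SQL from the latitude and longitude values (shown here against the illustrative staging table sketched earlier; the real tables may be shaped differently):

```sql
-- Builds the "(lat,lon)" string used by the reporting tools' map visuals.
SELECT TOP (5)
       CONCAT('(', [Latitude], ',', [Longitude], ')') AS [Location]
FROM dbo.Crimes_New;
```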
To add the column, click the + to the right of the RemoveColumns object and select Derived column under Schema modifier:
Enter LocationColumn in the Output stream name dialog, and then type location in the Columns dialog. Add the expression concat('(',latitude,',',longitude,')') in the expression editor (this builds a string in the correct format), and then click Save and exit.
Select the Data Preview tab and click Fetch latest preview data, then scroll to the right to verify that the column has been added and populated correctly.
The final step for the data flow is to write the transformed dataset to the Crimes_New table in the database.
Click the + next to the LocationColumn activity and add a new Sink (you may have to scroll the dialog down).
Click Finish to dismiss the text next to the Sink activity and enter NewCrimesTable in the Output stream name dialog, then click New to the right of the Sink dataset drop-down to create a new dataset. Select Azure SQL Database and click Finish. Name the new dataset Crimes_NewTable, select the ChicagoCrimesDB linked service (created previously), select the Crimes_New table in the Table drop-down, and click Finish.
Select the Settings tab and choose Truncate table as the Table action. This will ensure that the table is empty before loading any data.
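With that setting, the sink clears the staging table before each load, roughly equivalent to it issuing the following statement (the dbo schema is an assumption) before writing any rows:

```sql
-- Performed by the sink's Truncate table action before the load starts.
TRUNCATE TABLE dbo.Crimes_New;
```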
Select the Mapping tab, turn off Auto Mapping, and then ensure that each column is correctly mapped to its destination column. (The UI does not recognize the destination column names that contain spaces, so those mappings must be set by hand.)
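A quick way to double-check those mappings is to list the destination table’s columns directly (again assuming the dbo schema) and compare them against the mapping grid:

```sql
-- Lists the destination columns (including any names containing spaces)
-- so the Data Flow mappings can be verified against them.
SELECT c.name AS ColumnName, t.name AS DataType
FROM sys.columns AS c
JOIN sys.types   AS t ON t.user_type_id = c.user_type_id
WHERE c.object_id = OBJECT_ID(N'dbo.Crimes_New')
ORDER BY c.column_id;
```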
Now that the data flow is complete (you can verify by selecting the Data Preview tab and clicking Fetch latest preview data), you can add the data flow to the pipeline that was created earlier. On the left side, under Factory Resources, select the New Crime Reports pipeline. Under Move & Transform, drag the Data Flow activity onto the canvas to the right of the Copy Data activity, and select the Transform New Reports data flow.
Select Finish, and connect the output of the GetLatestReports Copy Data activity to the input of the Transform New Reports data flow. The resulting pipeline will look like this:
To verify that the pipeline is fully functional, choose the Debug option to start the pipeline execution. Note that this will take several minutes to complete, as the Azure Databricks cluster that executes the data flow will need to be provisioned and started. You will likely need to refresh the status, as the auto refresh stops after 5 minutes.
Once the execution is complete, click the status (eyeglasses) icon to view the results:
The final step in the update process is to copy the data from the Crimes_New table to the ChicagoCrimes table. This is accomplished via the prc_AddNewReports stored procedure that was created when you built the database.
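The real procedure comes from the earlier database-build posts, so treat the following only as a sketch of the kind of work it does; the column list, conversions, and dbo schema are illustrative assumptions, not the actual definition:

```sql
-- Illustrative sketch of prc_AddNewReports: copy new rows from the string-typed
-- staging table into the ChicagoCrimes table, skipping IDs already present.
CREATE PROCEDURE dbo.prc_AddNewReports
AS
BEGIN
    SET NOCOUNT ON;

    INSERT INTO dbo.ChicagoCrimes ([ID], [Case Number], [Date], [Primary Type],
                                   [Description], [Arrest], [Latitude], [Longitude], [Location])
    SELECT n.[ID],
           n.[Case Number],
           TRY_CONVERT(datetime2, n.[Date]),   -- text from the data flow, converted here
           n.[Primary Type],
           n.[Description],
           TRY_CONVERT(bit, n.[Arrest]),       -- 'true'/'false' text to bit
           TRY_CONVERT(float, n.[Latitude]),
           TRY_CONVERT(float, n.[Longitude]),
           n.[Location]
    FROM dbo.Crimes_New AS n
    WHERE NOT EXISTS (SELECT 1 FROM dbo.ChicagoCrimes AS c WHERE c.[ID] = n.[ID]);
END;
```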
In the Data Factory UI, drag a Stored Procedure activity from the General section onto the canvas. Connect the output of the Transform New Reports activity to its input, and name the activity AddNewReports.
Select the SQL Account tab and select the ChicagoCrimesDB linked service.
Select the Stored Procedure tab and select the prc_AddNewReports stored procedure.
Conclusion
Azure Data Factory and ADF Mapping Data Flows combine to create a very powerful ETL/ELT system that allows data engineers to develop complex data pipelines with little or no code development.
This post extended the last post in this series by adding a simple Mapping Data Flows process that transformed the output of a web service and loaded it into a database table.
Additional posts in this series will explore some of the more advanced features of Mapping Data Flows.