Incrementally copy data using Change Data Capture - Azure Data Factory (2023)

APPLIES TO: Azure Data Factory, Azure Synapse Analytics

In this tutorial, you create an Azure data factory with a pipeline that loads delta data, based on change data capture (CDC) information in the source Azure SQL Managed Instance database, to Azure Blob Storage.

You perform the following steps in this tutorial:

  • Prepare the source data store
  • Create a data factory
  • Create linked services
  • Create source and sink datasets
  • Create, debug, and run the pipeline to check for changed data
  • Modify data in the source table
  • Complete, run, and monitor the full incremental copy pipeline

Overview

The Change Data Capture technology supported by data stores such as Azure SQL Managed Instances (MI) and SQL Server can be used to identify changed data. This tutorial describes how to use Azure Data Factory with SQL Change Data Capture technology to incrementally load delta data from Azure SQL Managed Instance into Azure Blob Storage. For more concrete information about SQL Change Data Capture technology, see Change data capture in SQL Server.

End-to-end workflow

Here are the typical end-to-end workflow steps to incrementally load data using the Change Data Capture technology.

Note

Both Azure SQL MI and SQL Server support the Change Data Capture technology. This tutorial uses Azure SQL Managed Instance as the source data store. You can also use an on-premises SQL Server.

High-level solution

In this tutorial, you create a pipeline that performs the following operations:

  1. Create a Lookup activity to count the number of changed records in the CDC table of the source database and pass the count to an If Condition activity.
  2. Create an If Condition activity to check whether there are changed records and, if so, invoke the copy activity.
  3. Create a copy activity to copy the inserted, updated, and deleted data from the CDC table to Azure Blob Storage.
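
The control flow of these three activities can be sketched in a few lines of Python. This is only an illustration of the orchestration logic, not how Data Factory runs it: the function name run_incremental_copy and the two callables are hypothetical stand-ins for the Lookup and Copy activities.

```python
from typing import Callable


def run_incremental_copy(
    get_change_count: Callable[[], int],
    copy_changes: Callable[[], None],
) -> bool:
    """Mirror the Lookup -> If Condition -> Copy flow; return True if a copy ran."""
    change_count = get_change_count()  # Lookup activity: count changed records
    if change_count > 0:               # If Condition activity: any changes?
        copy_changes()                 # Copy activity: runs only in the True branch
        return True
    return False


# Hypothetical stand-ins for the real activities.
copied = []
ran = run_incremental_copy(lambda: 3, lambda: copied.append("changes"))
skipped = run_incremental_copy(lambda: 0, lambda: copied.append("unexpected"))
```

With a non-zero count the copy step runs once; with a zero count it is skipped, which is why a pipeline run without changes shows fewer activity runs.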

If you don't have an Azure subscription, create a free account before you begin.

Prerequisites

  • Azure SQL Database Managed Instance. You use the database as the source data store. If you don't have an Azure SQL Database Managed Instance, see the Create an Azure SQL Database Managed Instance article for steps to create one.
  • Azure Storage account. You use the blob storage as the sink data store. If you don't have an Azure storage account, see the Create a storage account article for steps to create one. Create a container named raw.

Create a data source table in Azure SQL Database

  1. Launch SQL Server Management Studio, and connect to your Azure SQL Managed Instance server.

  2. In Object Explorer, right-click your database and choose New Query.

  3. Run the following SQL command against your Azure SQL Managed Instance database to create a table named customers as the data source store.

    create table customers (
      customer_id int,
      first_name varchar(50),
      last_name varchar(50),
      email varchar(100),
      city varchar(50),
      CONSTRAINT "PK_Customers" PRIMARY KEY CLUSTERED ("customer_id")
    );
  4. Enable the Change Data Capture mechanism on your database and the source table (customers) by running the following SQL query:

    Note

    • The query assumes that the customers table is in the dbo schema. If it is not, replace 'dbo' with the schema of your Azure SQL MI that has the customers table.
    • Change data capture doesn't do anything as part of the transactions that change the table being tracked. Instead, the insert, update, and delete operations are written to the transaction log. Data that is deposited in change tables will grow unmanageably if you do not periodically and systematically prune the data. For more information, see Enable Change Data Capture for a database.

    EXEC sys.sp_cdc_enable_db;

    EXEC sys.sp_cdc_enable_table
      @source_schema = 'dbo',
      @source_name = 'customers',
      @role_name = NULL, -- NULL means no gating role restricts access to the change data
      @supports_net_changes = 1;
  5. Insert data into the customers table by running the following command:

    insert into customers (customer_id, first_name, last_name, email, city)
    values
      (1, 'Chevy', 'Leward', 'cleward0@mapy.cz', 'Reading'),
      (2, 'Sayre', 'Ateggart', 'sateggart1@nih.gov', 'Portsmouth'),
      (3, 'Nathalia', 'Seckom', 'nseckom2@blogger.com', 'Portsmouth');

    Note

    No historical changes to the table are captured prior to change data capture being enabled.

Create a data factory

Follow the steps in the article Quickstart: Create a data factory by using the Azure portal to create a data factory if you don't already have one to work with.

Create linked services

You create linked services in a data factory to link your data stores and compute services to the data factory. In this section, you create linked services to your Azure Storage account and Azure SQL MI.

Create Azure Storage linked service.

In this step, you link your Azure Storage Account to the data factory.

  1. Click Connections, and click + New.

  2. In the New Linked Service window, select Azure Blob Storage, and click Continue.

  3. In the New Linked Service window, do the following steps:

    1. Enter AzureStorageLinkedService for Name.
    2. Select your Azure Storage account for Storage account name.
    3. Click Save.

Create Azure SQL MI Database linked service.

In this step, you link your Azure SQL MI database to the data factory.

Note

If you use SQL MI, review the guidance on access via public versus private endpoints. If you use a private endpoint, you need to run this pipeline with a self-hosted integration runtime. The same applies if you run SQL Server on-premises, in a VM, or in a VNet.

  1. Click Connections, and click + New.

  2. In the New Linked Service window, select Azure SQL Database Managed Instance, and click Continue.

  3. In the New Linked Service window, do the following steps:

    1. Enter AzureSqlMI1 for the Name field.
    2. Select your SQL server for the Server name field.
    3. Select your SQL database for the Database name field.
    4. Enter name of the user for the User name field.
    5. Enter password for the user for the Password field.
    6. Click Test connection to test the connection.
    7. Click Save to save the linked service.

Create datasets

In this step, you create datasets to represent data source and data destination.

Create a dataset to represent source data

In this step, you create a dataset to represent the source data.

  1. In the treeview, click + (plus), and click Dataset.

  2. Select Azure SQL Database Managed Instance, and click Continue.

  3. In the Set properties tab, set the dataset name and connection information:

    1. Select AzureSqlMI1 for Linked service.
    2. Select [cdc].[dbo_customers_CT] for Table name. Note: this change table was automatically created in the cdc schema when CDC was enabled on the customers table. Changed data is never queried from this table directly but is instead extracted through the CDC functions.

Create a dataset to represent data copied to sink data store.

In this step, you create a dataset to represent the data that is copied from the source data store. You created the raw container in your Azure Blob Storage as part of the prerequisites. Create the container if it does not exist, or use the name of an existing one. In this tutorial, the output file name is dynamically generated by using the trigger time, which will be configured later.

  1. In the treeview, click + (plus), and click Dataset.

  2. Select Azure Blob Storage, and click Continue.

  3. Select DelimitedText, and click Continue.

  4. In the Set Properties tab, set the dataset name and connection information:

    1. Select AzureStorageLinkedService for Linked service.
    2. Enter raw for container part of the filePath.
    3. Enable First row as header.
    4. Click OK.

Create a pipeline to copy the changed data

In this step, you create a pipeline, which first checks the number of changed records present in the change table using a lookup activity. An IF condition activity checks whether the number of changed records is greater than zero and runs a copy activity to copy the inserted/updated/deleted data from Azure SQL Database to Azure Blob Storage. Lastly, a tumbling window trigger is configured and the start and end times will be passed to the activities as the start and end window parameters.
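
To make the check concrete, here is a minimal Python stand-in for what the If Condition evaluates. It assumes the Lookup activity runs with First row only enabled and that its output therefore exposes a firstRow object with a changecount column (the alias used by the count query in this tutorial); the sample dictionaries are made up for illustration.

```python
def has_changed_rows(lookup_output: dict) -> bool:
    """Return True when the Lookup's first row reports at least one change.

    Rough equivalent of comparing int(output.firstRow.changecount) with 0.
    """
    return int(lookup_output["firstRow"]["changecount"]) > 0


# Hypothetical Lookup outputs; real outputs carry additional metadata.
with_changes = {"firstRow": {"changecount": 3}}
without_changes = {"firstRow": {"changecount": "0"}}

run_copy = has_changed_rows(with_changes)
skip_copy = has_changed_rows(without_changes)
```

The int() conversion is there so the comparison still works if the count arrives as a string.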

  1. In the Data Factory UI, switch to the Edit tab. Click + (plus) in the left pane, and click Pipeline.

  2. You see a new tab for configuring the pipeline. You also see the pipeline in the treeview. In the Properties window, change the name of the pipeline to IncrementalCopyPipeline.

  3. Expand General in the Activities toolbox, and drag-drop the Lookup activity to the pipeline designer surface. Set the name of the activity to GetChangeCount. This activity gets the number of records in the change table for a given time window.

  4. Switch to the Settings tab in the Properties window:

    1. Specify the SQL MI dataset name for the Source Dataset field.

    2. Select the Query option and enter the following into the query box:

    DECLARE @from_lsn binary(10), @to_lsn binary(10);
    SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
    SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')

    3. Enable First row only.

  5. Click the Preview data button to ensure that the lookup activity returns a valid output.

  6. Expand Iteration & conditionals in the Activities toolbox, and drag-drop the If Condition activity to the pipeline designer surface. Set the name of the activity to HasChangedRows.

  7. Switch to the Activities tab in the Properties window:

    1. Enter the following Expression:

    @greater(int(activity('GetChangeCount').output.firstRow.changecount),0)

    2. Click the pencil icon to edit the True condition.

    3. Expand General in the Activities toolbox and drag-drop a Wait activity to the pipeline designer surface. This is a temporary activity used to debug the If Condition; it is replaced later in the tutorial.

    4. Click the IncrementalCopyPipeline breadcrumb to return to the main pipeline.

  8. Run the pipeline in Debug mode to verify the pipeline executes successfully.

  9. Next, return to the True condition step and delete the Wait activity. In the Activities toolbox, expand Move & transform, and drag-drop a Copy activity to the pipeline designer surface. Set the name of the activity to IncrementalCopyActivity.

  10. Switch to the Source tab in the Properties window, and do the following steps:

    1. Specify the SQL MI dataset name for the Source Dataset field.

    2. Select Query for Use Query.

    3. Enter the following for Query.

    DECLARE @from_lsn binary(10), @to_lsn binary(10);
    SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
    SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')

  11. Click Preview to verify that the query returns the changed rows correctly.

  12. Switch to the Sink tab, and specify the Azure Storage dataset for the Sink Dataset field.

  13. Click back to the main pipeline canvas and connect the Lookup activity to the If Condition activity by dragging the green button attached to the Lookup activity to the If Condition activity.

  14. Click Validate on the toolbar. Confirm that there are no validation errors. Close the Pipeline Validation Report window by clicking >>.

  15. Click Debug to test the pipeline and verify that a file is generated in the storage location.

  16. Publish entities (linked services, datasets, and pipelines) to the Data Factory service by clicking the Publish all button. Wait until you see the Publishing succeeded message.

Configure the tumbling window trigger and CDC window parameters

In this step, you create a tumbling window trigger to run the job on a frequent schedule. You will use the WindowStart and WindowEnd system variables of the tumbling window trigger and pass them as parameters to your pipeline to be used in the CDC query.
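
As a rough illustration of how the window start and end values end up inside the CDC query, the following Python sketch assembles the same kind of SQL text that the pipeline expressions in this section build with @concat. The function name build_cdc_query and the sample timestamps are hypothetical; plain string interpolation is used only to mirror the expression and is not a pattern for untrusted input.

```python
def build_cdc_query(start_time: str, end_time: str, select_list: str = "*") -> str:
    """Assemble a windowed CDC query from start and end time strings."""
    return (
        "DECLARE @begin_time datetime, @end_time datetime, "
        "@from_lsn binary(10), @to_lsn binary(10); "
        f"SET @begin_time = '{start_time}'; "
        f"SET @end_time = '{end_time}'; "
        # Map the window boundaries to log sequence numbers (LSNs).
        "SET @from_lsn = sys.fn_cdc_map_time_to_lsn("
        "'smallest greater than or equal', @begin_time); "
        "SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than', @end_time); "
        f"SELECT {select_list} FROM "
        "cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')"
    )


# Hypothetical 15-minute window.
copy_query = build_cdc_query("2023-01-01 00:00:00.000", "2023-01-01 00:15:00.000")
count_query = build_cdc_query(
    "2023-01-01 00:00:00.000", "2023-01-01 00:15:00.000", "count(1) changecount"
)
```

The Lookup and Copy activities use the same window logic and differ only in the select list, which is why the sketch takes it as a parameter.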

  1. Navigate to the Parameters tab of the IncrementalCopyPipeline pipeline and, using the + New button, add two parameters (triggerStartTime and triggerEndTime) that represent the tumbling window start and end times. For debugging purposes, add default values in the format YYYY-MM-DD HH24:MI:SS.FFF, but ensure that triggerStartTime is not earlier than the time CDC was enabled on the table; otherwise the query results in an error.

  2. Click on the settings tab of the Lookup activity and configure the query to use the start and end parameters. Copy the following into the query:

    @concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = ''',pipeline().parameters.triggerStartTime,''';SET @end_time = ''',pipeline().parameters.triggerEndTime,''';SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time);SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time);SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
  3. Navigate to the Copy activity in the True case of the If Condition activity and click on the Source tab. Copy the following into the query:

    @concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = ''',pipeline().parameters.triggerStartTime,''';SET @end_time = ''',pipeline().parameters.triggerEndTime,''';SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time);SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time);SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
  4. Click the Sink tab of the Copy activity and click Open to edit the dataset properties. Click the Parameters tab and add a new parameter called triggerStart.

  5. Next, configure the dataset properties to store the data in a customers/incremental subdirectory with date-based partitions.

    1. Click on the Connection tab of the dataset properties and add dynamic content for both the Directory and the File sections.

    2. Enter the following expression in the Directory section by clicking on the dynamic content link under the textbox:

      @concat('customers/incremental/',formatDateTime(dataset().triggerStart,'yyyy/MM/dd'))
    3. Enter the following expression in the File section. This will create file names based on the trigger start date and time, suffixed with the csv extension:

      @concat(formatDateTime(dataset().triggerStart,'yyyyMMddHHmmssfff'),'.csv')

    4. Navigate back to the Sink settings in Copy activity by clicking on the IncrementalCopyPipeline tab.

    5. Expand the dataset properties and enter dynamic content in the triggerStart parameter value with the following expression:

      @pipeline().parameters.triggerStartTime

  6. Click Debug to test the pipeline and ensure that the folder structure and output file are generated as expected. Download and open the file to verify the contents.

  7. Ensure the parameters are being injected into the query by reviewing the Input parameters of the pipeline run.

  8. Publish entities (linked services, datasets, and pipelines) to the Data Factory service by clicking the Publish all button. Wait until you see the Publishing succeeded message.

  9. Finally, configure a tumbling window trigger to run the pipeline at a regular interval and set start and end time parameters.

    1. Click the Add trigger button, and select New/Edit.

    2. Enter a trigger name and specify a start time that is equal to the end time of the debug window above.

    3. On the next screen, specify the following values for the start and end parameters respectively.

      @formatDateTime(trigger().outputs.windowStartTime,'yyyy-MM-dd HH:mm:ss.fff')

      @formatDateTime(trigger().outputs.windowEndTime,'yyyy-MM-dd HH:mm:ss.fff')

Note

The trigger runs only after it has been published. Additionally, the expected behavior of a tumbling window trigger is to run all historical intervals from the start date until now. For more information, see the tumbling window trigger documentation.
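
The backfill behavior described in the note can be pictured with a small Python model. This is a simplified sketch, not the trigger's actual scheduling logic: it assumes fixed-size, contiguous, non-overlapping windows and lists only windows that have fully elapsed. The function names and the sample times are hypothetical.

```python
from datetime import datetime, timedelta


def tumbling_windows(start: datetime, now: datetime, interval: timedelta):
    """List the (window_start, window_end) pairs fully elapsed between start and now."""
    windows = []
    window_start = start
    while window_start + interval <= now:
        windows.append((window_start, window_start + interval))
        window_start += interval
    return windows


def to_parameter(value: datetime) -> str:
    """Format like 'yyyy-MM-dd HH:mm:ss.fff' (milliseconds, not microseconds)."""
    return value.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]


# Hypothetical trigger: starts at midnight, 15-minute interval, checked at 01:00.
windows = tumbling_windows(
    datetime(2023, 1, 1, 0, 0), datetime(2023, 1, 1, 1, 0), timedelta(minutes=15)
)
first_start, first_end = (to_parameter(t) for t in windows[0])
```

Each window end equals the next window start, which pairs with the 'smallest greater than or equal' and 'largest less than' boundaries in the CDC query so that a change is not picked up by two consecutive runs.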

  10. Using SQL Server Management Studio, make some additional changes to the customers table by running the following SQL:

    insert into customers (customer_id, first_name, last_name, email, city) values (4, 'Farlie', 'Hadigate', 'fhadigate3@zdnet.com', 'Reading');
    insert into customers (customer_id, first_name, last_name, email, city) values (5, 'Anet', 'MacColm', 'amaccolm4@yellowbook.com', 'Portsmouth');
    insert into customers (customer_id, first_name, last_name, email, city) values (6, 'Elonore', 'Bearham', 'ebearham5@ebay.co.uk', 'Portsmouth');
    update customers set first_name='Elon' where customer_id=6;
    delete from customers where customer_id=5;

  11. Click the Publish all button. Wait until you see the Publishing succeeded message.

  12. After a few minutes, the pipeline will have triggered and a new file will have been loaded into Azure Storage.
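
When you open a generated file, the __$operation column returned by the net changes function tells you what happened to each row: 1 is a delete, 2 is an insert, and 4 is an update. The Python sketch below summarizes a file by operation. The sample rows are invented for illustration and omit the other CDC metadata columns (such as __$start_lsn and __$update_mask) that a real file also contains.

```python
import csv
import io

# Operation codes returned by the net changes function with the 'all' row filter.
OPERATIONS = {"1": "delete", "2": "insert", "4": "update"}

# Hypothetical file contents, trimmed to the operation and source columns.
sample_csv = """__$operation,customer_id,first_name,last_name,email,city
2,101,Ada,Example,ada@example.com,Reading
2,102,Ben,Example,ben@example.com,Portsmouth
4,103,Cy,Example,cy@example.com,Reading
1,104,Di,Example,di@example.com,Portsmouth
"""


def summarize_operations(csv_text: str) -> dict:
    """Count rows per CDC operation in a delimited text file with a header row."""
    counts = {}
    for row in csv.DictReader(io.StringIO(csv_text)):
        label = OPERATIONS.get(row["__$operation"], "unknown")
        counts[label] = counts.get(label, 0) + 1
    return counts


summary = summarize_operations(sample_csv)
```

A downstream consumer could use the same mapping to decide whether to insert, update, or delete each row in its own store.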

Monitor the incremental copy pipeline

  1. Click the Monitor tab on the left. You see the pipeline run in the list and its status. To refresh the list, click Refresh. Hover near the name of the pipeline to access the Rerun action and Consumption report.

  2. To view activity runs associated with the pipeline run, click the pipeline name. If changed data was detected, there will be three activities, including the copy activity; otherwise, there will be only two entries in the list. To switch back to the pipeline runs view, click the All Pipelines link at the top.

Review the results

You see the second file in the customers/incremental/YYYY/MM/DD folder of the raw container.
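
The folder and file name follow from the Directory and File expressions configured on the sink dataset. As a sketch of what those expressions produce, the Python function below derives the same path from a trigger start value in the yyyy-MM-dd HH:mm:ss.fff format. The function name sink_path and the sample timestamps are hypothetical.

```python
from datetime import datetime


def sink_path(trigger_start: str) -> str:
    """Build 'customers/incremental/yyyy/MM/dd/yyyyMMddHHmmssfff.csv'."""
    ts = datetime.strptime(trigger_start, "%Y-%m-%d %H:%M:%S.%f")
    directory = "customers/incremental/" + ts.strftime("%Y/%m/%d")
    # 'fff' means milliseconds, so reduce Python's microseconds to three digits.
    file_name = ts.strftime("%Y%m%d%H%M%S") + f"{ts.microsecond // 1000:03d}.csv"
    return f"{directory}/{file_name}"


path = sink_path("2023-01-01 00:15:00.000")
other_path = sink_path("2023-03-09 14:05:07.123")
```

Because the file name is derived from the window start time, each trigger window writes to its own file, and runs on the same day share a folder.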

Next steps

Advance to the following tutorial to learn about copying new and changed files only based on their LastModifiedDate:

Copy new files by lastmodifieddate

Article information

Author: Reed Wilderman

Last Updated: 10/26/2022
