The 5-minute introduction to Log-Based Change Data Capture with Debezium


A few years back I was working on an application where I had to pull data from an event table (populated using database triggers) and update my application-specific data stores. This is a common problem that most web developers need to solve. At that time I was not aware that this problem has a name. Some time later I learnt that it is called Change Data Capture (CDC). According to the Wikipedia article on change data capture:

In databases, change data capture (CDC) is a set of software design patterns used to determine (and track) the data that has changed so that action can be taken using the changed data.

The key benefit of CDC is that you can identify the changed data in your source database and then incrementally apply it to your target system. In the absence of CDC, we are left with bulk loading the data, which is both time-consuming and costly.

There are multiple ways to implement CDC. The trigger table with a polling process that I implemented back then is one valid implementation. The biggest drawback of any polling-based approach is that it puts undue load on the database server, because you keep polling the database even when no state has changed.

One way to avoid polling is to make the database server do the heavy lifting. We can write a trigger on our events table that invokes a program to push the changes to the target system. But this solution will not work beyond simple use cases. To build a reliable system, we need to add fault-tolerance logic on the database side, and to do that reliably we need to answer the following questions:

  1. How will we handle failures of the program invoked by the trigger?
  2. How will we invoke the trigger again after a failure?
  3. What will happen if the target system is unable to keep up with the events pushed from the source database?

In recent years another way to implement CDC has become popular: scanning the database's transaction log. Almost all databases have a transaction log that records the history of actions executed against the database. It exists to guarantee the ACID properties of the database across crashes, restarts, and hardware failures. To implement CDC using the transaction log, you scan and interpret the log and publish the resulting change events to a message broker.
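The core idea can be illustrated with a toy, in-memory append-only log. This is not how any real database lays out its transaction log (MySQL's binlog, for instance, is a binary format that the connector parses), and the `LogEntry` and `LogReader` names are hypothetical. The point is that the reader only remembers an offset, reads new entries in commit order, and can resume from that offset after a restart without querying the tables themselves.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class LogEntry:
    position: int  # monotonically increasing position in the log
    table: str
    op: str        # "c" (create), "u" (update) or "d" (delete)
    row: dict

class LogReader:
    def __init__(self, log: list[LogEntry], offset: int = -1) -> None:
        self.log = log
        self.offset = offset  # position of the last entry already published

    def read_new(self) -> list[LogEntry]:
        # Entries come back in the order they were committed.
        new = [e for e in self.log if e.position > self.offset]
        if new:
            self.offset = new[-1].position
        return new

log = [
    LogEntry(0, "customers", "c", {"id": 1005, "email": "sgulati@acme.com"}),
    LogEntry(1, "customers", "u", {"id": 1005, "email": "shekhar.gulati@acme.com"}),
]
reader = LogReader(log)
for entry in reader.read_new():
    print(entry.position, entry.op, entry.row)  # "publish" each change

# A restarted reader that kept its offset only picks up later entries.
log.append(LogEntry(2, "customers", "d", {"id": 1005}))
resumed = LogReader(log, offset=reader.offset)
print([e.op for e in resumed.read_new()])
```

Storing the offset outside the reader is what lets a restarted reader continue exactly where it stopped; Debezium keeps its equivalent position (binlog file and position) in Kafka Connect's offset storage.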

There are many advantages of using transaction logs for CDC. Some of these are mentioned below:

  1. This solution is transparent to the database and has minimal impact on it.
  2. Changes are published to the target system in near real time, so the business can make accurate decisions based on the most current data.
  3. It has little performance impact on the production database. Database triggers can slow down your database, depending on how much work they do; the log-scanning approach reads a log the database already writes and does not add work inside each transaction the way a trigger does.
  4. Because CDC transfers only incremental changes, it reduces the cost of transferring data.
  5. This approach preserves the order in which the original transactions were committed. This matters when the target application depends on the order of transactions in the source system, and such ordering guarantees are most often desired by target applications.
  6. The target system can process messages at its own pace. This is especially true when the message broker is Apache Kafka, which stores messages durably so that a slow consumer can catch up later.

Common CDC Use Cases

There are a variety of use cases that CDC can help us solve:

  1. You can use CDC for data replication. This was the first use case of CDC. You can replicate data from OLTP databases to OLAP databases.
  2. You can use CDC to implement ETL (Extract, Transform, and Load) jobs. In our application we were replicating data from IBM DB2 to PostgreSQL. You can also push data to full-text search engines like Elasticsearch or Solr.
  3. You can use CDC to update, patch, or invalidate a cache. In our applications, the processing tier worked on in-memory data that we kept up to date using a CDC pipeline.
  4. You can use CDC to create and update CQRS read models.
  5. You can use CDC to meet auditing and compliance requirements.
  6. You can use CDC to ingest data into your data lake.
  7. You can use CDC to build near real-time applications.
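As an illustration of the cache use case, the following sketch applies change events to an in-memory dictionary keyed by primary key. It assumes events shaped like the Debezium payloads shown later in this post (`before`, `after`, `op`) and a primary key column named `id`; `apply_change` is a hypothetical helper, not part of any library.

```python
def apply_change(cache: dict, payload: dict, key: str = "id") -> None:
    """Apply one change event payload to a cache keyed by primary key."""
    op = payload["op"]
    if op in ("c", "u"):
        # Creates and updates carry the new row in "after".
        row = payload["after"]
        cache[row[key]] = row
    elif op == "d":
        # Deletes carry the old row in "before"; "after" is null.
        cache.pop(payload["before"][key], None)
    else:
        raise ValueError(f"unknown op: {op!r}")

cache: dict = {}
apply_change(cache, {"op": "c", "before": None,
                     "after": {"id": 1005, "email": "sgulati@acme.com"}})
apply_change(cache, {"op": "u",
                     "before": {"id": 1005, "email": "sgulati@acme.com"},
                     "after": {"id": 1005, "email": "shekhar.gulati@acme.com"}})
print(cache[1005]["email"])
apply_change(cache, {"op": "d",
                     "before": {"id": 1005, "email": "shekhar.gulati@acme.com"},
                     "after": None})
print(1005 in cache)
```

Applying events in commit order matters here: replaying the update before the create would leave the cache holding the older email.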

Debezium

Debezium is an open source distributed platform for change data capture. It supports multiple source databases such as MySQL, PostgreSQL, and MongoDB; for the complete list of supported databases, refer to the Debezium documentation. It uses Apache Kafka as the message broker to reliably transfer change events to the target systems. The architecture diagram below shows the different parts of the Debezium project.

Debezium Architecture

  1. Source database: On the left we have a source database like MySQL or PostgreSQL whose data we want to copy to a target system like Elasticsearch or an analytics database.
  2. A Kafka Connect source connector (the Debezium connector) that parses and interprets the transaction log and writes change events to Kafka topics.
  3. Kafka, which acts as a persistent, ordered message broker.
  4. Kafka Connect sink connectors that consume the change events from Kafka and push them to the target systems.

Debezium in action

Now that we know what Debezium does, let’s take it for a ride. We will use the Debezium Docker setup described in the extensive official Debezium tutorial.

This section assumes you are comfortable with Docker.

To start Debezium, we have to run the following Docker containers.

We will first start ZooKeeper. Apache Kafka uses ZooKeeper for leader election, configuration storage, access control lists, and to keep track of the brokers in the cluster.

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.10

Next, we will start Apache Kafka, the message broker used by Debezium.

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.10

Next, we will run MySQL database. It is the source database that we want to track for changes.

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.10

You might be wondering why we are not using the standard MySQL Docker image. If you look closely, we are using the debezium/example-mysql image. The reason is that the default MySQL image does not have the binlog enabled. The debezium/example-mysql image uses the following mysql.conf to enable the binlog.

# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id         = 223344
log_bin           = mysql-bin
expire_logs_days  = 1
binlog_format     = row

So, before you can use your MySQL database with Debezium, you first have to enable the binlog as shown above. For the complete MySQL connector configuration, refer to the Debezium documentation.
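If you manage your own MySQL server, a quick sanity check of these settings can be scripted. The sketch below parses an option-file fragment with Python's configparser and reports which binlog prerequisites are missing. It assumes the options sit under a `[mysqld]` section, as they would in a real MySQL option file, and `check_binlog_settings` is a hypothetical helper; the authoritative check is asking the running server, for example with `SHOW VARIABLES LIKE 'binlog_format'`.

```python
import configparser

def check_binlog_settings(text: str) -> list[str]:
    """Return the binlog-related problems found in a MySQL option-file fragment."""
    parser = configparser.ConfigParser(allow_no_value=True, interpolation=None)
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section"]
    opts = parser["mysqld"]
    problems = []
    if not opts.get("server-id"):
        problems.append("server-id is not set")
    if "log_bin" not in opts:
        problems.append("log_bin is not set (binlog disabled)")
    # This sketch requires binlog_format to be set explicitly to row.
    if opts.get("binlog_format", "").lower() != "row":
        problems.append("binlog_format must be row")
    return problems

sample = """
[mysqld]
# Enable binary replication log and set the prefix, expiration, and log format.
server-id         = 223344
log_bin           = mysql-bin
expire_logs_days  = 1
binlog_format     = row
"""
print(check_binlog_settings(sample))
```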

Also, the MySQL image comes prepopulated with a sample inventory database. The default username/password combination is debezium/dbz. You can refer to the inventory.sql script that runs after the MySQL database starts.

We can connect to MySQL database using the following command.

docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

Finally, we will start the Kafka Connect container that will read the MySQL binlog and publish events to Kafka topics.

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.10

We will start another container that watches the Kafka topic dbserver1.inventory.customers and prints the messages published to it.

docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.10 watch-topic -a -k dbserver1.inventory.customers

But for the above to work, we have to create a new connector by making an HTTP POST call to Kafka Connect, as shown below.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

This command uses the Kafka Connect service’s RESTful API to submit a POST request against the /connectors resource, with a JSON document that describes our new connector.

After registering the connector, we see messages printed in the watcher container. The connector first takes a snapshot of the existing data, and as there are four customers in the customers table, we see four messages in the watcher console.
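The same registration can be done from code. The sketch below builds (but does not send) the equivalent request with Python's standard urllib, reusing the connector configuration from the curl command. It also derives the topic the watcher listens to: the Debezium MySQL connector names table topics `<database.server.name>.<database>.<table>`, which is why `dbserver1` plus `inventory.customers` gives `dbserver1.inventory.customers`. The helper names are hypothetical, and the `localhost:8083` address assumes the port mapping used for the connect container.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083/connectors/"  # assumes the -p 8083:8083 mapping

CONFIG = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.inventory",
    },
}

def build_register_request(config: dict) -> urllib.request.Request:
    # Same method, headers and JSON body as the curl command.
    return urllib.request.Request(
        CONNECT_URL,
        data=json.dumps(config).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )

def topic_for(config: dict, table: str) -> str:
    # <database.server.name>.<database>.<table>; assumes one whitelisted database.
    cfg = config["config"]
    return f'{cfg["database.server.name"]}.{cfg["database.whitelist"]}.{table}'

req = build_register_request(CONFIG)
print(req.get_method(), req.full_url)
print(topic_for(CONFIG, "customers"))
# To actually register the connector: urllib.request.urlopen(req)
```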

mysql> use inventory;
mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

Inserting a record

Let’s insert a new customer and see how events are pushed to Kafka.

insert into customers(first_name, last_name, email) values ('Shekhar', 'Gulati', 'sgulati@acme.com');

The event printed on the watcher console is shown below.

{
    "payload": {
        "before": null,
        "after": {
            "id": 1005,
            "first_name": "Shekhar",
            "last_name": "Gulati",
            "email": "sgulati@acme.com"
        },
        "source": {
            "version": "0.10.0.Final",
            "connector": "mysql",
            "name": "dbserver1",
            "ts_ms": 1575704999000,
            "snapshot": "false",
            "db": "inventory",
            "table": "customers",
            "server_id": 223344,
            "gtid": null,
            "file": "mysql-bin.000003",
            "pos": 364,
            "row": 0,
            "thread": 2,
            "query": null
        },
        "op": "c",
        "ts_ms": 1575704999762
    }
}

Let’s look at each section of the payload:

  1. before: This is null because this is an insert event. The row had no previous value.
  2. after: This is the latest value of the row in the source database.
  3. source: Metadata about where the change came from: the connector, the logical server name, the database and table, and the binlog file and position.
  4. op: The type of event. In this case it is c, which stands for create.
  5. ts_ms: The time at which the connector processed the event. The source block has its own ts_ms, the time the change was made in the database.
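A consumer typically needs only a few of these fields. The sketch below pulls the operation, the source table and the row out of a JSON message like the one above. It assumes the message value is JSON text with the change under a top-level `payload` key; with Kafka Connect's JSON converter and schemas enabled, the value also carries a `schema` section next to `payload`, which this sketch ignores. `summarize_event` is a hypothetical helper.

```python
import json

OPS = {"c": "create", "u": "update", "d": "delete"}

def summarize_event(message: str) -> dict:
    payload = json.loads(message)["payload"]
    source = payload["source"]
    return {
        "op": OPS.get(payload["op"], payload["op"]),
        "table": f'{source["db"]}.{source["table"]}',
        # For deletes "after" is null, so fall back to "before" for the row.
        "row": payload["after"] if payload["after"] is not None else payload["before"],
        "binlog_position": (source["file"], source["pos"]),
    }

message = """
{"payload": {"before": null,
             "after": {"id": 1005, "first_name": "Shekhar", "last_name": "Gulati",
                       "email": "sgulati@acme.com"},
             "source": {"db": "inventory", "table": "customers",
                        "file": "mysql-bin.000003", "pos": 364},
             "op": "c", "ts_ms": 1575704999762}}
"""
print(summarize_event(message))
```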

Updating a record

Let’s see update in action as well.

update customers set email='shekhar.gulati@acme.com' where id=1005;

The event published to Kafka is shown below. I am only showing part of the message.

{
    "payload": {
        "before": {
            "id": 1005,
            "first_name": "Shekhar",
            "last_name": "Gulati",
            "email": "sgulati@acme.com"
        },
        "after": {
            "id": 1005,
            "first_name": "Shekhar",
            "last_name": "Gulati",
            "email": "shekhar.gulati@acme.com"
        },
        "source": {

        },
        "op": "u",
        "ts_ms": 1575705673230
    }
}

This time op is u, i.e. an update, and the event carries both the before and after values.
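Because update events carry both images of the row, a consumer can work out exactly which columns changed. A minimal sketch, assuming `before` and `after` are flat dictionaries as in the message above (`changed_columns` is a hypothetical helper):

```python
def changed_columns(before: dict, after: dict) -> dict:
    """Map each changed column to its (old, new) value pair."""
    columns = before.keys() | after.keys()
    return {
        col: (before.get(col), after.get(col))
        for col in sorted(columns)
        if before.get(col) != after.get(col)
    }

before = {"id": 1005, "first_name": "Shekhar", "last_name": "Gulati",
          "email": "sgulati@acme.com"}
after = {"id": 1005, "first_name": "Shekhar", "last_name": "Gulati",
         "email": "shekhar.gulati@acme.com"}
print(changed_columns(before, after))
# {'email': ('sgulati@acme.com', 'shekhar.gulati@acme.com')}
```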

Deleting a record

Finally, let’s see delete in action as well.

delete from customers where id = 1005;

The event published on Kafka is shown below.

{

    "payload": {
        "before": {
            "id": 1005,
            "first_name": "Shekhar",
            "last_name": "Gulati",
            "email": "shekhar.gulati@acme.com"
        },
        "after": null,
        "source": {

        },
        "op": "d",
        "ts_ms": 1575705856250
    }
}

The op attribute is d, i.e. a delete, and the after object is null. The before object captures the state of the row before deletion.
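When consuming delete events, the key of the removed row has to come from before, since after is null. Debezium can also follow a delete event with a tombstone, a message whose value is null, so that Kafka log compaction can eventually drop all messages for that key; a consumer should skip such messages rather than fail on them. The sketch below assumes message values arrive as JSON strings (or `None` for a tombstone) and that the primary key column is `id`; `deleted_key` is a hypothetical helper.

```python
import json
from typing import Optional

def deleted_key(value: Optional[str], key: str = "id"):
    """Return the primary key of a deleted row, or None for any other message."""
    if value is None:
        # Tombstone: null message value, nothing to apply.
        return None
    payload = json.loads(value)["payload"]
    if payload["op"] != "d":
        return None
    # "after" is null for deletes, so the key comes from "before".
    return payload["before"][key]

delete_event = json.dumps({"payload": {
    "before": {"id": 1005, "first_name": "Shekhar", "last_name": "Gulati",
               "email": "shekhar.gulati@acme.com"},
    "after": None, "source": {}, "op": "d", "ts_ms": 1575705856250}})
print(deleted_key(delete_event))  # 1005
print(deleted_key(None))          # None (tombstone)
```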

Conclusion

You will agree that CDC enables interesting solutions to specific problems. Debezium is an awesome open source tool that helps you solve CDC use cases with ease, and organisations such as WePay are using it in production.
