Change Data Capture: Use Cases and a Real-World Example Using Debezium

Eresh Gorantla · Published in Geek Culture · 12 min read · Aug 14, 2021

This story focuses on Change Data Capture (CDC): what it is, where it is used, what problems it solves, and finally a real-world code sample.

What is CDC?

CDC is a set of technologies and processes for identifying and capturing data that has changed in a database, so that you can act on those changes at a later stage or drive a business use case within a certain bounded context.

Change Data Capture is a process of detecting changes made to the database. The changes can then be streamed and integrated with other databases and systems. In other words: we receive a stream of events from our database.

This allows us to make faster and more accurate decisions based on data (Stream Processing and Streaming ETL). It does not overload systems and networks as classical solutions do. Often it is the only reasonable solution to upgrade legacy systems.

Change Data Capture (CDC) typically refers to a mechanism for capturing all changes happening to a system’s data. The need for such a system is not difficult to imagine — audit for sensitive information, data replication across multiple DB instances or data centers, moving changes from transactional databases to data lakes/OLAP stores, etc. Transaction management in ACID-compliant databases is essentially CDC. A CDC system is a record of every change ever made to an entity and the metadata of that change (changed by, change time, etc.).

CDC is about capturing data changes within a system’s bounded context, usually in terms of the physical model. The system records changes to its own data. Even if we have a separate service or system which stores these changes (some sort of audit store), that separation is an implementation detail. There is a continuity of domain modeling between the actual data and the changes to it, hence both belong logically inside the same boundary.

Let us talk about a few use cases where CDC can solve certain problems. There are many, but I am going to cover just a few.

Streaming data to a Data Lake or Data Warehouse

Traditionally, this data lands in the lake or warehouse through ETL or ELT, which is typically a batch load. It can be achieved either by:

  • Background processes that periodically run complex extraction queries, which increases the load on the master database.
  • Polling the master source at intervals to pick up delta changes.

The cons of the above approaches:

  • Spikes in load eventually reduce performance (this can be mitigated by running during low-traffic windows).
  • Large data transfers can strain the network, demanding vertical scaling to handle the volume.
  • Because data is polled at intervals rather than in real time, the sync lag can affect business decisions.

How CDC addresses the above problems

  • No heavy queries run on a periodic basis, which removes the load spikes.
  • Data is transferred in much smaller batches, so there is no network strain; the traffic is spread out evenly.
  • CDC syncs data in near real time, so business decisions are not delayed.

Event-Driven Architecture

One of the hardest things to accomplish is delivering data safely and consistently across service boundaries. In EDA, an individual service has to commit changes to its local data and also publish messages to the underlying queueing system, and the consumers listening for those messages act accordingly. The handoff is tricky:

  • The data is committed in the DB, but publishing to the event stream fails.
  • The data is published to the event stream, but the DB transaction is rolled back due to an error.

This can be avoided using the Outbox pattern, which I will explain in another post.

There are several approaches to achieving CDC, each with its own trade-offs.

Approaches to implementing Change Data Capture

Approach 1 — Using Postgres Audit Triggers

Trigger-based methods involve creating audit triggers on the database to capture all events related to INSERT, UPDATE, and DELETE operations. The advantage of this method is that everything happens at the SQL level, and the developer just needs to read a separate table that contains all the audit logs.

To create an audit trigger on a Postgres table, execute the command below.

SELECT audit.audit_table('target_table');

This statement must be executed by a role with permission to access the audit schema. The application that writes to the tables should not connect to the database with a superuser role.
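If you are not using a ready-made audit extension, the same idea can be hand-rolled. Below is a minimal sketch run through psql against the demo’s user_data database; the audit table, function, and trigger names are invented for illustration, the connection details are assumptions, and EXECUTE FUNCTION requires Postgres 11 or later.

# Minimal hand-rolled audit trigger (illustrative only; a real audit
# extension also records the user, transaction id, and changed fields).
psql -U postgres -d user_data <<'SQL'
CREATE TABLE IF NOT EXISTS simple_audit_log (
    id         BIGSERIAL PRIMARY KEY,
    table_name TEXT NOT NULL,
    operation  TEXT NOT NULL,                     -- INSERT / UPDATE / DELETE
    row_data   JSONB,                             -- new row (old row for deletes)
    changed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE OR REPLACE FUNCTION simple_audit() RETURNS trigger AS $$
BEGIN
    IF TG_OP = 'DELETE' THEN
        INSERT INTO simple_audit_log (table_name, operation, row_data)
        VALUES (TG_TABLE_NAME, TG_OP, to_jsonb(OLD));
        RETURN OLD;
    ELSE
        INSERT INTO simple_audit_log (table_name, operation, row_data)
        VALUES (TG_TABLE_NAME, TG_OP, to_jsonb(NEW));
        RETURN NEW;
    END IF;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER users_audit
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION simple_audit();
SQL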

Limitations of Using Postgres Audit Triggers for PostgreSQL CDC

A downside with this method is that having triggers affects the performance of the database. A common practice to avoid this performance hit is to have a separate table that tracks the main table and have triggers on the second table. The syncing between the master and the secondary table can be done using Postgres logical replication feature.

Approach 2 — Using Postgres Logical Decoding

Logical decoding uses the content of the write-ahead log to create a log of activities happening in the database. Write ahead log is an internal log that describes database changes on a storage level. The advantage of this approach is that it does not affect the performance of the database in any way.

The approach works based on the installation of an output plugin. To enable the use of logical decoding the following parameters need to be set in the Postgres configuration.

wal_level = logical
max_replication_slots = 10

Once these parameters are set, execute the following command to create a logical decoding slot.

SELECT * FROM pg_create_logical_replication_slot('slot_repl', 'test_decoding');
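Once the slot exists, you can pull the decoded changes from it. For a quick look at what the plugin emits (the psql connection details are assumptions):

# Read and consume the pending changes from the replication slot.
# Use pg_logical_slot_peek_changes() instead to look without consuming them.
psql -U postgres -d user_data -c \
  "SELECT * FROM pg_logical_slot_get_changes('slot_repl', NULL, NULL);"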

Limitations of Using Postgres Logical Decoding for Postgres CDC

In this case, also, the developers need to write an elaborate logic to process these events and then transform them into statements for the target database. Depending on the use case that you are trying to solve, this can increase the complexity.

Approach 3 — Using Timestamp Column

The above two methods are provided by the Postgres engine to implement CDC. There is also a slightly convoluted custom method, available if you have the flexibility of adding a timestamp column to your tables. The developer queries the table periodically and monitors changes to the timestamp column; when changes are detected, a script creates the appropriate statements to write those changes to the target database. This approach, however, is effort-intensive and demands a lot of developer time.
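A sketch of what such polling looks like in practice (the updated_at column, its maintenance by the application or a trigger, and the connection details are all assumptions):

# Poll for rows changed since the last successful sync; the new high-water
# mark must be stored for the next run. Note that plain timestamp polling
# cannot detect DELETEs.
LAST_SYNC='2021-08-14 00:00:00+00'
psql -U postgres -d user_data -c \
  "SELECT * FROM users WHERE updated_at > '${LAST_SYNC}' ORDER BY updated_at;"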

In a nutshell, even though Postgres provides support for continuous syncing through triggers, logical decoding, or custom logic, it is still up to the developers to capture these events and convert them into statements for the target database. This logic needs to be written specifically for each target database and use case. An alternative is to use a cloud-based service like Hevo, which can use Postgres CDC to continuously sync data to most destinations.

The above three approaches have their benefits and downsides. I did extensive research on CDC, and there are a few readily available solutions.

Among them, I explored Debezium because it is open source. Apart from Debezium, Hevo also seems promising.

Debezium

Debezium is an open-source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong.

Let us consider a use case. Suppose you have an application similar to IMDb, with a movie-library microservice that acts as the single source of truth for all movie information, with ratings and likes recorded in real time from users. We also have a recommendation-engine service that refreshes the following lists for users every half hour (say):

  • Top 100 most-liked movies.
  • Top 100 most-watched movies.
  • Top 100 highest-rated movies.
  • Top 100 critically acclaimed movies.
  • Top 100 movies by genre.

The movie-library service’s database is updated whenever someone watches a movie: the view count is incremented, and likes, dislikes, and ratings are recorded in the same way. The recommendation-engine service needs this data in real time to predict the top recommendations for users, for a better experience and personalization. One option is to query the master DB directly, which is not good practice in a microservice ecosystem.

Consider this: what if the master DB pushed out every change made to its tables as a stream to external systems? That is a good idea, and Babi might be happy with it too. Debezium serves exactly this purpose: it streams data changes to the external subsystems that subscribe to them.

This is a good time to understand how Debezium works.

How Debezium Works

Debezium’s goal is to build up a library of connectors that capture changes from a variety of database management systems and produce events with very similar structures, making it far easier for your applications to consume and respond to the events regardless of where the changes originated. (Quoted from the Debezium documentation.)

Typically Debezium architecture revolves around connectors. The connectors help in capturing data changes as streams from the source system and sync the data into the target system.

Debezium currently supports connectors for MySQL, PostgreSQL, SQL Server, Oracle, Db2, and MongoDB.

Debezium provides three different types of deployment approaches.

As a library embedded into the application

The documentation is pretty good for this approach. Please find it here.

As a standalone server

Another way to deploy Debezium is using the Debezium server. The Debezium server is a configurable, ready-to-use application that streams change events from a source database to a variety of messaging infrastructures.

Standalone Server Architecture — Source

The Debezium server is configured to use one of the Debezium source connectors to capture changes from the source database. Change events can be serialized to different formats like JSON or Apache Avro and then will be sent to one of a variety of messaging infrastructures such as Amazon Kinesis, Google Cloud Pub/Sub, or Apache Pulsar.

As connectors on an Apache Kafka Connect service, the standard approach (this is the one I am going to demo; it is typically used for enterprise use cases).

Most commonly, you deploy Debezium by means of Apache Kafka Connect. Kafka Connect is a framework and runtime for implementing and operating:

  • Source connectors such as Debezium that send records into Kafka.
  • Sink connectors that propagate records from Kafka topics to other systems.

The following image shows the architecture of a change data capture pipeline based on Debezium:

Kafka Connect Service Architecture — Source

As shown in the image, the Debezium connectors for MySQL and PostgreSQL are deployed to capture changes to these two types of databases. Each Debezium connector establishes a connection to its source database:

  • The MySQL connector uses a client library for accessing the binlog.
  • The PostgreSQL connector reads from a logical replication stream.
  • Kafka Connect operates as a separate service besides the Kafka broker.

By default, changes from one database table are written to a Kafka topic whose name corresponds to the table name. If needed, you can adjust the destination topic name by configuring Debezium’s topic routing transformation. For example, you can:

  • Route records to a topic whose name is different from the table’s name.
  • Stream change event records for multiple tables into a single topic
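As a hedged sketch of the second case, Debezium’s topic routing SMT can merge the change events of several tables into a single topic. The fragment below would go into the source connector’s JSON configuration; the regex and topic names are illustrative:

"transforms": "route",
"transforms.route.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.route.topic.regex": "dbserver1\\.public\\.user.*",
"transforms.route.topic.replacement": "all_user_changes"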

After change event records are in Apache Kafka, different connectors in the Kafka Connect ecosystem can stream the records to other systems and databases such as Elasticsearch, data warehouses, and analytics systems, or caches such as Infinispan. Depending on the chosen sink connector, you might need to configure Debezium’s new record state extraction transformation. This Kafka Connect SMT propagates the after structure from Debezium’s change event to the sink connector, in place of the verbose change event record that is propagated by default.

Implementing a use case with Kafka Connect

The infrastructure is built on Docker, and all the resources are bundled in one docker-compose file. The file is quite large, so I am not posting it here; please find the GitHub link here.

A docker-compose file is used to set up the infrastructure needed for the demo:

Demo

export DEBEZIUM_VERSION=1.4   # or the latest version

# Build and start everything with docker-compose. The first run takes a while.
docker-compose up --build

# Configure the connectors. For simplicity, I created a shell script.
./init.sh

I load a few records during Postgres startup. The DB name is user_data and the tables are users, user_address, and user_additional_info, so the records will already be there once docker-compose is up.

Note: The seed script generates random data on every run, so the ids can differ from one run to another. The Postgres image is built from the following Dockerfile, which copies the schema and seed scripts into the image and an init script into the Docker entrypoint directory:

FROM debezium/postgres:latest
COPY ./create_schema.sql /home/create_schema.sql
COPY ./seed_data.sql /home/seed_data.sql
COPY ./init-db.sh /docker-entrypoint-initdb.d/init-db.sh

Create the Debezium connectors

Postgres source connector
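The exact configuration lives in the repo, but registering a Debezium Postgres source connector against the Kafka Connect REST API looks roughly like this. The host names, port, credentials, and the dbserver1 server name are assumptions, and the RegexRouter transform is my illustration of how the topics end up named after the plain table names shown later in Kafdrop:

# Register the Postgres source connector (Kafka Connect REST API, default port 8083).
curl -i -X POST -H "Content-Type: application/json" \
  http://localhost:8083/connectors -d '{
  "name": "user-data-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "user_data",
    "database.server.name": "dbserver1",
    "table.include.list": "public.users,public.user_address,public.user_additional_info",
    "plugin.name": "pgoutput",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}'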

Elasticsearch sink connector for the users topic
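Again as a sketch, the Elasticsearch sink uses the ExtractNewRecordState SMT mentioned earlier so that only the after state of each change reaches the index; the connection URL, document key handling, and delete behaviour are assumptions:

# Register the Elasticsearch sink connector for the users topic.
curl -i -X POST -H "Content-Type: application/json" \
  http://localhost:8083/connectors -d '{
  "name": "users-elastic-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "users",
    "connection.url": "http://elastic:9200",
    "key.ignore": "false",
    "type.name": "_doc",
    "behavior.on.null.values": "delete",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id"
  }
}'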

Check the data in the Postgres database instance
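For example (assuming the compose service is named postgres):

# List a few of the seeded rows in the source database.
docker-compose exec postgres psql -U postgres -d user_data \
  -c "SELECT * FROM users LIMIT 5;"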

Similarly, check the Elasticsearch index.
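For instance (assuming Elasticsearch is exposed on port 9200 and the index is named after the users topic):

# Confirm that the documents made it into the index.
curl -s 'http://localhost:9200/users/_search?pretty'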

I have added Kibana to the docker-compose file, so we can also query Elasticsearch from the Kibana dev console. Please click here to load the Kibana dev console locally.

Now let us update a record and see whether the change is reflected in Elasticsearch.
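For example (the id value and the column name are placeholders; pick a row from the data listed above):

# Update a row in the source database; Debezium will emit an update event.
docker-compose exec postgres psql -U postgres -d user_data \
  -c "UPDATE users SET first_name = 'Updated' WHERE id = 1;"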

Let us query Elasticsearch for the updated record.

Let us delete that record from the source system
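For example, using the same placeholder id:

# Delete the row from the source database; the connector emits a delete event
# (followed by a tombstone), and the sink can remove the corresponding document.
docker-compose exec postgres psql -U postgres -d user_data \
  -c "DELETE FROM users WHERE id = 1;"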

Let us verify in the Elasticsearch index.

Let us check the Kafka messages for these events.

We use Kafdrop as a UI to browse the list of messages published on a topic. To open Kafdrop locally, please click here.

The users, user_address, and user_additional_info topics correspond to the source DB tables.

Let us understand the content of the message produced by the Debezium Kafka connector for a data change.
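You can also pull a single raw event from the topic with the console consumer; the service name kafka and the script path inside the Debezium Kafka image are assumptions here, and the comments summarize the parts of the envelope you will see:

# Read one change event from the users topic.
docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic users --from-beginning --max-messages 1
# Each event value is a JSON envelope containing, roughly:
#   "before" - row state before the change (null for inserts)
#   "after"  - row state after the change (null for deletes)
#   "source" - connector metadata: database, schema, table, LSN, timestamp
#   "op"     - "c" (create), "u" (update), "d" (delete), "r" (snapshot read)
#   "ts_ms"  - when the connector processed the event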

Babi has another use case. Here is what he says

I have clearly documented it in the GitHub repo here.

Babi is convinced, but he has one last use case.

I have clearly documented it in the GitHub repo here.

Some Complications in Syncing to the Target System

It is common to face issues when syncing data from an RDBMS to Elasticsearch. Debezium connectors offer a lot of options to customize the connection, but in my experience I still faced a few issues integrating Postgres with Elasticsearch.

In the end, I created only a source connector for Postgres, which published the changes as a stream to Kafka. I then wrote a custom consumer in our own language and framework to consume the messages and update the Elasticsearch index. An easier option could be a Lambda function that consumes the messages from Kafka.
