Realtime Change Data Capture To BigQuery — No Kafka, No Dataflow

A proof-of-concept running CDC from MySQL to BigQuery in realtime with Debezium and Pub/Sub…

Taufiq Ibrahim
Dev Genius



For a while now I have been thinking about a proof-of-concept using Debezium to perform change data capture (CDC) from an RDBMS and ingest the data into BigQuery with low latency, without Kafka and Kafka Connect. Last time, I tried the Debezium Server and Cloud Pub/Sub combo, but hit a disappointment because I still needed to deploy Dataflow jobs, which generate additional cost and more resources to manage. So I dumped the idea at the time. Until…

Update 2022-08-23: The issue with the FLOAT or DOUBLE PRECISION data types has been solved. Thanks, Google Cloud team!

Recently, Google Cloud launched a new feature to stream data from Pub/Sub directly into BigQuery. With this new BigQuery subscription feature, I might have another alternative solution, so I was quite interested to give it a try.

Note: This is just for fun and not intended for production use cases. I’ll put more detailed considerations at the end of this post.

Preparing Database And Debezium Server Locally

I am using MySQL as the source database and Debezium Server, both deployed on my laptop using Docker. You can find a docker-compose.yml file below as a reference.
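
Here is a minimal sketch of what such a Compose file can look like, using Debezium's example MySQL image (which comes preloaded with the inventory database) and the Debezium Server image; treat the image tags, credentials and paths below as placeholders rather than exact values.

version: "3.8"
services:
  mysql:
    # Debezium's example MySQL image, preloaded with the inventory database
    image: debezium/example-mysql:1.9
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  debezium-server:
    image: debezium/server:1.9
    depends_on:
      - mysql
    volumes:
      # Service account with write access to Pub/Sub and BigQuery
      - ./demo-sa.json:/debezium/demo-sa.json
      # Debezium Server configuration (application.properties)
      - ./conf:/debezium/conf
    environment:
      - GOOGLE_APPLICATION_CREDENTIALS=/debezium/demo-sa.json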

Let’s do a little walkthrough of the Docker Compose file above:

  • The MySQL container is preloaded with a database called inventory. We will use the inventory.products table as the test table for this trial.
  • The Debezium Server container has two mounted volumes. The first is the service account file (demo-sa.json) with write access to Pub/Sub and BigQuery. The second mounts the conf/ directory, which contains the Debezium Server configuration file, application.properties. You can see the initial configuration file below. Please refer to the Debezium Server sink configuration documentation for the details.

This is my directory structure.

├── conf
│ └── application.properties
├── demo-sa.json
└── docker-compose.yml
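
And here is a minimal sketch of the initial conf/application.properties for the Pub/Sub sink. The property names follow the Debezium Server documentation; the hostname, credentials and server id are placeholder values for this local setup.

# Sink: publish change events to Google Cloud Pub/Sub
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=<project_id>

# Source: Debezium MySQL connector
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.database.hostname=mysql
debezium.source.database.port=3306
debezium.source.database.user=debezium
debezium.source.database.password=dbz
debezium.source.database.server.id=184054
# serverName part of the serverName.databaseName.tableName topic convention
debezium.source.database.server.name=mysql
debezium.source.database.include.list=inventory
debezium.source.table.include.list=inventory.products

# Offsets and database history on local files, good enough for a PoC
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/dbhistory.dat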

At this point, we are not yet ready to run the MySQL and Debezium containers, because we need to have these resources available on Google Cloud:

  • Pub/Sub schema(s) — optional
  • Pub/Sub topic(s)
  • Pub/Sub subscription(s)
  • BigQuery table(s)

We will create those resources later.

Understand BigQuery Subscription

Pub/Sub Service Account Permission

To create a BigQuery subscription, the Pub/Sub service account must have permission to write to the specific BigQuery table and to read the table metadata. Grant the BigQuery Data Editor (roles/bigquery.dataEditor) role and the BigQuery Metadata Viewer (roles/bigquery.metadataViewer) role to the Pub/Sub service account. Here’s how to do it:

  • In the console, go to the IAM page.
  • Select Include Google-provided role grants.
  • Filter by Name: Pub/Sub Service Account. The service account has the format service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com.
  • Click Edit for the Pub/Sub Service Account.
  • In the Edit Permissions pane, click Add another role.
  • In the Select a role drop-down, enter BigQuery, and select the BigQuery Data Editor role.
  • Click Add another role again.
  • In the Select a role drop-down, enter BigQuery, and select the BigQuery Metadata Viewer role.

For more information, see Assign BigQuery roles to the Pub/Sub service account.
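
If you prefer the command line, the same grants can be done with gcloud. This is a sketch assuming the PROJECT_ID and PROJECT_NUMBER environment variables hold your project id and project number (we will set environment variables shortly).

# Grant BigQuery Data Editor to the Pub/Sub service account
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:service-$PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
  --role="roles/bigquery.dataEditor"

# Grant BigQuery Metadata Viewer to the Pub/Sub service account
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:service-$PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
  --role="roles/bigquery.metadataViewer"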

Properties of a BigQuery subscription: Topic Schema

A BigQuery subscription has an option called Use topic schema. This option lets Pub/Sub write the fields of the topic messages into the corresponding columns of the BigQuery table. This sounds interesting. However, it comes with additional requirements:

  • The fields in the topic schema and the BigQuery schema must have the same names and their types must be compatible with each other.
  • Any optional field in the topic schema must also be optional in the BigQuery schema.
  • Required fields in the topic schema do not need to be required in the BigQuery schema.
  • If there are BigQuery fields that are not present in the topic schema, these BigQuery fields must be in mode NULLABLE.
  • If the topic schema has additional fields that are not present in the BigQuery schema and these fields can be dropped, select the option Drop unknown fields.

If we don’t use the Use topic schema option, the subscription writes each message to a single column called data. That makes it the easier option to start with; just make sure the destination table has a data column.

More on that in Properties of a BigQuery subscription documentation.

I’ll start with the second option: no topic schema.

But before that, let’s set some environment variables to make our lives easier.
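
Something along these lines, with placeholder values for your own project and dataset:

# Used by the gcloud commands throughout this post
export PROJECT_ID=<project_id>
export PROJECT_NUMBER=<project_number>
export BQ_DATASET=<dataset>

gcloud config set project $PROJECT_ID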

So, without further ado…

MySQL CDC Into BigQuery Without Schema Definition

Let’s prepare the required resources before we run the CDC part.

First, create a BigQuery table. Since we are not using a schema, we will just create a table with a single data column.

CREATE OR REPLACE TABLE
  <project_id>.<dataset>.mysql_inventory_products_no_schema (
    data STRING
);

Next, create a Pub/Sub topic called mysql.inventory.products. The reason for this name is that Debezium Server for MySQL uses the convention serverName.databaseName.tableName for the target topic name.

gcloud pubsub topics create mysql.inventory.products

Next, create a BigQuery Subscription.

gcloud pubsub subscriptions create \
mysql.inventory.products-bq-sub \
--topic mysql.inventory.products \
--bigquery-table=$PROJECT_ID.$BQ_DATASET.mysql_inventory_products_no_schema

Now, let’s run the containers.

docker-compose up

The Debezium Server container might exit and restart several times while waiting for the MySQL container to be ready.

Check BigQuery using SQL.
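
For example, a quick peek at whatever landed in the table:

SELECT data
FROM `<project_id>.<dataset>.mysql_inventory_products_no_schema`
LIMIT 10;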

Try 1: it works! But what are those horrible, hard-to-read rows?

Impressive! But why those long, hairy records?

Because, by default, Debezium Server sends the raw CDC event format, which includes both the payload and the schema of the data itself. You can learn more about this on the Debezium MySQL data change events documentation page.

We need to use Debezium’s single message transformation (SMT) called New Record State Extraction. Let’s add these lines to the application.properties file.

debezium.source.transforms=unwrap
debezium.source.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.source.transforms.unwrap.add.fields=op,table,source.ts_ms
debezium.source.transforms.unwrap.delete.handling.mode=rewrite

Let’s reset all the things and start over.
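
Resetting here roughly means: stop the containers, clear Debezium’s offsets, truncate the BigQuery table, and bring everything back up. A sketch, assuming the Compose file above where offsets and database history are not persisted in a volume:

# Stop and remove the containers; since offsets live inside the container,
# Debezium will take a fresh snapshot of the table on the next start
docker-compose down

# Truncate the destination table (in the BigQuery SQL UI):
# TRUNCATE TABLE `<project_id>.<dataset>.mysql_inventory_products_no_schema`;

# Edit conf/application.properties, then start again
docker-compose up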

Check again using SQL.

Now, that’s better. But what are those schema things? Can we get rid of them?

Of course we can. Add these lines to application.properties and start over.

debezium.source.key.converter.schemas.enable=false
debezium.source.value.converter.schemas.enable=false

Check again using SQL.
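
With the schema envelope gone, the data column now holds plain JSON, so we can pull individual fields out directly. For example:

SELECT
  JSON_VALUE(data, '$.id')   AS id,
  JSON_VALUE(data, '$.name') AS name,
  JSON_VALUE(data, '$.__op') AS op
FROM `<project_id>.<dataset>.mysql_inventory_products_no_schema`
LIMIT 10;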

Well that’s great!

There you go!

Note that we have the additional fields __op, __table, __source_ts_ms and __deleted. That’s because of this additional configuration in application.properties:

debezium.source.transforms.unwrap.add.fields=op,table,source.ts_ms

MySQL CDC Into BigQuery With Schema Definition

We have successfully streamed MySQL CDC events into a single-column BigQuery table without a schema definition. Now, let’s try using a schema so that we get tabular data, with separate fields matching the MySQL source table schema.

First, let’s clean up all the mess we created earlier.

# Delete subscription
gcloud pubsub subscriptions delete mysql.inventory.products-bq-sub
# Delete topic
gcloud pubsub topics delete mysql.inventory.products
# Drop BigQuery table (Using SQL UI)
DROP TABLE <project_id>.<dataset>.mysql_inventory_products_no_schema;

Create a new BigQuery table.

CREATE OR REPLACE TABLE tibrahim_debezium.mysql_inventory_products (
  id INT64 NOT NULL,
  name STRING,
  description STRING,
  weight FLOAT64,
  __op STRING,
  __table STRING,
  __source_ts_ms INT64,
  __deleted STRING
);

Next, let’s create a schema for the Pub/Sub topic. This must be a valid Avro schema that matches the source table definition.

gcloud pubsub schemas create mysql.inventory.products-schema \
--type=AVRO \
--definition='{"type":"record","name":"MysqlInventoryProductsSchema","fields":[{"type":"int","optional":false,"name":"id"},{"type":"string","optional":false,"name":"name"},{"type":"string","optional":false,"name":"description"},{"type":"float","optional":false,"name":"weight"},{"type":"string","optional":true,"name":"__op"},{"type":"string","optional":true,"name":"__table"},{"type":"long","optional":true,"name":"__source_ts_ms"},{"type":"string","optional":true,"name":"__deleted"}]}'

Now, let’s create Pub/Sub topic using the schema.

gcloud pubsub topics create mysql.inventory.products --message-encoding=json --schema=mysql.inventory.products-schema

Now for the new feature: a BigQuery subscription… with schema.

gcloud pubsub subscriptions create mysql.inventory.products-bq-sub \
--topic mysql.inventory.products \
--bigquery-table=$PROJECT_ID.$BQ_DATASET.mysql_inventory_products \
--use-topic-schema

Ouch, we got an error here.

ERROR: Failed to create subscription [projects/<PROJECT_ID>/subscriptions/mysql.inventory.products-bq-sub]: Incompatible schema type for field weight: DOUBLE vs. FLOAT.
ERROR: (gcloud.pubsub.subscriptions.create) Failed to create the following: [mysql.inventory.products-bq-sub].

It seems that BigQuery subscriptions have an issue/bug related to the float data type. I wrote a post on the Google Cloud Community forum about this issue, and it seems I am not the only one experiencing this error.

Okay, I am a bit disappointed by this issue, but let’s not stop here. We’ll try another table without a float column. Let’s use the addresses table.

Change the table include list in the Debezium Server application.properties:

debezium.source.table.include.list=inventory.addresses

Let’s create a new BigQuery table for the addresses table.

CREATE OR REPLACE TABLE <project_id>.<dataset>.mysql_inventory_addresses (
  `id` INT64 NOT NULL,
  `customer_id` INT64 NOT NULL,
  `street` STRING NOT NULL,
  `city` STRING NOT NULL,
  `state` STRING NOT NULL,
  `zip` STRING NOT NULL,
  `type` STRING NOT NULL,
  __op STRING,
  __table STRING,
  __source_ts_ms INT64,
  __deleted STRING
);

Next, create Pub/Sub schema, topic, BigQuery Subscription and run Debezium Server.
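
The commands mirror what we did for the products table; the Avro field list below is my rendition of the addresses columns plus the Debezium metadata fields:

# Pub/Sub schema for the addresses table
gcloud pubsub schemas create mysql.inventory.addresses-schema \
  --type=AVRO \
  --definition='{"type":"record","name":"MysqlInventoryAddressesSchema","fields":[{"type":"int","optional":false,"name":"id"},{"type":"int","optional":false,"name":"customer_id"},{"type":"string","optional":false,"name":"street"},{"type":"string","optional":false,"name":"city"},{"type":"string","optional":false,"name":"state"},{"type":"string","optional":false,"name":"zip"},{"type":"string","optional":false,"name":"type"},{"type":"string","optional":true,"name":"__op"},{"type":"string","optional":true,"name":"__table"},{"type":"long","optional":true,"name":"__source_ts_ms"},{"type":"string","optional":true,"name":"__deleted"}]}'

# Topic bound to that schema, with JSON message encoding
gcloud pubsub topics create mysql.inventory.addresses \
  --message-encoding=json \
  --schema=mysql.inventory.addresses-schema

# BigQuery subscription that uses the topic schema
gcloud pubsub subscriptions create mysql.inventory.addresses-bq-sub \
  --topic mysql.inventory.addresses \
  --bigquery-table=$PROJECT_ID.$BQ_DATASET.mysql_inventory_addresses \
  --use-topic-schema

# Run Debezium Server (and MySQL) again
docker-compose up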

There you go, my friend, a beautiful tabular view of our data.

My Takeaways

Running change data capture without Kafka and Kafka Connect is possible with the help of Debezium Server and Pub/Sub. With the release of the BigQuery subscription feature, it is even easier to stream directly into a BigQuery table, without Cloud Dataflow. Fewer resources to monitor, less hassle, and less cost.

However, this simpler approach comes with its own issues:

  • Schema evolution
    Currently, once a schema is associated with a topic, you cannot update it. If your source table schema changes, the CDC stream will fail. Confluent Schema Registry and Apicurio Registry have already solved this challenge, and there is a Google Issue Tracker entry about it. Let’s hope the Pub/Sub team solves this in the near future. If your data sources have frequent schema changes, perhaps you are better off sticking with the Kafka stack.
  • Topic and Destination Table Auto-Creation/Schema Auto-Evolution
    As of the time of writing, we need to create the schema, topic, subscription and BigQuery table before running Debezium. If you have tens or hundreds of tables, you need to come up with some automation for creating, managing and destroying these resources. More mature frameworks like Kafka Connect and the open-source Kafka Connect BigQuery sink connector already solve this. Again, if this is a problem for you, perhaps you should stick with Kafka Connect or another mature framework.

If you don’t have any of the above issues, then this approach might work for you. Anyway, this feature is really great. It is not just for change data capture; you could use it for a lot of things, such as sending IoT sensor data directly to BigQuery or capturing web events. The sky is the limit.
