Stream Postgres Data using Apache Kafka & Debezium | Real Time ETL

Haq Nawaz · Published in Dev Genius · Apr 2, 2023

Using Apache Kafka, Debezium and Postgres

Streaming Postgres Data to Kafka

Today we continue with stream processing and stream Postgres database changes to Apache Kafka. In the previous session we set up the environment and set the stage for real-time data processing from Postgres: we installed Apache Kafka, Debezium and the rest of the required components, and configured the Postgres database for data streaming. First of all, we need a table to stream data from. We loaded the following tables into this database during the Python ETL session, and we will use the FactInternetSales table, which holds sales transactions, for data streaming. However, all of the data is already loaded into this table and we want to simulate data streaming. In a real-world scenario we receive a sales transaction every few seconds; we persist each transaction and stream it to a Kafka topic.

This tutorial will enable you to:

  • Set up Postgres for Data Streaming
  • Stream Database Changes to a Kafka topic
  • Configure the Postgres Kafka Connector for data streaming
  • Consume the data stream via a Python Kafka Consumer

If you are a visual learner then I have an accompanying video on YouTube with a walk-through of the complete code.

Table for Data Streaming

We create a new table for data streaming. It contains only the columns we need for the stream. We will use this empty table as the source for our Kafka topic and incrementally insert rows into it.

CREATE TABLE IF NOT EXISTS public.factinternetsales_streaming
(
    productkey bigint,
    customerkey bigint,
    salesterritorykey bigint,
    salesordernumber text COLLATE pg_catalog."default",
    totalproductcost double precision,
    salesamount double precision
)
TABLESPACE pg_default;

ALTER TABLE IF EXISTS public.factinternetsales_streaming
    OWNER to postgres;

Insert Data Incrementally

To insert rows into this table one row at a time, I have put together a Python script so we don't have to run insert statements manually. It will look familiar if you have been following along with the Python ETL series. When we execute this script it incrementally inserts records, simulating an application that writes to a database. This takes care of the database setup for the data streaming.

import time

import pandas as pd
from sqlalchemy import create_engine

# Connection details (uid, pwd, server, port, db) are defined as in the Python ETL series
engine = create_engine(f'postgresql://{uid}:{pwd}@{server}:{port}/{db}')
df = pd.read_sql('SELECT * FROM public.factinternetsales', engine)
df = df[['productkey', 'customerkey', 'salesterritorykey', 'salesordernumber', 'totalproductcost', 'salesamount']]

# Insert one row every three seconds to simulate an application writing to the database
for index, row in df.head(100).iterrows():
    mod = row.to_frame().T
    mod.to_sql("factinternetsales_streaming", engine, if_exists='append', index=False)
    print("Row Inserted " + mod.salesordernumber.iloc[0] + ' ' + str(mod.salesamount.iloc[0]))
    time.sleep(3)

Kafka Postgres Connector

Now we move to the Kafka configuration. With Debezium we can stream data from Postgres without writing additional code; all we have to do is configure a connector. Debezium runs inside Kafka Connect, whose REST API endpoint is localhost:8083. To configure a connector, we make a POST request to this endpoint with a JSON payload.

{
  "name": "source-productcategory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "your-host-ip",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "password",
    "database.dbname": "AdventureWorks",
    "plugin.name": "pgoutput",
    "database.server.name": "source",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "table.include.list": "public.factinternetsales_streaming",
    "slot.name": "dbz_sales_transaction_slot"
  }
}
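
As a rough sketch, the POST request can be sent from Python with the requests library; the snippet below assumes the payload above is saved in a local file named debezium-postgres-connector.json (a hypothetical file name).

import json

import requests

# Load the connector payload shown above from a local file (assumed name)
with open("debezium-postgres-connector.json") as f:
    connector_config = json.load(f)

# Create the connector via the Kafka Connect REST API
response = requests.post(
    "http://localhost:8083/connectors",
    json=connector_config,
    headers={"Accept": "application/json"},
)
print(response.status_code, response.json())

If the request succeeds, the connector appears in the list returned by a GET request to http://localhost:8083/connectors.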

Kafka Python Consumer

We can spin up a Python Kafka Consumer that subscribes to the topic created for our table. We add one additional parameter to this consumer: a group id. This ensures that each message is read only once per consumer group; if we stop the consumer and re-run it, it won't read the same messages again, because the committed offsets record the last message it read and it carries on from that point. We run this consumer and start the script that inserts data into the source table at regular intervals. We are successfully inserting records into the source table.
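
A minimal consumer sketch is shown below, using the kafka-python package. The topic name follows Debezium's default {server.name}.{schema}.{table} convention, so with the connector above it would be source.public.factinternetsales_streaming; the broker address and group id are assumptions.

import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'source.public.factinternetsales_streaming',  # topic created by the connector
    bootstrap_servers='localhost:9092',           # assumed broker address
    group_id='sales_stream_consumer',             # hypothetical group id; offsets are committed per group
    auto_offset_reset='earliest',                 # start from the beginning the first time this group runs
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None,
)

# Each message value is the unwrapped row, thanks to the ExtractNewRecordState transform
for message in consumer:
    print(message.value)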

Insert Records into Source Table

If we move back to our Consumer, we see that the database changes are streamed to this topic: the Consumer receives each change and displays it. The database changes flow into the Kafka topic and, at the same time, our Python Consumer reads the stream from that topic. We are successfully streaming data from Postgres to Kafka with Debezium.

Reading Database Stream

Conclusion

  • We showcased how to configure the Postgres database for real-time data streaming.
  • We created a Kafka Connector to stream database changes to a Kafka topic.
  • We created a Python script to incrementally insert data into the database.
  • We created a Python Consumer to read the real-time data stream from the Postgres database.
  • The full code can be found here


I am a business intelligence developer and data science enthusiast. In my free time I like to travel and code, and I enjoy landscape photography.