Stream Postgres Data using Apache Kafka & Debezium | Real Time ETL
Using Apache Kafka, Debezium and Postgres
Today we will continue with stream processing and stream Postgres database changes to Apache Kafka. In the previous session we set the stage for real-time data processing from Postgres: we installed Apache Kafka, Debezium and the rest of the required components, and configured the Postgres database for data streaming. First of all, we need a table to stream data from. We loaded several tables into this database during the Python ETL session, and we will use the FactInternetSales table, which holds sales transactions. However, all the data is already loaded into this table, and we want to simulate data streaming: in a real-world scenario we receive a sales transaction every few seconds, persist it, and stream it to a Kafka topic.
This tutorial will enable you to:
- Set up Postgres for data streaming
- Stream database changes to a Kafka topic
- Configure the Postgres Kafka Connector for data streaming
- Consume the data stream via a Python Kafka Consumer
If you are a visual learner then I have an accompanying video on YouTube with a walk-through of the complete code.
Table for Data Streaming
We create a new table for data streaming. This table contains only the columns we need for streaming. We will use this empty table as the source for our Kafka topic and incrementally insert rows into it.
CREATE TABLE IF NOT EXISTS public.factinternetsales_streaming
(
productkey bigint,
customerkey bigint,
salesterritorykey bigint,
salesordernumber text COLLATE pg_catalog."default",
totalproductcost double precision,
salesamount double precision
)
TABLESPACE pg_default;
ALTER TABLE IF EXISTS public.factinternetsales_streaming
OWNER to postgres;
Insert Data Incrementally
To insert rows into this table one row at a time, I have put together a Python script so we don't have to run insert statements manually. It will look familiar if you have been following along with the Python ETL series. When we execute this script, it incrementally inserts records, simulating an application that writes to a database. This takes care of the database setup for the data streaming.
import time

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine(f'postgresql://{uid}:{pwd}@{server}:{port}/{db}')

# Load the source rows and keep only the columns used for streaming
df = pd.read_sql('SELECT * FROM public.factinternetsales', engine)
df = df[['productkey', 'customerkey', 'salesterritorykey', 'salesordernumber', 'totalproductcost', 'salesamount']]

# Insert one row every few seconds to simulate an application writing to the database
for index, row in df.head(100).iterrows():
    mod = row.to_frame().T
    mod.to_sql("factinternetsales_streaming", engine, if_exists='append', index=False)
    print(f"Row Inserted {row.salesordernumber} {row.salesamount}")
    time.sleep(3)
Kafka Postgres Connector
Now we move to the Kafka configuration. We can use Debezium to stream data from Postgres without writing additional code. All we have to do is configure a connector. Our Debezium API's endpoint is localhost:8083. To configure a connector, we make a POST request to this endpoint with a JSON payload.
{
"name": "source-productcategory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "your-host-ip",
"database.port": "5432",
"database.user": "user",
"database.password": "password",
"database.dbname": "AdventureWorks",
"plugin.name": "pgoutput",
"database.server.name": "source",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"table.include.list": "public.factinternetsales_streaming",
"slot.name" : "dbz_sales_transaction_slot"
}
}
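The configuration above can be registered by sending it to the Kafka Connect REST API. As a minimal sketch using only the Python standard library (the hostname and credentials are the same placeholders as in the JSON payload; adjust them for your environment):

```python
import json
import urllib.request

# Kafka Connect / Debezium REST endpoint from the tutorial
CONNECT_URL = "http://localhost:8083/connectors"

connector_config = {
    "name": "source-productcategory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "your-host-ip",   # placeholder
        "database.port": "5432",
        "database.user": "user",               # placeholder
        "database.password": "password",       # placeholder
        "database.dbname": "AdventureWorks",
        "plugin.name": "pgoutput",
        "database.server.name": "source",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "table.include.list": "public.factinternetsales_streaming",
        "slot.name": "dbz_sales_transaction_slot",
    },
}

def register_connector(url: str, config: dict) -> urllib.request.Request:
    """Build the POST request that registers the connector with Kafka Connect."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )

req = register_connector(CONNECT_URL, connector_config)
# urllib.request.urlopen(req) would send the request; it requires Kafka Connect
# to be up and reachable on localhost:8083.
```

A successful registration returns HTTP 201, and the connector then starts capturing changes from the replication slot named in `slot.name`.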
Kafka Python Consumer
We can spin up a Python Kafka Consumer that subscribes to the topic for our table. We add one extra parameter to this Consumer, the consumer group id (`group_id`). This ensures we read each message only once: if we stop the consumer and re-run it, it won't re-read the same messages, because Kafka remembers the last message the group read and the consumer carries on from that point. We run this consumer and start our script that inserts data into the source table at regular intervals. We are successfully inserting records into the source table.
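As a sketch, such a consumer can be written with the kafka-python package (assumed installed via `pip install kafka-python`; the broker address and group name below are assumptions). Debezium names topics `<server>.<schema>.<table>`, so with `database.server.name` set to `source` our topic is `source.public.factinternetsales_streaming`:

```python
import json

TOPIC = "source.public.factinternetsales_streaming"

def decode_message(raw: bytes) -> dict:
    """Deserialize a Debezium JSON change event into a plain dict."""
    return json.loads(raw.decode("utf-8"))

def run_consumer(bootstrap: str = "localhost:9092") -> None:
    # Lazy import so the rest of the module works without kafka-python installed
    from kafka import KafkaConsumer  # pip install kafka-python

    # group_id lets Kafka track our committed offset, so restarting the
    # consumer resumes from the last read message instead of re-reading.
    consumer = KafkaConsumer(
        TOPIC,
        bootstrap_servers=bootstrap,             # assumption: default broker address
        group_id="sales-transactions-consumer",  # hypothetical group name
        auto_offset_reset="earliest",
        value_deserializer=decode_message,
    )
    for message in consumer:
        print("Received:", message.value)

# run_consumer()  # start consuming; requires a running Kafka broker
```

Because the `unwrap` transform (`ExtractNewRecordState`) is configured on the connector, each message value is the flat new row state rather than the full Debezium change envelope.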
If we move back to our Consumer we see that the database changes are streamed to this topic. Our consumer is receiving these changes and displaying them. We are successfully streaming the database changes to a Kafka topic and simultaneously our Python Consumer is reading this stream from the Kafka topic. We are successfully streaming data from Postgres to Kafka with Debezium.
Conclusion
- We showcased how to configure a Postgres database for real-time data streaming.
- We created a Kafka connector to stream database changes to a Kafka topic.
- We created a Python script to incrementally insert data into the database.
- We created a Python Consumer to read a real-time data stream from the Postgres database.
- The full code can be found here