Delivering data from Yandex Managed Service for PostgreSQL using Debezium
You can track data changes in Managed Service for PostgreSQL and send them to Managed Service for Apache Kafka® using Change Data Capture (CDC).
In this article, you will learn how to create a virtual machine in Yandex Cloud and set up Debezium, software used for CDC.
Before you begin
- Create a source cluster with the following settings:
  - With publicly available hosts.
  - With the `db1` database.
  - With the `user1` user.
- Create a Managed Service for Apache Kafka® target cluster in any applicable configuration with publicly available hosts.
- Create a virtual machine with Ubuntu 20.04 and a public IP address.
- Set up security groups so that you can connect to the clusters both from the internet and from the created VM, and so that you can connect to this VM over SSH from the internet.
- Connect to the virtual machine over SSH and perform the preliminary setup:
  - Install the dependencies:

    ```bash
    sudo apt update && \
    sudo apt install kafkacat openjdk-17-jre postgresql-client --yes
    ```
  - Create a folder for Apache Kafka®:

    ```bash
    sudo mkdir -p /opt/kafka/
    ```
  - Download and unpack the archive with Apache Kafka® executable files in this folder. For example, to download and unpack Apache Kafka® 3.0, run the command:

    ```bash
    wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz && \
    sudo tar xf kafka_2.13-3.0.0.tgz --strip 1 --directory /opt/kafka/
    ```

    You can check the current Apache Kafka® version on the page with project downloads.
  - Install certificates on the VM and check the availability of the clusters (a connection check sketch is given at the end of this section):
    - Managed Service for Apache Kafka® (use `kafkacat`).
    - Managed Service for PostgreSQL (use `psql`).
  - Create a folder that will store the files required for the operation of the Debezium connector:

    ```bash
    sudo mkdir -p /etc/debezium/plugins/
    ```
  - The Debezium connector can connect to Managed Service for Apache Kafka® broker hosts if an SSL certificate is added to Java secure storage (Java Key Store). For added storage security, add a password that is at least 6 characters long to the `-storepass` parameter:

    ```bash
    sudo keytool \
        -importcert \
        -alias YandexCA \
        -file /usr/local/share/ca-certificates/Yandex/YandexCA.crt \
        -keystore /etc/debezium/keystore.jks \
        -storepass <JKS password> \
        --noprompt
    ```
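The exact availability checks depend on your hosts and credentials. As a minimal sketch (the broker FQDN, the Apache Kafka® account, and the placeholder values are assumptions to be replaced with your own), the checks mentioned above might look like this:

```bash
# Check the Managed Service for Apache Kafka® cluster: list cluster metadata over SASL_SSL.
kafkacat -L \
    -b <FQDN of broker host>:9091 \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X sasl.username=<Apache Kafka® username> \
    -X sasl.password=<password> \
    -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt

# Check the Managed Service for PostgreSQL cluster: connect to the db1 database as user1.
psql "host=c-<cluster ID>.rw.mdb.yandexcloud.net port=6432 sslmode=verify-full dbname=db1 user=user1"
```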
Prepare the source cluster
- Assign the `mdb_replication` role to the `user1` user (a CLI sketch for this step is given at the end of this section).

  This is necessary to create a publication for Debezium to monitor changes in the Managed Service for PostgreSQL cluster.
- Connect to the `db1` database on behalf of `user1`.
- Add test data to the database. In this example, a simple table with information from car sensors is used.
  Create a table:

  ```sql
  CREATE TABLE public.measurements (
      "device_id" text PRIMARY KEY NOT NULL,
      "datetime" timestamp NOT NULL,
      "latitude" real NOT NULL,
      "longitude" real NOT NULL,
      "altitude" real NOT NULL,
      "speed" real NOT NULL,
      "battery_voltage" real,
      "cabin_temperature" real NOT NULL,
      "fuel_level" real
  );
  ```
  Populate the table with data:

  ```sql
  INSERT INTO public.measurements VALUES
      ('iv9a94th6rztooxh5ur2', '2020-06-05 17:27:00', 55.70329032, 37.65472196, 427.5, 0, 23.5, 17, NULL),
      ('rhibbh3y08qmz3sdbrbu', '2020-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32),
      ('iv9a94th678tooxh5ur2', '2020-06-07 15:00:10', 55.70985913, 37.62141918, 417.0, 15.7, 10.3, 17, NULL);
  ```
- Create a publication for the table:

  ```sql
  CREATE PUBLICATION mpg_publication FOR TABLE public.measurements;
  ```
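The `mdb_replication` role can be assigned in the management console or with the `yc` CLI. A minimal sketch, assuming the CLI is installed and configured and that your CLI version supports the `--grants` flag (check `yc managed-postgresql user update --help`):

```bash
# Assign the mdb_replication role to user1 in the source cluster.
# The cluster ID placeholder and the --grants flag are assumptions; verify them for your setup.
yc managed-postgresql user update user1 \
    --cluster-id <source cluster ID> \
    --grants mdb_replication
```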
Configure Debezium
- Connect to the virtual machine over SSH.
- Download and unpack an appropriate Debezium connector into the `/etc/debezium/plugins/` folder.

  You can check the current connector version on the project page. The commands below are for version `1.9.4.Final`:

  ```bash
  VERSION="1.9.4.Final"
  wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/${VERSION}/debezium-connector-postgres-${VERSION}-plugin.tar.gz && \
  sudo tar -xzvf debezium-connector-postgres-${VERSION}-plugin.tar.gz -C /etc/debezium/plugins/
  ```
- Create the `/etc/debezium/mdb-connector.conf` file with the Debezium connector settings for connecting to the source cluster:

  ```ini
  name=debezium-mpg
  connector.class=io.debezium.connector.postgresql.PostgresConnector
  plugin.name=pgoutput
  database.hostname=c-<cluster ID>.rw.mdb.yandexcloud.net
  database.port=6432
  database.user=user1
  database.password=<user1 password>
  database.dbname=db1
  database.server.name=mpg
  table.include.list=public.measurements
  publication.name=mpg_publication
  slot.name=debezium_slot
  heartbeat.interval.ms=15000
  heartbeat.topics.prefix=__debezium-heartbeat
  ```
Where:
- `name`: Logical name of the Debezium connector. Used for the connector's internal needs.
- `database.hostname`: A special FQDN to connect to the source cluster master host. You can get the cluster ID with the list of clusters in the folder (see the CLI sketch after this list).
- `database.user`: The PostgreSQL username.
- `database.dbname`: The PostgreSQL database name.
- `database.server.name`: Name of the database server that Debezium will use when choosing a topic for sending messages.
- `table.include.list`: Names of the tables for which Debezium should track changes. Specify full names that include the schema name (`public` by default). Debezium will use values from this field when selecting a topic for sending messages.
- `publication.name`: The name of the publication created on the source cluster.
- `slot.name`: The name of the replication slot that Debezium will create when working with the publication.
- `heartbeat.interval.ms` and `heartbeat.topics.prefix`: The heartbeat settings needed by Debezium.
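As a small sketch, assuming the `yc` CLI is installed and configured, you can look up the source cluster ID like this:

```bash
# List Managed Service for PostgreSQL clusters in the current folder and note the ID
# of the source cluster; substitute it into c-<cluster ID>.rw.mdb.yandexcloud.net.
yc managed-postgresql cluster list
```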
Prepare the target cluster
- Create a topic to store data from the source cluster:

  - Name: `mpg.public.measurements`.

    Topic names follow the `<server name>.<schema name>.<table name>` convention. According to the Debezium configuration file:

    - The `mpg` server name is specified in the `database.server.name` parameter.
    - The `public` schema name is specified together with the `measurements` table name in the `table.include.list` parameter.

  If you need to track data changes in multiple tables, create a separate topic for each of them.
- Create a service topic to track the connector status:

  - Name: `__debezium-heartbeat.mpg`.

    Names for service topics follow the `<prefix for heartbeat>.<server name>` convention. According to the Debezium configuration file:

    - The `__debezium-heartbeat` prefix is specified in the `heartbeat.topics.prefix` parameter.
    - The `mpg` server name is specified in the `database.server.name` parameter.

  - Cleanup policy: `Compact`.

  If you need data from multiple source clusters, create a separate service topic for each of them.
- Create a user named `debezium`.
- Grant the `debezium` user the `ACCESS_ROLE_CONSUMER` and `ACCESS_ROLE_PRODUCER` permissions for the created topics (a CLI sketch for this section is given below).
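The topics, the user, and the permissions above can be created in the management console. A rough CLI sketch is shown below; the cluster name placeholder, the partition and replication values, and the `--cleanup-policy` and `--permission` flag syntax are assumptions, so check `yc managed-kafka topic create --help` and `yc managed-kafka user create --help` before running it:

```bash
# Topic for the table data (partition/replication values here are placeholders).
yc managed-kafka topic create mpg.public.measurements \
    --cluster-name <target cluster name> \
    --partitions 1 \
    --replication-factor 1

# Service topic for connector heartbeats with the Compact cleanup policy.
yc managed-kafka topic create __debezium-heartbeat.mpg \
    --cluster-name <target cluster name> \
    --partitions 1 \
    --replication-factor 1 \
    --cleanup-policy compact

# User with consumer and producer permissions for both topics.
# NOTE: the --permission syntax is an assumption; verify it with the CLI help.
yc managed-kafka user create debezium \
    --cluster-name <target cluster name> \
    --password <debezium user password> \
    --permission topic=mpg.public.measurements,role=producer,role=consumer \
    --permission topic=__debezium-heartbeat.mpg,role=producer,role=consumer
```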
Start Debezium
- Create the `/etc/debezium/worker.conf` file with the Debezium worker settings:

  ```ini
  # AdminAPI connect properties
  bootstrap.servers=<FQDN of broker host 1>:9091,...,<FQDN of broker host N>:9091
  sasl.mechanism=SCRAM-SHA-512
  security.protocol=SASL_SSL
  ssl.truststore.location=/etc/debezium/keystore.jks
  ssl.truststore.password=<JKS password>
  sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<debezium user password>";

  # Producer connect properties
  producer.sasl.mechanism=SCRAM-SHA-512
  producer.security.protocol=SASL_SSL
  producer.ssl.truststore.location=/etc/debezium/keystore.jks
  producer.ssl.truststore.password=<JKS password>
  producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<debezium user password>";

  # Worker properties
  plugin.path=/etc/debezium/plugins/
  key.converter=org.apache.kafka.connect.json.JsonConverter
  value.converter=org.apache.kafka.connect.json.JsonConverter
  key.converter.schemas.enable=true
  value.converter.schemas.enable=true
  offset.storage.file.filename=/etc/debezium/worker.offset
  ```
- In a separate terminal, start the connector:

  ```bash
  sudo /opt/kafka/bin/connect-standalone.sh \
      /etc/debezium/worker.conf \
      /etc/debezium/mdb-connector.conf
  ```
Check the health of Debezium
- In a separate terminal, run the `kafkacat` utility in consumer mode against the topic created earlier:

  ```bash
  kafkacat \
      -C \
      -b <FQDN of broker host 1>:9091,...,<FQDN of broker host N>:9091 \
      -t mpg.public.measurements \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=SCRAM-SHA-512 \
      -X sasl.username=debezium \
      -X sasl.password=<password> \
      -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt \
      -Z \
      -K:
  ```
  The data format schema of the `db1.public.measurements` table and information about the previously added rows will be printed.

  Example of a message fragment:

  ```json
  {
      "schema": { ... },
      "payload": {
          "before": null,
          "after": {
              "device_id": "iv9a94th6rztooxh5ur2",
              "datetime": 1591378020000000,
              "latitude": 55.70329,
              "longitude": 37.65472,
              "altitude": 427.5,
              "speed": 0.0,
              "battery_voltage": 23.5,
              "cabin_temperature": 17.0,
              "fuel_level": null
          },
          "source": {
              "version": "1.8.1.Final",
              "connector": "postgresql",
              "name": "mpg",
              "ts_ms": 1628245046882,
              "snapshot": "true",
              "db": "db1",
              "sequence": "[null,\"4328525512\"]",
              "schema": "public",
              "table": "measurements",
              "txId": 8861,
              "lsn": 4328525328,
              "xmin": null
          },
          "op": "r",
          "ts_ms": 1628245046893,
          "transaction": null
      }
  }
  ```
- Connect to the source cluster and add another row to the `measurements` table:

  ```sql
  INSERT INTO public.measurements VALUES
      ('iv7b74th678tooxh5ur2', '2020-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL);
  ```
- Make sure the terminal running `kafkacat` displays details about the added row.
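Even when no table changes arrive, the connector keeps writing heartbeat messages. As an additional check, here is a minimal sketch that reads the service heartbeat topic using the same connection settings as above (the broker FQDNs and password are the same placeholders):

```bash
# Read the heartbeat messages that Debezium writes every heartbeat.interval.ms (15 seconds here).
kafkacat \
    -C \
    -b <FQDN of broker host 1>:9091,...,<FQDN of broker host N>:9091 \
    -t __debezium-heartbeat.mpg \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X sasl.username=debezium \
    -X sasl.password=<password> \
    -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt \
    -Z \
    -K:
```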
Delete the resources you created
If you no longer need these resources, delete them:
- Delete the virtual machine.

  If you reserved a public static IP address for the virtual machine, release and delete it.
- Delete the Managed Service for PostgreSQL and Managed Service for Apache Kafka® clusters.