Debezium JSON
On this page
This article introduces you to usage examples, configuration options, and typemaps of the Debezium-json format.
Background Information
Debezium is a CDC (Changelog Data Capture) tool that streams changes from MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases into Kafka in real time. Debezium provides a unified format structure for changelogs and supports serialization of messages using JSON.
Flink supports parsing Debezium JSON messages into INSERT, UPDATE or DELETE messages into the Flink SQL system. In many cases, it is very useful to take advantage of this feature, for example:
- Synchronize incremental data from database to other systems
- Log audit
- Real-time materialized view of the database
- Temporal join change history of database tables
Flink also supports encoding INSERT, UPDATE or DELETE messages in Flink SQL into JSON messages in Debezium format, and output them to storage such as Kafka.
Currently Flink does not support combining UPDATE_BEFORE and UPDATE_AFTER into one UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER as Debezium messages of type DELETE and INSERT respectively.
Example of Use
Assuming the MySQL products table has 4 columns (id, name, description, weight), a simple example of an update operation captured from the MySQL products table in JSON format is as follows:
1 {
2 "before": {
3 "id": 111,
4 "name": "scooter",
5 "description": "Big 2-wheel scooter",
6 "weight": 5.18
7 },
8 "after": {
9 "id": 111,
10 "name": "scooter",
11 "description": "Big 2-wheel scooter",
12 "weight": 5.15
13 },
14 "source": {...},
15 "op": "u",
16 "ts_ms": 1589362330904 ,
17 "transaction": null
18 };For the meaning of each field in the example, see Debezium for details.
The JSON message above is an update event on the products table where the row with id = 111 has a weight value changed from 5.18 to 5.15. Assuming this message is synced to a Kafka topic called products_binlog, the following DDL can be used to consume this topic and parse change events.
1 -- Use 'debezium-json' format to parse Debezium's JSON message
2 CREATE TABLE topic_products (
3 -- schema is exactly the same as MySQL's products table
4 id BIGINT,
5 name STRING,
6 description STRING,
7 weight DECIMAL(10, 2)
8 ) WITH (
9 'connector ' = 'kafka',
10 'topic' = 'products_binlog',
11 'properties.bootstrap.servers' = 'localhost:9092',
12 'properties.group.id' = 'testGroup',
13 -- use 'debezium-json' format to Parse Debezium's JSON messages
14 -- use 'debezium-avro-confluent' if Debezium encodes messages with Avro
15 );In some cases, when setting up Debezium Kafka Connect, the Kafka configuration value.converter.schemas.enable may be enabled to include schema information in the message body. A Debezium JSON message might look like this:
1 {
2 "schema": {...},
3 "payload": {
4 "before": {
5 "id": 111,
6 "name": "scooter",
7 "description": "Big 2-wheel scooter",
8 "weight" : 5.18
9 },
10 "after": {
11 "id": 111,
12 "name": "scooter",
13 "description": "Big 2-wheel scooter",
14 "weight": 5.15
15 },
16 "source": {... },
17 "op": "u",
18 "ts_ms": 1589362330904,
19 "transaction": null
20 }
21 }'debezium-json.schema-include' = 'true' (default is false) to the above DDL WITH clause . In general, it is not recommended to include a schema description, as this can make the message very verbose and reduce parsing performance.
After registering a topic as a Flink table, Debezium messages can be used as a changelog source.
1 -- A real-time materialized view of MySQL "products".
2 -- Calculate the latest average weight of the same product.
3 SELECT name, AVG(weight) FROM topic_products GROUP BY name;
4
5 -- Synchronize all data and incremental changes to the MySQL "products" table.
6 -- Elasticsearch "products" index for future lookups.
7 INSERT INTO elasticsearch_products
8 SELECT * FROM topic_products;Configuration Options
Flink provides debezium-json to parse messages in JSON format generated by Debezium.
debezium-json
Use debezium-json to parse Debezium JSON messages.
Type Mapping
Currently, Debezium uses JSON format for serialization and deserialization. For more details on data type mapping, please refer to the JSON Format documentation and the Confluent Avro Format documentation.
Other Instructions for Use
Available Metadata
The following format metadata can be declared as read-only (VIRTUAL) columns in DDL statements.
The format metadata field is only available if the corresponding connector forwards format metadata. Currently, only Kafka Connectors are able to declare metadata fields in their value format.
The following example shows how to access Debezium metadata fields in Kafka:
1
2 CREATE TABLE KafkaTable (
3 origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
4 event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
5 origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
6 origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
7 origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
8 origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
9 user_id BIGINT,
10 item_id BIGINT,
11 behavior STRING
12 ) WITH (
13 'connector' = 'kafka',
14 'topic' = 'user_behavior',
15 'properties.bootstrap.servers' = 'localhost:9092',
16 'properties.group.id' = 'testGroup',
17 'scan.startup .mode' = 'earliest-offset',
18 'value.format' = 'debezium-json'
19 );Common Problem
Posting Duplicate Change Events on Failure
In a normal operating environment, Debezium can deliver each change event with exactly-once semantics, and Flink can normally consume the change events generated by Debezium. In abnormal situations (such as failures), Debezium can only guarantee at-least-once delivery semantics. At this time, Debezium may deliver duplicate change events to Kafka, and when Flink consumes from Kafka, it will get duplicate events, which may lead to wrong results or unexpected exceptions in the operation of Flink query. Therefore, in this case, it is recommended to set the job parameter table.exec.source.cdc-events-duplicate to true and define the PRIMARY KEY on the source table. The Flink system will generate an additional stateful operator that uses the PRIMARY KEY to deduplicate change events and generate a normalized changelog stream.
Data Produced by Debezium Postgres Connector cannot Be Parsed Correctly
If you are using Debezium PostgreSQL Connector to capture changes to Kafka, make sure that the REPLICA IDENTITY of the monitored table has been configured as FULL, and the default value is DEFAULT. Otherwise, Flink SQL will not be able to parse Debezium data correctly.
When configured to FULL, update and delete events will fully contain the previous values of all columns. When configured for other, the before field of update and delete events will only contain the value of the PRIMARY KEY field, or be null (no PRIMARY KEY). You can change the configuration of REPLICA IDENTITY by running ALTER TABLE (your-table-name) REPLICA IDENTITY FULL.