0

0

使用Debezium进行MySQL变更数据捕获(CDC)实战

夜晨

夜晨

发布时间:2025-09-11 14:26:01

|

462人浏览过

|

来源于php中文网

原创

Debezium通过监听MySQL binlog实现数据实时同步,需配置MySQL、部署Connector、设置Kafka Connect并消费变更事件;选择合适配置需根据需求设定server.id、连接信息、包含/排除表及快照模式;变更事件以JSON格式发布至Kafka,含before、after、op等字段,下游应用解析后执行对应操作;可通过Kafka Streams或Flink处理;使用Kafka Connect REST API和JMX指标监控Connector状态与性能;Schema演化通过Schema History Topic和注册表(如Confluent Schema Registry)管理;初始快照可配置模式与锁策略以减少数据库压力;性能优化包括提升资源、调整参数与数据库配置;数据一致性可通过事务性Outbox、Heartbeat、Kafka事务及数据比对保障。

使用debezium进行mysql变更数据捕获(cdc)实战

Debezium通过捕获MySQL的变更数据,可以实时同步数据到其他系统,实现数据集成和微服务架构。它监听MySQL的binlog,将数据变更转化为事件流,供下游应用消费。

使用Debezium进行MySQL CDC实战主要涉及配置MySQL、部署Debezium Connector、配置Kafka Connect以及消费变更事件。

配置MySQL以启用binlog。 部署Debezium Connector到Kafka Connect集群。 配置Connector以连接到MySQL数据库并指定要捕获的数据库和表。 下游应用通过Kafka消费变更事件。

如何选择合适的Debezium Connector配置?

选择合适的Debezium Connector配置取决于你的具体需求。关键配置包括:

  • database.server.id
    : MySQL服务器的唯一ID,确保在集群中唯一。
  • database.hostname
    database.port
    : MySQL服务器的地址和端口。
  • database.user
    database.password
    : 用于连接MySQL的用户名和密码,需要具有足够的权限读取binlog。
  • database.include.list
    database.exclude.list
    : 指定要捕获或排除的数据库列表。
  • table.include.list
    table.exclude.list
    : 指定要捕获或排除的表列表。
  • snapshot.mode
    : 定义初始快照模式,例如
    initial
    (首次启动时执行快照)或
    never
    (不执行快照)。
  • topic.prefix
    : 用于生成Kafka主题的前缀。

例如,如果你只想捕获

inventory
数据库中的
customers
表,可以这样配置:

{
  "name": "mysql-inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql.example.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "inventory",
    "database.include.list": "inventory",
    "table.include.list": "inventory.customers",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "snapshot.mode": "initial"
  }
}

这个配置首先指定了连接器类型为

MySqlConnector
,然后配置了MySQL的连接信息。
database.server.name
定义了逻辑数据库服务器的名称,
database.include.list
table.include.list
分别限制了捕获的数据库和表。
database.history.kafka.bootstrap.servers
database.history.kafka.topic
用于存储数据库schema历史,这对于Debezium的正常运行至关重要。
snapshot.mode
设置为
initial
,表示首次启动时会执行快照。

如何处理Debezium捕获的变更事件?

Debezium捕获的变更事件以JSON格式发布到Kafka主题。每个事件包含

before
after
source
op
字段。

  • before
    : 变更前的数据,如果操作是插入,则为
    null
  • after
    : 变更后的数据,如果操作是删除,则为
    null
  • source
    : 包含关于变更事件来源的信息,如数据库名称、表名称、时间戳等。
  • op
    : 表示操作类型,例如
    c
    (创建)、
    u
    (更新)、
    d
    (删除)、
    r
    (快照读取)。

下游应用需要解析这些JSON事件,并根据

op
字段执行相应的操作。例如,如果
op
c
,则将
after
中的数据插入到目标数据库;如果
op
u
,则更新目标数据库中对应的数据;如果
op
d
,则从目标数据库中删除对应的数据。

使用Kafka Streams或Apache Flink等流处理框架可以方便地处理这些事件。例如,使用Kafka Streams可以这样处理:

KStream stream = builder.stream("inventory.customers");

stream.foreach((key, value) -> {
  try {
    JsonNode root = objectMapper.readTree(value);
    String op = root.get("payload").get("op").asText();

    if ("c".equals(op)) {
      JsonNode after = root.get("payload").get("after");
      // 将after中的数据插入到目标数据库
      System.out.println("Insert: " + after.toString());
    } else if ("u".equals(op)) {
      JsonNode after = root.get("payload").get("after");
      // 更新目标数据库中对应的数据
      System.out.println("Update: " + after.toString());
    } else if ("d".equals(op)) {
      JsonNode before = root.get("payload").get("before");
      // 从目标数据库中删除对应的数据
      System.out.println("Delete: " + before.toString());
    }
  } catch (Exception e) {
    e.printStackTrace();
  }
});

这段代码从

inventory.customers
主题读取事件,解析JSON,并根据
op
字段执行相应的操作。

如何监控和管理Debezium Connector?

监控和管理Debezium Connector对于确保数据同步的稳定性和可靠性至关重要。Kafka Connect提供了REST API,可以用于监控Connector的状态、配置和任务。

可以使用以下API来获取Connector的状态:

curl -X GET http://localhost:8083/connectors/mysql-inventory-connector/status

这个API会返回Connector的状态信息,包括状态(running、failed等)、任务状态以及错误信息。

还可以使用以下API来更新Connector的配置:

curl -X PUT \
  http://localhost:8083/connectors/mysql-inventory-connector/config \
  -H 'Content-Type: application/json' \
  -d '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql.example.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "inventory",
    "database.include.list": "inventory",
    "table.include.list": "inventory.customers",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "snapshot.mode": "never"
  }'

这个API会更新Connector的配置,例如修改

snapshot.mode
never

除了Kafka Connect API,还可以使用Debezium提供的JMX指标来监控Connector的性能。这些指标包括捕获的事件数量、延迟、错误率等。

微信 WeLM
微信 WeLM

WeLM不是一个直接的对话机器人,而是一个补全用户输入信息的生成模型。

下载

如果Connector出现问题,例如无法连接到MySQL或无法解析binlog事件,可以查看Kafka Connect的日志来排查问题。

如何处理Debezium Connector的Schema演化?

Schema演化是CDC过程中常见的问题。当MySQL表的结构发生变化时,例如添加、删除或修改列,Debezium需要能够正确处理这些变化。

Debezium通过Schema History Topic来管理Schema演化。每次表的结构发生变化时,Debezium会将新的Schema信息写入到Schema History Topic。下游应用可以读取Schema History Topic,并根据新的Schema来解析变更事件。

为了处理Schema演化,可以使用Avro或Protobuf等Schema注册表。这些注册表可以存储Schema信息,并为每个Schema分配一个唯一的ID。Debezium可以将Schema ID写入到变更事件中,下游应用可以使用Schema ID从注册表中获取Schema信息。

例如,使用Confluent Schema Registry可以这样配置Debezium Connector:

{
  "name": "mysql-inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql.example.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "inventory",
    "database.include.list": "inventory",
    "table.include.list": "inventory.customers",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "snapshot.mode": "initial",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

这个配置指定了使用AvroConverter作为Key和Value的转换器,并配置了Schema Registry的URL。Debezium会将Avro Schema信息写入到Schema Registry,并将Schema ID写入到变更事件中。

下游应用可以使用Confluent提供的AvroDeserializer来读取变更事件,并从Schema Registry中获取Schema信息。

如何处理Debezium Connector的初始快照?

Debezium Connector在首次启动时会执行初始快照,将MySQL数据库中的所有数据读取到Kafka主题。初始快照可能会对MySQL数据库造成性能影响,特别是对于大型数据库。

为了减少初始快照对MySQL数据库的影响,可以采取以下措施:

  • 使用
    snapshot.mode
    配置来控制快照模式。例如,可以设置为
    schema_only
    ,只读取表的结构,不读取数据;或者设置为
    never
    ,不执行快照。
  • 使用
    snapshot.locking.mode
    配置来控制快照期间的锁模式。例如,可以设置为
    minimal
    ,使用最小的锁,减少对MySQL数据库的影响。
  • 使用
    snapshot.new.tables
    配置来控制是否对新创建的表执行快照。
  • 在MySQL数据库的低峰期执行初始快照。

如果初始快照失败,可以查看Kafka Connect的日志来排查问题。常见的错误包括无法连接到MySQL、权限不足或内存不足。

如何优化Debezium Connector的性能?

优化Debezium Connector的性能可以提高数据同步的速度和可靠性。以下是一些优化建议:

  • 增加Kafka Connect集群的资源,例如CPU和内存。
  • 调整Kafka Connect的配置,例如
    tasks.max
    consumer.override.max.poll.records
  • 优化MySQL数据库的配置,例如
    binlog_format
    binlog_row_image
  • 使用合适的Schema注册表,例如Confluent Schema Registry。
  • 监控Debezium Connector的性能指标,例如捕获的事件数量、延迟和错误率。

如何确保Debezium Connector的数据一致性?

确保Debezium Connector的数据一致性是CDC的关键目标。以下是一些建议:

  • 使用事务性Outbox模式来确保数据变更的原子性。
  • 使用Debezium提供的Heartbeat功能来检测数据同步的延迟。
  • 使用Kafka的事务性功能来确保数据同步的Exactly-Once语义。
  • 定期验证目标数据库中的数据与MySQL数据库中的数据是否一致。

总的来说,使用Debezium进行MySQL CDC需要仔细规划和配置,并根据实际情况进行优化。通过合理的配置和监控,可以实现高效、可靠的数据同步,为微服务架构和数据集成提供强大的支持。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
mysql修改数据表名
mysql修改数据表名

MySQL修改数据表:1、首先查看数据库中所有的表,代码为:‘SHOW TABLES;’;2、修改表名,代码为:‘ALTER TABLE 旧表名 RENAME [TO] 新表名;’。php中文网还提供MySQL的相关下载、相关课程等内容,供大家免费下载使用。

667

2023.06.20

MySQL创建存储过程
MySQL创建存储过程

存储程序可以分为存储过程和函数,MySQL中创建存储过程和函数使用的语句分别为CREATE PROCEDURE和CREATE FUNCTION。使用CALL语句调用存储过程智能用输出变量返回值。函数可以从语句外调用(通过引用函数名),也能返回标量值。存储过程也可以调用其他存储过程。php中文网还提供MySQL创建存储过程的相关下载、相关课程等内容,供大家免费下载使用。

247

2023.06.21

mongodb和mysql的区别
mongodb和mysql的区别

mongodb和mysql的区别:1、数据模型;2、查询语言;3、扩展性和性能;4、可靠性。本专题为大家提供mongodb和mysql的区别的相关的文章、下载、课程内容,供大家免费下载体验。

281

2023.07.18

mysql密码忘了怎么查看
mysql密码忘了怎么查看

MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS 应用软件之一。那么mysql密码忘了怎么办呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

515

2023.07.19

mysql创建数据库
mysql创建数据库

MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS 应用软件之一。那么mysql怎么创建数据库呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

256

2023.07.25

mysql默认事务隔离级别
mysql默认事务隔离级别

MySQL是一种广泛使用的关系型数据库管理系统,它支持事务处理。事务是一组数据库操作,它们作为一个逻辑单元被一起执行。为了保证事务的一致性和隔离性,MySQL提供了不同的事务隔离级别。php中文网给大家带来了相关的教程以及文章欢迎大家前来学习阅读。

386

2023.08.08

sqlserver和mysql区别
sqlserver和mysql区别

SQL Server和MySQL是两种广泛使用的关系型数据库管理系统。它们具有相似的功能和用途,但在某些方面存在一些显著的区别。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

532

2023.08.11

mysql忘记密码
mysql忘记密码

MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。那么忘记mysql密码我们该怎么解决呢?php中文网给大家带来了相关的教程以及其他关于mysql的文章,欢迎大家前来学习阅读。

601

2023.08.14

拼多多赚钱的5种方法 拼多多赚钱的5种方法
拼多多赚钱的5种方法 拼多多赚钱的5种方法

在拼多多上赚钱主要可以通过无货源模式一件代发、精细化运营特色店铺、参与官方高流量活动、利用拼团机制社交裂变,以及成为多多进宝推广员这5种方法实现。核心策略在于通过低成本、高效率的供应链管理与营销,利用平台社交电商红利实现盈利。

31

2026.01.26

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.3万人学习

Node.js 教程
Node.js 教程

共57课时 | 9.4万人学习

CSS3 教程
CSS3 教程

共18课时 | 4.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号