实时数据同步利器debezium教程

实时数据同步利器debezium教程1debezium 概述 Debezium 是一个开源项目 为捕获数据更改 changedataca CDC 提供了一个低延迟的流式处理平台

大家好,欢迎来到IT知识分享网。

1 debezium概述

Debezium是一个开源项目,为捕获数据更改(change data capture,CDC)提供了一个低延迟的流式处理平台。你可以安装并且配置Debezium去监控你的数据库,然后你的应用就可以消费对数据库的每一个行级别(row-level)的更改。

只有已提交的更改才是可见的,所以你的应用不用担心事务(transaction)或者更改被回滚(roll back)。Debezium为所有的数据库更改事件提供了一个统一的模型,所以你的应用不用担心每一种数据库管理系统的错综复杂性。另外,由于Debezium用持久化的、有副本备份的日志来记录数据库数据变化的历史,因此,你的应用可以随时停止再重启,而不会错过它停止运行时发生的事件,保证了所有的事件都能被正确地、完全地处理掉。

监控数据库,并且在数据变动的时候获得通知一直是很复杂的事情。关系型数据库的触发器可以做到,但是只对特定的数据库有效,而且通常只能更新数据库内的状态(无法和外部的进程通信)。一些数据库提供了监控数据变动的API或者框架,但是没有一个标准,每种数据库的实现方式都是不同的,并且需要大量特定的知识和理解特定的代码才能运用。确保以相同的顺序查看和处理所有更改,同时最小化影响数据库仍然非常具有挑战性。

Debezium提供了模块为你做这些复杂的工作。一些模块是通用的,并且能够适用多种数据库管理系统,但在功能和性能方面仍有一些限制。另一些模块是为特定的数据库管理系统定制的,所以他们通常可以更多地利用数据库系统本身的特性来提供更多功能。

2 debezium常见应用场景

Debezium有很多非常有价值的使用场景,我们在这儿仅仅列出几个更常见的使用场景。

3 debezium系统架构

Debezium是一个捕获数据更改(CDC)平台,并且利用Kafka和Kafka Connect实现了自己的持久性、可靠性和容错性。

每一个部署在Kafka Connect分布式的、可扩展的、容错性的服务中的connector监控一个上游数据库服务器,捕获所有的数据库更改,然后记录到一个或者多个Kafka topic(通常一个数据库表对应一个kafka topic)。

Kafka确保所有这些数据更改事件都能够多副本并且总体上有序(Kafka只能保证一个topic的单个分区内有序),这样,更多的客户端可以独立消费同样的数据更改事件而对上游数据库系统造成的影响降到很小(如果N个应用都直接去监控数据库更改,对数据库的压力为N,而用debezium汇报数据库更改事件到kafka,所有的应用都去消费kafka中的消息,可以把对数据库的压力降到1)。

另外,客户端可以随时停止消费,然后重启,从上次停止消费的地方接着消费。每个客户端可以自行决定他们是否需要exactly-once或者at-least-once消息交付语义保证,并且所有的数据库或者表的更改事件是按照上游数据库发生的顺序被交付的。

对于不需要或者不想要这种容错级别、性能、可扩展性、可靠性的应用,他们可以使用内嵌的Debezium connector引擎来直接在应用内部运行connector。这种应用仍需要消费数据库更改事件,但更希望connector直接传递给它,而不是持久化到Kafka里。

  • 部署架构
  1. 最常见的是,Debezium是通过Apache Kafka连接部署的。Kafka Connect是一个用于实现和操作的框架和运行时
  • 源连接器,如Debezium,它将数据摄取到Kafka
  • 接收连接器,它将数据从Kafka主题写入到其他系统。
  1. Debezium Server
    Debezium 的另一种部署方法是使用Debezium 服务器。Debezium Server是一个可配置、开箱即用的应用程序。通过Debezium server,可以将源数据库流式变化事件同步到各种不同的消息基础设施。

在这里插入图片描述

  1. 作为嵌入式引擎
    使用Debezium连接器的另一种方法是嵌入式引擎。在这种情况下,Debezium不会通过Kafka Connect运行,而是作为一个嵌入到定制Java应用程序中的库运行。这对于在应用程序内部使用更改事件非常有用,而不需要部署完整的Kafka和Kafka连接集群,或者将更改流到其他消息传递代理(如Amazon Kinesis)。

4 debezium安装

  • kafka connect模式

Kafka Connect当前支持两种模式,standalone和distributed两种模式。

standalone主要用于入门测试,所以我们来实现distributed模式。

Distributed,分布式模式可以在处理工作中自动平衡,允许动态扩展或缩减,并在活动任务以及配置和偏移量提交数据中提供容错能力。和standalone模式非常类似,最大区别在于启动的类和配置参数,参数决定了Kafka Connect流程如果存储偏移量,如何分配工作,在分布式模式下,Kafka Connect将偏移量,配置和任务状态存储在topic中。建议手动创建topic指定分区数,也可以通过配置文件参数自动创建topic。

以Debezium支持MySQL为例(分布式模式)。

先决条件:Kafka集群已安装(包括ZooKeeper)、MySQL已安装并启动实例。

1)下载MySQL connector插件:https://debezium.io/documentation/reference/1.6/install.html

2)在kafka安装目录下创建connect插件目录并配置生效

  • 创建目录connect。 将插件解压缩到该目录下。
  • 修改/config/connect-distributed.properties, 增减一行:
 plugin.path=/opt/kafka/connect 

默认情况下,kafka connect会加载在该目录下的所有插件。

  • 启动connect服务。(该服务可以独立启动,但由于分布式模式下状态信息都保存在kafka中,所以kafka server服务可以先启动)
/opt/kafka/bin/connect-distributed.sh -daemon /opt/kafka/config/connect-distributed.properties 

查看8083端口是否监听:

netstat -nltp | grep 8083 

正常启动情况下,可以看到:

tcp6 0 0 :::8083 :::* LISTEN 6744/java 

5 debezium mysql插件使用

下面我们构造一个实际的使用实例来说明debezium mysql的使用方法。

1)安装mysql,本次用mariadb-10.5.8替代。关于Mariadb的安装方法参考:实时数据同步利器debezium教程

3). 创建表:

 CREATE TABLE IF NOT EXISTS `userinfo`( `userid` int(11) not null, `username` varchar(100) not null, `age` int(11) not null, `num` int(11) not null, PRIMARY KEY ( `userid` ) )ENGINE=InnoDB DEFAULT CHARSET=utf8; 

4). 打开数据库的binlog

首先检查binlog日志选项是否已打开

mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin'; 或者show variables link 'log_bin' 

如果显示为“off”,则需要修改my.cnf文件,在[mysqld]段增加配置项如下:

server-id =  log_bin = mysql-bin binlog_format = ROW binlog_row_image = FULL expire_logs_days = 10 

再次查看日志选项是否已打开:

mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin'; 

binlog选项打开后,执行:show binlog events.,可以看到binlog列表。

{ 
    "name": "debezium-mysql-connector", "config": { 
    "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "168.61.38.11", "database.port": "3306", "database.user": "zst", "database.password": "", "database.server.id": "", "database.server.name": "conndbtest", "database.include.list": "test", "database.history.kafka.bootstrap.servers": "168.61.38.11:9092,168.61.38.12:9092,168.61.38.13:9092,168.61.38.15:9092,168.61.38.16:9092", "database.history.kafka.topic": "debe-mysql", "include.schema.changes": "true" } } 

需要注意的是,server.id可以通过 select @@server_id获取到,该id在my.cnf中配置。server.name则是在此处随便定义一个。通过Postman把该配置打进kafka中,完成启动。
在这里插入图片描述
6)查看kafka,debe-mysql已创建,与此同时,debezium connector会默认创建3个topic为
connect-configs,connect-offsets,connect-status分别存放配置、偏移、状态信息。
执行脚本,查看dbtest-history是否已创建。



./kafka-topics.sh --zookeeper 168.61.38.11:2181 168.61.38.12:2181 168.61.38.13:2181 168.61.38.15:2181 168.61.38.16:2181 --list 

debezium默认情况下,会将同一表上的insert、update、delete等操作日志事件写入到一个kafka topic中,其创建的topic名字规则是:
serverName.databaseName.tableName

alter table userinfo modify username char(50); 

先执行:

./kafka-console-consumer.sh --bootstrap-server 168.61.38.11:9092,168.61.38.12:9092,168.61.38.13:9092,168.61.38.15:9092,168.61.38.16:9092 --topic debe-mysql 

然后修改表,可以看到如下结果

{ "source" : { "server" : "conndbtest" }, "position" : { "transaction_id" : null, "ts_sec" : , "file" : "mysql-bin.000003", "pos" : 47012, "server_id" :  }, "databaseName" : "", "ddl" : "# Dum", "tableChanges" : [ ] } { "source" : { "server" : "conndbtest" }, "position" : { "transaction_id" : null, "ts_sec" : , "file" : "mysql-bin.000003", "pos" : 47124, "server_id" :  }, "databaseName" : "test", "ddl" : "alter table userinfo modify username char(50)", "tableChanges" : [ { "type" : "ALTER", "id" : "\"test\".\"userinfo\"", "table" : { "defaultCharsetName" : "latin1", "primaryKeyColumnNames" : [ "userid" ], "columns" : [ { "name" : "userid", "jdbcType" : 4, "typeName" : "INT", "typeExpression" : "INT", "charsetName" : null, "length" : 11, "position" : 1, "optional" : false, "autoIncremented" : false, "generated" : false }, { "name" : "username", "jdbcType" : 1, "typeName" : "CHAR", "typeExpression" : "CHAR", "charsetName" : "latin1", "length" : 50, "position" : 2, "optional" : false, "autoIncremented" : false, "generated" : false }, { "name" : "age", "jdbcType" : 4, "typeName" : "INT", "typeExpression" : "INT", "charsetName" : null, "length" : 11, "position" : 3, "optional" : false, "autoIncremented" : false, "generated" : false }, { "name" : "num", "jdbcType" : 4, "typeName" : "INT", "typeExpression" : "INT", "charsetName" : null, "length" : 11, "position" : 4, "optional" : false, "autoIncremented" : false, "generated" : false } ] } } ] } 

需要注意的是,结果的json格式是debezium定义好的格式,debezium json格式通常前面定义schema信息,最后才是实际的载荷(payload)信息,详细格式定义可以查看:https://debezium.io/documentation/reference/1.6/connectors/mysql.html

8)在userinfo表上做CUD操作语句,也可以看到相应的结果。

insert into userinfo(userid,username,age,num) values(54,'hhhhh',55,55656); 

则可以消费到如下消息:

{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "userid" }, { "type": "string", "optional": false, "field": "username" }, { "type": "int32", "optional": false, "field": "age" }, { "type": "int32", "optional": false, "field": "num" } ], "optional": true, "name": "conndbtest.test.userinfo.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "userid" }, { "type": "string", "optional": false, "field": "username" }, { "type": "int32", "optional": false, "field": "age" }, { "type": "int32", "optional": false, "field": "num" } ], "optional": true, "name": "conndbtest.test.userinfo.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": { "allowed": "true,last,false" }, "default": "false", "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "sequence" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "query" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "id" }, { "type": "int64", "optional": false, "field": "total_order" }, { "type": "int64", "optional": false, "field": "data_collection_order" } ], "optional": true, "field": "transaction" } ], "optional": false, "name": "conndbtest.test.userinfo.Envelope" }, "payload": { "before": null, "after": { "userid": 54, "username": "hhhhh", "age": 55, "num": 55656 }, "source": { "version": "1.6.1.Final", "connector": "mysql", "name": "conndbtest", "ts_ms": 00, "snapshot": "false", "db": "test", "sequence": null, "table": "userinfo", "server_id": , "gtid": null, "file": "mysql-bin.000003", "pos": 47627, "row": 0, "thread": null, "query": null }, "op": "c", "ts_ms": 11, "transaction": null } } 

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/125919.html

(0)
上一篇 2025-09-22 22:26
下一篇 2025-09-22 22:33

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信