开源的数据库增量订阅和消费的中间件——Cancl

开源的数据库增量订阅和消费的中间件——Cancl是一个阿里巴巴开源的数据同步工具 主要用于 MySQL 数据库的增量数据捕获 ChangeDataCa CDC

大家好,欢迎来到IT知识分享网。

目录

工作原理

MySQL主备复制原理

Canal 工作原理

主要功能和特点

应用场景

实验准备

安装JDK11

下载MySQL8.0

配置canal.admin

配置canal-deployer

测试数据读取

新增一台主机用做被同步的目标机器测试


官方地址:https://github.com/alibaba/canal?tab=readme-ov-file 

开源的数据库增量订阅和消费的中间件——Cancl

Canal 是一个阿里巴巴开源的数据同步工具,主要用于 MySQL 数据库的增量数据捕获(Change Data Capture, CDC)。Canal 能够模拟 MySQL 的主从复制协议,解析 MySQL 的 binlog 日志,实时捕获数据库的增量数据变化,并将这些变化数据提供给其他系统。  

 

工作原理

MySQL主备复制原理
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)。MySQL 的 binlog 日志记录的是二进制格式的数据,但这些二进制数据并不直接是机器码或类似的低级别编码,而是 MySQL 自己定义的一种结构化二进制格式。
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
Canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

 

主要功能和特点

 

实时捕获数据变更

  • Canal 能够通过解析 MySQL binlog 实时捕获数据库的数据变更,如增、删、改操作。

支持多种目标数据源

  • 捕获的数据变更可以同步到各种目标数据源,如 Elasticsearch、Kafka、RocketMQ、HBase 等,满足不同的实时数据处理需求。

主从复制机制模拟

  • Canal 模拟 MySQL 的从库,通过基于主从复制协议的方式来订阅和解析 binlog,实现数据捕获。

高可用与容错

  • 支持集群部署和高可用配置,能保证在单节点故障时,其他节点继续工作。

多种数据格式支持

  • 支持 JSON、Protobuf 等多种数据格式,方便与不同的数据处理系统集成。

 

应用场景

 

数据同步

  • 实现 MySQL 数据到其他数据库或大数据系统(如 HBase、Elasticsearch)的实时同步。

数据增量推送

  • 实现将 MySQL 数据库的增量更新实时推送到消息队列或缓存系统,以应对高并发读写场景。

多活数据中心

  • 通过 Canal 实现多个数据中心间的数据同步,保证各数据中心的数据一致性。

实时分析

  • 实现 MySQL 数据的实时采集与分析,常用于实时监控、报警等场景。

 

实验准备

 

一台新虚拟机,关闭防火墙和selinux,进行时间同步

主机名 系统 IP 配置 工具版本
master

Rocky_linux9.4

192.168.226.26 2核心4G内存-20G磁盘

MySQL8.0

同步阿里云的镜像源

sed -e 's|^mirrorlist=|#mirrorlist=|g' \ -e 's|^#baseurl=http://dl.rockylinux.org/$contentdir|baseurl=https://mirrors.aliyun.com/rockylinux|g' \ -i.bak \ /etc/yum.repos.d/rocky*.repo tput bold tput setaf 2 echo "YUM 源配置已更新。" tput sgr0 dnf makecache yum -y install epel-release

安装JDK11

Java Archive Downloads – Java SE 11

开源的数据库增量订阅和消费的中间件——Cancl这里我已经上传好了

[root@master ~]# ll total  -rw-------. 1 root root 815 Jun 6 14:00 anaconda-ks.cfg -rw-r--r-- 1 root root  Jul 29 19:31 jdk-11.0.23_linux-x64_bin.rpm 

安装

[root@master ~]# yum install -y jdk-11.0.23_linux-x64_bin.rpm [root@master ~]# java -version java version "11.0.23" 2024-04-16 LTS Java(TM) SE Runtime Environment 18.9 (build 11.0.23+7-LTS-222) Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.23+7-LTS-222, mixed mode) 

JDK的默认安装目录位置在:/usr/lib/jvm/jdk-11-oracle-x64/ 

下载MySQL8.0

 

下载mysql8.0

[root@master ~]# yum install -y mysql-server 

修改配置文件

[root@master ~]# vim /etc/my.cnf.d/mysql-server.cnf #最后面加入下述配置 default-authentication-plugin=mysql_native_password server_id=1 log_bin=binlog binlog_format=ROW

启动并设置开机自启

[root@master ~]# systemctl enable --now mysqld 

登陆mysql,进行初始化配置

[root@master ~]# mysql Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 8 Server version: 8.0.36 Source distribution Copyright (c) 2000, 2024, Oracle and/or its affiliates. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> ALTER USER 'root'@'localhost' IDENTIFIED BY '1234'; -- 修改 root 用户的密码为 '1234' Query OK, 0 rows affected (0.01 sec) mysql> CREATE USER canal IDENTIFIED BY 'canal'; -- 创建一个名为 canal 的新用户,密码为 'canal' Query OK, 0 rows affected (0.01 sec) mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' WITH GRANT OPTION; --给canal用户授权 Query OK, 0 rows affected (0.01 sec) mysql> FLUSH PRIVILEGES; -- 刷新权限表,使前面的更改生效 Query OK, 0 rows affected (0.00 sec) 

配置canal.admin

 

下载当前时间最新稳定版本canal-1.1.7

https://github.com/alibaba/canal/releases/tag/canal-1.1.7

开源的数据库增量订阅和消费的中间件——Cancl

上传至虚拟机,这里我上传好了

[root@master ~]# ll total  -rw-------. 1 root root 815 Jun 6 14:00 anaconda-ks.cfg -rw-r--r-- 1 root root  Aug 16 13:19 canal.admin-1.1.7.tar.gz -rw-r--r-- 1 root root  Aug 16 13:20 canal.deployer-1.1.7.tar.gz

创建canal文件夹

[root@master ~]# mkdir -p /usr/local/canal/{canal-admin,canal-deployer} 

解压canal.admin

[root@master ~]# tar -xf canal.admin-1.1.7.tar.gz -C /usr/local/canal/canal-admin/ 

配置文件

[root@master ~]# vim /usr/local/canal/canal-admin/conf/application.yml # 修改如下配置 server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: 127.0.0.1:3306 # mysql主机的ip和端口,这里就是在本机上 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true # 注意,在这里添加了时区和对mysql8.0的适配,useSSL=false后面的内容在mysql5.7中可以删除 hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin adminPasswd: admin 

导入sql

[root@master ~]# mysql -p1234 < /usr/local/canal/canal-admin/conf/canal_manager.sql

启动 Canal Admin 服务

[root@master ~]# cd /usr/local/canal/canal-admin/bin [root@master bin]# sh startup.sh [root@master bin]# ss -tnlp State Recv-Q Send-Q Local Address:Port Peer Address:Port Process LISTEN 0 128 0.0.0.0:22 0.0.0.0:* users:(("sshd",pid=827,fd=3)) LISTEN 0 100 0.0.0.0:8089 0.0.0.0:* users:(("java",pid=15192,fd=108)) LISTEN 0 70 *:33060 *:* users:(("mysqld",pid=15114,fd=21)) LISTEN 0 128 [::]:22 [::]:* users:(("sshd",pid=827,fd=4)) LISTEN 0 151 *:3306 *:* users:(("mysqld",pid=15114,fd=23)) 

日志目录:/usr/local/canal/canal-admin/logs 

浏览器访问IP+8089端口, 这里我访问:http://192.168.226.26:8089/

默认用户名:admin   默认登陆密码:

开源的数据库增量订阅和消费的中间件——Cancl开源的数据库增量订阅和消费的中间件——Cancl 

配置canal-deployer

部署canal-deployer

[root@master bin]# cd [root@master ~]# tar -xf canal.deployer-1.1.7.tar.gz -C /usr/local/canal/canal-deployer/ 

 配置文件

[root@master ~]# vim /usr/local/canal/canal-deployer/conf/canal_local.properties # register ip canal.register.ip = 192.168.226.26 # canal admin config canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CFFC58AAB1D # admin auto register canal.admin.register.auto = true canal.admin.register.cluster = canal.admin.register.name = canal-admin 

启动 Canal 部署器

[root@master ~]# cd /usr/local/canal/canal-deployer/bin/ [root@master bin]# sh startup.sh local [root@master bin]# ss -tnlp State Recv-Q Send-Q Local Address:Port Peer Address:Port Process LISTEN 0 50 0.0.0.0:11111 0.0.0.0:* users:(("java",pid=15793,fd=86)) LISTEN 0 50 0.0.0.0:11110 0.0.0.0:* users:(("java",pid=15793,fd=90)) LISTEN 0 3 0.0.0.0:11112 0.0.0.0:* users:(("java",pid=15793,fd=77)) LISTEN 0 128 0.0.0.0:22 0.0.0.0:* users:(("sshd",pid=827,fd=3)) LISTEN 0 100 0.0.0.0:8089 0.0.0.0:* users:(("java",pid=15192,fd=108)) LISTEN 0 70 *:33060 *:* users:(("mysqld",pid=15114,fd=21)) LISTEN 0 128 [::]:22 [::]:* users:(("sshd",pid=827,fd=4)) LISTEN 0 151 *:3306 *:* users:(("mysqld",pid=15114,fd=23)) 

日志目录:/usr/local/canal/canal-deployer/logs/canal 

​ # 登录mysql查看binlog日志 [root@master ~]# mysql -p1234 mysql: [Warning] Using a password on the command line interface can be insecure. Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 71 Server version: 8.0.36 Source distribution Copyright (c) 2000, 2024, Oracle and/or its affiliates. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> SHOW MASTER STATUS; --查看当前正在使用的 binlog 文件 +---------------+----------+--------------+------------------+-------------------+ | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set | +---------------+----------+--------------+------------------+-------------------+ | binlog.000003 | 29771 | | | | +---------------+----------+--------------+------------------+-------------------+ 1 row in set (0.00 sec) # 修改配置,将上方查到的binlog日志和位置数写进去 [root@master ~]# vim /usr/local/canal/canal-deployer/conf/example/instance.properties canal.instance.master.journal.name=binlog.000003 canal.instance.master.position=29771

 开源的数据库增量订阅和消费的中间件——Cancl

 页面查看server注册

开源的数据库增量订阅和消费的中间件——Cancl

开源的数据库增量订阅和消费的中间件——Cancl 

 

测试数据读取

[root@master conf]# cd [root@master ~]# yum install -y python-pip [root@master ~]# pip3 install protobuf==3.20.1 canal-python -i https://mirrors.aliyun.com/pypi/simple/ 
[root@master ~]# vim client.py import time # 从 canal.client 和 canal.protocol 导入所需的模块 from canal.client import Client from canal.protocol import EntryProtocol_pb2 from canal.protocol import CanalProtocol_pb2 # 创建 Canal 客户端实例 client = Client() # 连接到 Canal 服务器 client.connect(host='127.0.0.1', port=11111) # 检查客户端连接是否有效(无需提供用户名和密码) client.check_valid(username=b'', password=b'') # 订阅 Canal 实例(destination)和表的变更(filter) client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*') # 循环获取数据并处理 while True: # 从 Canal 服务器获取数据,最多获取 100 条消息 message = client.get(100) entries = message['entries'] # 提取消息中的 entries(数据库变更条目) # 遍历所有条目 for entry in entries: entry_type = entry.entryType # 获取条目的类型 # 如果条目是事务开始或事务结束,则跳过 if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]: continue # 解析行变更数据 row_change = EntryProtocol_pb2.RowChange() row_change.MergeFromString(entry.storeValue) # 从字节流中解析 RowChange 数据 event_type = row_change.eventType # 获取事件类型 header = entry.header # 获取条目的头部信息 database = header.schemaName # 获取数据库名 table = header.tableName # 获取表名 event_type = header.eventType # 获取事件类型(与 RowChange 中的 eventType 一致) # 遍历所有行数据 for row in row_change.rowDatas: format_data = dict() # 初始化数据格式字典 # 根据事件类型处理数据 if event_type == EntryProtocol_pb2.EventType.DELETE: # 删除事件:仅处理删除前的数据 for column in row.beforeColumns: format_data = { column.name: column.value } elif event_type == EntryProtocol_pb2.EventType.INSERT: # 插入事件:仅处理插入后的数据 for column in row.afterColumns: format_data = { column.name: column.value } else: # 更新事件:处理更新前后的数据 format_data['before'] = dict() # 初始化删除前数据字典 format_data['after'] = dict() # 初始化插入后数据字典 # 处理删除前数据 for column in row.beforeColumns: format_data['before'][column.name] = column.value # 处理插入后数据 for column in row.afterColumns: format_data['after'][column.name] = column.value # 构建数据字典 data = dict( db=database, # 数据库名 table=table, # 表名 event_type=event_type, # 事件类型 data=format_data, # 数据 ) print(data) # 打印数据 # 暂停 1 秒钟 time.sleep(1) # 断开与 Canal 服务器的连接 client.disconnect()

运行后会看到终端一些数据弹出,就是在读取展示

[root@master ~]# python3 client.py connected to 127.0.0.1:11111 Auth succed Subscribe succed {'db': 'canal_manager', 'table': 'canal_config', 'event_type': 1, 'data': {'modified_time': '2024-08-16 16:27:01'}} {'db': 'canal_manager', 'table': 'canal_instance_config', 'event_type': 1, 'data': {'modified_time': '2024-08-16 19:36:10'}} 。。。。。。 。。。 。

此时新开一个终端,尝试写入数据,然后回到运行的python代码终端查看

-- 创建数据库 CREATE DATABASE IF NOT EXISTS test_db; -- 切换到新创建的数据库 USE test_db; -- 创建表 CREATE TABLE IF NOT EXISTS employees ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(100) NOT NULL, position VARCHAR(100), salary DECIMAL(10, 2), hire_date DATE ); -- 插入几条数据 INSERT INTO employees (name, position, salary, hire_date) VALUES ('Alice', 'Software Engineer', 85000.00, '2023-02-15'), ('Bob', 'Project Manager', 95000.00, '2022-11-20'), ('Charlie', 'Data Analyst', 70000.00, '2024-01-10'); -- 查询表中的所有数据以确认插入成功 SELECT * FROM employees; INSERT INTO employees (name, position, salary, hire_date) VALUES ('Charlie Zhang', 'Sales', 68000.00, '2021-12-01'); INSERT INTO employees (name, position, salary, hire_date) VALUES ('Alice Wang', 'Engineering', 75000.00, '2022-01-15'); INSERT INTO employees (name, position, salary, hire_date) VALUES ('Bob Li', 'HR', 72000.00, '2022-05-01'); UPDATE employees SET salary = 80000.00, position = 'Product' WHERE name = 'Alice Wang'; UPDATE employees SET position = 'HR', hire_date = '2022-05-10' WHERE name = 'Bob Li'; DELETE FROM employees WHERE name = 'Charlie Zhang'; DELETE FROM employees WHERE position = 'Sales';

查看运行python代码的终端

开源的数据库增量订阅和消费的中间件——Cancl

 

新增一台主机用做被同步的目标机器测试

新增一台新虚拟机,关闭防火墙和selinux,进行时间同步,配置阿里云的镜像源,安装jdk,这里不在赘述。

主机名 系统 IP 配置 工具版本
localhost

Rocky_linux9.4

192.168.226.29 2核心4G内存-20G磁盘

下载mysql8.0并配置

[root@localhost ~]# yum install -y mysql-server [root@localhost ~]# vim /etc/my.cnf.d/mysql-server.cnf #加入下述配置 default-authentication-plugin=mysql_native_password server_id=2 log_bin=binlog binlog_format=ROW [root@localhost ~]# systemctl enable --now mysqld [root@localhost ~]# mysql Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 9 Server version: 8.0.36 Source distribution Copyright (c) 2000, 2024, Oracle and/or its affiliates. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> ALTER USER 'root'@'localhost' IDENTIFIED BY ''; -- 修改 root 用户的密码为 '' Query OK, 0 rows affected (0.00 sec) mysql> CREATE USER canal IDENTIFIED BY 'canal'; -- 创建一个名为 canal 的新用户,密码为 'canal' Query OK, 0 rows affected (0.01 sec) mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' WITH GRANT OPTION; --给canal用户授权 Query OK, 0 rows affected (0.01 sec) mysql> FLUSH PRIVILEGES; -- 刷新权限表,使前面的更改生效 Query OK, 0 rows affected (0.00 sec)

上传并解压canal.deployer-1.1.7.tar.gz

[root@localhost ~]# ll total  -rw-------. 1 root root 815 Jun 6 14:00 anaconda-ks.cfg -rw-r--r-- 1 root root  Aug 16 13:20 canal.deployer-1.1.7.tar.gz [root@localhost ~]# mkdir -p /usr/local/canal [root@localhost ~]# tar -xf canal.deployer-1.1.7.tar.gz -C /usr/local/canal/ 

编辑配置文件

# 找到如下三个字段,需要求修改即可 [root@localhost ~]# vim /usr/local/canal/conf/canal_local.properties # Canal 注册的 IP 地址 # 用于 Canal 实例的注册过程中的 IP 地址 canal.register.ip = 192.168.226.29 # Canal 管理端的 IP 地址和端口 # 通过此 IP 和端口可以访问 Canal 管理界面 canal.admin.manager = 192.168.226.26:8089 # Canal 管理端的注册名称 # 用于标识 Canal 实例在注册过程中使用的名称 canal.admin.register.name = hello 
[root@localhost ~]# sh /usr/local/canal/bin/startup.sh local [root@localhost ~]# ss -tnlp State Recv-Q Send-Q Local Address:Port Peer Address:Port Process LISTEN 0 128 0.0.0.0:22 0.0.0.0:* users:(("sshd",pid=850,fd=3)) LISTEN 0 50 0.0.0.0:11111 0.0.0.0:* users:(("java",pid=2045,fd=85)) LISTEN 0 50 0.0.0.0:11110 0.0.0.0:* users:(("java",pid=2045,fd=89)) LISTEN 0 3 0.0.0.0:11112 0.0.0.0:* users:(("java",pid=2045,fd=76)) LISTEN 0 151 *:3306 *:* users:(("mysqld",pid=14045,fd=24)) LISTEN 0 70 *:33060 *:* users:(("mysqld",pid=14045,fd=21)) LISTEN 0 128 [::]:22 [::]:* users:(("sshd",pid=850,fd=4)) 
[root@localhost ~]# mysql -p mysql: [Warning] Using a password on the command line interface can be insecure. Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 11 Server version: 8.0.36 Source distribution Copyright (c) 2000, 2024, Oracle and/or its affiliates. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> SHOW MASTER STATUS; +---------------+----------+--------------+------------------+-------------------+ | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set | +---------------+----------+--------------+------------------+-------------------+ | binlog.000002 | 1370 | | | | +---------------+----------+--------------+------------------+-------------------+ 1 row in set (0.01 sec) 

开源的数据库增量订阅和消费的中间件——Cancl

开源的数据库增量订阅和消费的中间件——Cancl 

现在回到192.168.226.26主机

[root@master ~]# vim client.py # 修改这两处地方 client.connect(host='192.168.226.29', port=11111) client.subscribe(client_id=b'1001', destination=b'hello', filter=b'.*\\..*') 

开源的数据库增量订阅和消费的中间件——Cancl

再次运行python文件

[root@master ~]# python3 client.py 

再回到192.168.226.29主机的mysql中,执行一些sql语句

-- 创建数据库 CREATE DATABASE people_db; -- 使用数据库 USE people_db; -- 创建表 CREATE TABLE people ( id INT AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(50) NOT NULL, last_name VARCHAR(50) NOT NULL, date_of_birth DATE, email VARCHAR(100), phone_number VARCHAR(15) ); -- 插入数据 INSERT INTO people (first_name, last_name, date_of_birth, email, phone_number) VALUES ('Alice', 'Smith', '1990-05-15', '', '123-456-7890'), ('Bob', 'Johnson', '1985-08-20', '', '234-567-8901'), ('Charlie', 'Williams', '2000-01-30', '', '345-678-9012'), ('Diana', 'Brown', '1992-11-25', '', '456-789-0123'); -- 更新数据 UPDATE people SET email = '', phone_number = '999-888-7777' WHERE first_name = 'Bob' AND last_name = 'Johnson'; -- 删除数据 DELETE FROM people WHERE first_name = 'Charlie' AND last_name = 'Williams'; -- 查询所有数据 SELECT * FROM people; 

在192.168.226.26主机中,查看python运行代码的终端同步的结果:

开源的数据库增量订阅和消费的中间件——Cancl

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/126873.html

(0)
上一篇 2025-09-15 15:00
下一篇 2025-09-15 15:10

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信