大家好,欢迎来到IT知识分享网。
环境准备
- 虚拟机一台
- 安装MySQL:5.7(博主安装到docker上了,安装步骤可参考: Linux安装MySQL或 Docker安装MySQL
- 下载Canal部署端,canal.deployer-1.1.2.tar.gz,下载地址:链接:https://pan.baidu.com/s/1VndAAVgleydhymcyzW56vA?pwd=wjq8
MySQL 的准备
- 创建数据库和表
CREATE TABLE user_info( `id` VARCHAR(255), `name` VARCHAR(255), `sex` VARCHAR(255) );
- 修改配置文件开启 Binlog
在MySQL安装的目录,找到修改配置文件my.cnf。--进入容器 如果映射了虚拟容器目录 可直接修改映射目录的文件位置 没有映射就进入容器中修改 docker exec -it mysql /bin/bash --进入配置文件目录 cd /etc/mysql/ --在mysqld配置下加入以下内容 cat my.cnf [mysqld] server-id=1 log-bin=mysql-bin binlog_format=row binlog-do-db=canal
- 重启 MySQL 使配置生效
docker restart mysql
- 测试 Binlog 是否开启
--进入存储binlog日志目录(还在容器中) cd /var/lib/mysql ls -l mysql
- 往表中插入一条数据,查看日志文件大小是否变化
日志文件大小发生变化,证明日志记录成功。
- 创建用户赋予权限(可省略,配置时,可使用root用户)
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ; #查看 master 节点状态 show master status
Canal 的下载和安装
- 下载 canal.deployer-1.1.2.tar.gz
- 上传至虚拟机上
- 解压
tar -zxvf canal.deployer-1.1.2.tar.gz -C 自己目录
- 修改 canal.properties 的配置
cd /mydata/canal/conf vim canal.properties
- 说明:这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,修改 canal 的
输出 model,默认 tcp - 一个 canal 服务 中可以有多个 instance,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改canal.properties 中的 canal.destinations=实例 1,实例 2,实例 3。
- 说明:这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,修改 canal 的
- 修改 instance.properties
- 我们这里只读取一个 MySQL 数据,所以只有一个实例,这个实例的配置文件在
conf/example 目录下cd /mydata/canal/conf/example vim instance.properties
- 配置 MySQL 服务器地址,也可以直接配置成虚拟机ip:端口号,如:192.168.56.10:3306。
- 配置连接 MySQL 的用户名和密码,默认就是我们前面授权的 canal
- 我们这里只读取一个 MySQL 数据,所以只有一个实例,这个实例的配置文件在
建项目,配置 pom.xml
- 建立maven项目,略。
- 导入pox
<dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> </dependencies>
- 编写监视类 –CanalClient
public class CanalClient { public static void main(String[] args) throws InvalidProtocolBufferException { //1.获取 canal 连接对象 CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.56.10", 11111), "example", "", ""); while (true) { //2.获取连接 canalConnector.connect(); //3.指定要监控的数据库 canalConnector.subscribe("canal.*"); //4.获取 Message 获取数据,自动进行确认,该方法返回的条件:尝试拿batchSize条记录,有多少取多少,不会阻塞等待 Message message = canalConnector.get(100); List<CanalEntry.Entry> entries = message.getEntries(); if (entries.size() <= 0) { System.out.println("没有数据,休息一会"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { for (CanalEntry.Entry entry : entries) { //TODO 获取表名 String tableName = entry.getHeader().getTableName(); // TODO Entry 类型 CanalEntry.EntryType entryType = entry.getEntryType(); // TODO 判断 entryType 是否为 ROWDATA if (CanalEntry.EntryType.ROWDATA.equals(entryType)) { // TODO 序列化数据 ByteString storeValue = entry.getStoreValue(); // TODO 反序列化 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); //TODO 获取事件类型 CanalEntry.EventType eventType = rowChange.getEventType(); //TODO 获取具体的数据 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); //TODO 遍历并打印数据 for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); JSONObject beforeData = new JSONObject(); for (CanalEntry.Column column : beforeColumnsList) { beforeData.put(column.getName(), column.getValue()); } JSONObject afterData = new JSONObject(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { afterData.put(column.getName(), column.getValue()); } System.out.println("TableName:" + tableName + ",EventType:" + eventType + ",After:" + beforeData + ",After:" + afterData); } } } } } } }
- Message中大致的属性如下
启动 canal.deployer-1.1.2
- 进入解压的bin目录
cd /mydata/canal/bin
- 启动 startup.sh
./startup.sh
- 启动后,启动监视类 –CanalClient,修改数据表数据,控制台输出
- 如果控制未打印数据,可能是没有 canal.deployer-1.1.2 服务未启动成功,可进入 log目录查看日志情况,后排查。
cd /mydata/canal/log
- 错误记录
- canal.deployer-1.1.2 启动日志报错:
服务端:com.alibaba.otter.canal.parse.exception.CanalParseException: can’t find start position for examplem,是由于你改了配置文件,导致meta.dat 中保存的位点信息和数据库的位点信息不一致;导致canal抓取不到数据库的动作;
解决方法:删除meta.dat删除,再重启canal,问题解决;
- canal.deployer-1.1.2 启动日志报错:
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/135725.html