Canle搭建和基本使用

Canle搭建和基本使用文章详细介绍了如何在 Docker 中安装 MySQL5 7 配置开启 Binlog 创建数据库和表 然后下载并部署 Canal1 1 2 用于数据库变更监听

大家好,欢迎来到IT知识分享网。

环境准备

  1. 虚拟机一台
  2. 安装MySQL:5.7(博主安装到docker上了,安装步骤可参考: Linux安装MySQL或 Docker安装MySQL
  3. 下载Canal部署端,canal.deployer-1.1.2.tar.gz,下载地址:链接:https://pan.baidu.com/s/1VndAAVgleydhymcyzW56vA?pwd=wjq8

MySQL 的准备

  1. 创建数据库和表
    CREATE TABLE user_info( `id` VARCHAR(255), `name` VARCHAR(255), `sex` VARCHAR(255) ); 
  2. 修改配置文件开启 Binlog
    在MySQL安装的目录,找到修改配置文件my.cnf。
    --进入容器 如果映射了虚拟容器目录 可直接修改映射目录的文件位置 没有映射就进入容器中修改 docker exec -it mysql /bin/bash --进入配置文件目录 cd /etc/mysql/ --在mysqld配置下加入以下内容 cat my.cnf [mysqld] server-id=1 log-bin=mysql-bin binlog_format=row binlog-do-db=canal 

    在这里插入图片描述

  3. 重启 MySQL 使配置生效
    docker restart mysql 
  4. 测试 Binlog 是否开启
    --进入存储binlog日志目录(还在容器中) cd /var/lib/mysql ls -l mysql 

    在这里插入图片描述

  5. 往表中插入一条数据,查看日志文件大小是否变化
    日志文件大小发生变化,证明日志记录成功。
    在这里插入图片描述
    在这里插入图片描述


  6. 创建用户赋予权限(可省略,配置时,可使用root用户)
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ; #查看 master 节点状态 show master status 

    在这里插入图片描述

Canal 的下载和安装

  1. 下载 canal.deployer-1.1.2.tar.gz
  2. 上传至虚拟机上
  3. 解压
    tar -zxvf canal.deployer-1.1.2.tar.gz -C 自己目录 
  4. 修改 canal.properties 的配置
    cd /mydata/canal/conf vim canal.properties 

    在这里插入图片描述

    • 说明:这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,修改 canal 的
      输出 model,默认 tcp
    • 一个 canal 服务 中可以有多个 instance,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改canal.properties 中的 canal.destinations=实例 1,实例 2,实例 3。

  5. 修改 instance.properties
    • 我们这里只读取一个 MySQL 数据,所以只有一个实例,这个实例的配置文件在
      conf/example 目录下
      cd /mydata/canal/conf/example vim instance.properties 


    • 配置 MySQL 服务器地址,也可以直接配置成虚拟机ip:端口号,如:192.168.56.10:3306。
      在这里插入图片描述
    • 配置连接 MySQL 的用户名和密码,默认就是我们前面授权的 canal
      在这里插入图片描述

建项目,配置 pom.xml

  1. 建立maven项目,略。
  2. 导入pox
    <dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> </dependencies> 
  3. 编写监视类 –CanalClient
    public class CanalClient {       public static void main(String[] args) throws InvalidProtocolBufferException {       //1.获取 canal 连接对象 CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.56.10", 11111), "example", "", ""); while (true) {       //2.获取连接 canalConnector.connect(); //3.指定要监控的数据库 canalConnector.subscribe("canal.*"); //4.获取 Message 获取数据,自动进行确认,该方法返回的条件:尝试拿batchSize条记录,有多少取多少,不会阻塞等待 Message message = canalConnector.get(100); List<CanalEntry.Entry> entries = message.getEntries(); if (entries.size() <= 0) {       System.out.println("没有数据,休息一会"); try {       Thread.sleep(1000); } catch (InterruptedException e) {       e.printStackTrace(); } } else {       for (CanalEntry.Entry entry : entries) {       //TODO 获取表名 String tableName = entry.getHeader().getTableName(); // TODO Entry 类型 CanalEntry.EntryType entryType = entry.getEntryType(); // TODO 判断 entryType 是否为 ROWDATA if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {       // TODO 序列化数据 ByteString storeValue = entry.getStoreValue(); // TODO 反序列化 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); //TODO 获取事件类型 CanalEntry.EventType eventType = rowChange.getEventType(); //TODO 获取具体的数据 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); //TODO 遍历并打印数据 for (CanalEntry.RowData rowData : rowDatasList) {       List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); JSONObject beforeData = new JSONObject(); for (CanalEntry.Column column : beforeColumnsList) {       beforeData.put(column.getName(), column.getValue()); } JSONObject afterData = new JSONObject(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) {       afterData.put(column.getName(), column.getValue()); } System.out.println("TableName:" + tableName + ",EventType:" + eventType + ",After:" + beforeData + ",After:" + afterData); } } } } } } } 
  4. Message中大致的属性如下
    在这里插入图片描述

启动 canal.deployer-1.1.2

  1. 进入解压的bin目录
    cd /mydata/canal/bin 
  2. 启动 startup.sh
    ./startup.sh 
  3. 启动后,启动监视类 –CanalClient,修改数据表数据,控制台输出

    在这里插入图片描述

  4. 如果控制未打印数据,可能是没有 canal.deployer-1.1.2 服务未启动成功,可进入 log目录查看日志情况,后排查。
     cd /mydata/canal/log 
  5. 错误记录
    • canal.deployer-1.1.2 启动日志报错:
      服务端:com.alibaba.otter.canal.parse.exception.CanalParseException: can’t find start position for examplem,是由于你改了配置文件,导致meta.dat 中保存的位点信息和数据库的位点信息不一致;导致canal抓取不到数据库的动作;
      解决方法:删除meta.dat删除,再重启canal,问题解决

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/135725.html

(0)
上一篇 2025-07-02 18:10
下一篇 2025-07-02 18:15

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信