cancal安装对接mysql和kafka

cancal安装对接mysql和kafka其中 kafka 2 12 3 3 2 tgz 中 2 12 是 scala 版本 3 3 2 是 kafka 版本 3 0 以后是不需要 zk 做集群的

大家好,欢迎来到IT知识分享网。

目录

使用场景

安装kafka

 下载kafka

 解压配置:

 启动zookeeper ,然后启动kafka

安装canal

新建目录并且下载canal

 修改canal的数据库配置文件

 修改canal配置文件

 启动 

查看数据采集报错:

测试

测试前提:

测试过程


  • 使用场景

此处canal的角色相当于mysql一个follower的角色。将mysql中master的数据同步到canal,canal又将数据发送到消息队列(此处kafka,也支持其他mq),进而被其他比如redis等用来和数据库进行同步。

  • 安装kafka

  •  下载kafka

  •  解压配置

[root@localhost kafka]# tar -zxvf  kafka_2.12-3.3.2.tgz 

#修改kafka配置文件 server.properties  本次安装是单机 

[root@localhost kafka]#  vi /opt/kafka/kafka_2.12-3.3.2/config/server.properties

因为本次安装是单机安装,只修改了日志路径

log.dirs=/opt/kafka/logs

如果要详细配置以及集群,请参考文章:手把手教你 在linux上安装kafka

  •  启动zookeeper ,然后启动kafka

/opt/kafka/kafka_2.12-3.3.2/bin 目录下

启动zookeeper:(如果zk启动路径配置文件有问题,kafka会存在启动后又停止的问题

zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties

启动kafka

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

  • 安装canal

  • 新建目录并且下载canal

[root@localhost /]# mkdir -p /opt/canal [root@localhost /]# cd /opt/canal/ [root@localhost canal]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
  • 下载完后解压
[root@localhost canal]# ls canal.deployer-1.0.19.tar.gz [root@localhost canal]# mkdir /usr/local/canal #解压到/usr/local/canal目录下 [root@localhost canal]# tar -zxvf canal.deployer-1.1.5.tar.gz -C /usr/local/canal

切换目录到/usr/local/canal/ 可以看到有四个文件夹

  •  修改canal的数据库配置文件

[root@localhost canal]# vi conf/example/instance.properties

mysql serverId

此处canal类似作为mysql的一个从机   slaveId必须唯一

canal.instance.mysql.slaveId = 1234

设置数据地址 必须是master地址,此处我只搭建了一台mysql 

我本地是192.168.110.131 根据实际情况进行修改

canal.instance.master.address = 192.168.110.131:3306

设置数据用户名和密码

  •  修改canal配置文件

[root@localhost conf]# vi canal.properties

关键配置文件参数说明:

canal.port = 11111 #对外提供的socket接口。如端口不冲突,使用默认即可

canal.serverMode = kafka # 这一步是关键,选择kafka的推送模式,发送kafka消息

kafka.bootstrap.servers = 127.0.0.1:9092 # 配置kafka服务器地址

  •  启动 

cd /usr/local/canal/bin

sh startup.sh

查看日志

cd /usr/local/canal/logs 有两个日志文件夹

canal中是本服务启动情况  example是数据采集情况

cancal安装对接mysql和kafka

查看数据采集报错:

[root@localhost example]# tail -f /usr/local/canal/logs/example/example.log

出现如下错误:

2023-07-05 20:46:14.731 [destination = example , address = /192.168.110.131:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler – destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: command : ‘show master status’ has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation

这个是由于链接无权限或者mysql没有进行binlog配置导致

1.无权限时候

  CREATE USER canal IDENTIFIED BY ‘canal’;

  GRANT ALL PRIVILEGES ON *.* TO ‘canal’@’%’ ;

  FLUSH PRIVILEGES;

  

2.mysql数据库没有进行binlog配置开启

如下在mysql下面配置binlog相关信息,重新启动数据库可以解决。

cancal安装对接mysql和kafka

然后启动保证启动成功。

  • 测试

  • 测试前提:

  • mysql已经启动;
  • 启动kafka(先启动zookeeper,然后启动kafka);
  • 启动canal 。
  • 测试过程

查看topic信息

[root@localhost kafka_2.12-3.3.2]# bin/kafka-topics.sh –list –bootstrap-server localhost:9092

#数据库添加一条数据再次查看 已经有了topic为example

#监控kafka收到的数据情况 数据新添加一条数据 ,kafka收到如下图红色标记数据

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/129193.html

(0)
上一篇 2025-08-25 22:00
下一篇 2025-08-25 22:15

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信