大家好,欢迎来到IT知识分享网。
文章目录
1、软件下载&配置环境
下面的操作无论是单机部署还是分布式集群环境下都是通用的。
准备好安装的目录,我们将所有软件安装包统一上传至/export/software目录下,下载完毕后软件解压至/export/server:
mkdir -p /export/server /export/software
1_JDK安装
下载地址:https://www.oracle.com/java/technologies/downloads/
下载并上传至服务器对应的目录中。
解压缩JDK安装文件:
tar -zxvf jdk-8u421-linux-x64.tar.gz -C /export/server
配置JDK的软链接:
ln -s /export/server/jdk1.8.0_421 jdk
配置JAVA_HOME环境变量,以及将$JAVA_HOME/bin文件夹加入PATH环境变量中
vim /etc/profile
尾部添加如下:
export JAVA_HOME=/export/server/jdk export PATH=$PATH:$JAVA_HOME/bin
生效环境变量:
source /etc/profile
有些安装的系统可能自带jdk,这种jdk可以使用java -version检查,但如果使用javac就不行了,最好进行卸载。
如果是 yum 安装,可以按照下述流程卸载:
#1.查看CentOS自带JDK是否已安装 yum list installed | grep java #2.假使存在自带的jdk,删除centos自带的JDK yum -y remove java-1.8.0-openjdk* yum -y remove tzdata-java.noarch #删除系统自带的Java程序 rm -f /usr/bin/java #软连接我们自己安装的Java程序 ln -s /export/server/jdk/bin/java /usr/bin/java
rpm安装的,可以按照下述流程卸载:
rpm -qa | grep jdk #查看是否存在安装的jdk rpm -e `rpm -qa | grep jdk` # 卸载,如果存在依赖的情况可以使用下方的强制卸载 rpm -e --nodeps `rpm -qa | grep jdk` # 强制删除jdk
2_Zookeeper安装
下载地址:https://zookeeper.apache.org/releases.html#download
选择历史版本——本次选择3.5.7进行安装下载
下载完成后,上传到服务器后进行解压:
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /export/server
配置 Zookeeper 的软链接:
ln -s apache-zookeeper-3.5.7-bin zookeeper
配置 ZK_HOME 环境变量,以及将$ZK_HOME/bin文件夹加入PATH环境变量中
vim /etc/profile
尾部添加如下:
export ZK_HOME=/export/server/zookeeper export PATH=$PATH:$ZK_HOME/bin
生效环境变量:
source /etc/profile
Zookeeper的下载和安装已经完成,我们接下来直接对其配置进行修改:
cd /export/server/zookeeper/conf #进入zookeeper配置文件目录下 cp zoo_sample.cfg zoo.cfg #将Zookeeper提供的配置文件复制一份,复制成Zookeeper默认寻找的文件
进入 zookeeper 目录下,创建数据存放的目录:
cd /export/server/zookeeper mkdir data
修改 zookeeper 配置文件,将数据存放目录指定为我们创建的目录:
vim /export/server/zookeeper/conf/zoo.cfg 修改以下配置 dataDir=/export/server/zookeeper/data
启动 Zookeeper,Zookeeper的bin目录下为我们提供了服务脚本:
#./zkServer.sh [--config <conf-dir>] {start|start-foreground|stop|restart|status|print-cmd} ./bin/zkServer.sh start zoo.cfg #启动
查看是否启动成功
jps # 看到控制台成功输出 QuorumPeerMain,表示启动成功
使用jps发现Zookeeper成功启动了,也并不代表我们可以正常使用了,使用如下命了查看其状态:
./bin/zkServer.sh status zoo.cfg Mode: standalone表示ok
控制台打印 Mode: standalone 表示可以正常使用了。
3_Kafka安装
下载地址:https://kafka.apache.org/downloads。
选择 kafka_2.12-3.8.0.tgz 进行下载,Scala 2.12 和 Scala 2.13 主要是使用Scala编译的版本不同,两者皆可。
下载完成后,上传到服务器后进行解压:
tar -zxvf kafka_2.12-3.8.0.tgz -C /export/server
配置 Kafka 的软链接:
ln -s kafka_2.12-3.8.0.tgz kafka
配置 KAFKA_HOME 环境变量,以及将$KAFKA_HOME/bin文件夹加入PATH环境变量中
vim /etc/profile
尾部添加如下:
export KAFKA_HOME=/export/server/kafka export PATH=:$PATH:${KAFKA_HOME}
生效环境变量:
source /etc/profile
2、单机安装
Kafka一般情况下都是分布式架构的,单机节点的使用情况基本不存在——了解即可。
按照以下步骤执行:
- 安装JDK1.8+,配置JAVA_HOME (CentOS 64bit)
- 配置主机名和IP映射
- 关闭防火墙 | 防火墙开机自启动
- 安装&启动Zookeeper
- 安装&启动|关闭Kafka
上述有些步骤部分已经完成,不再重复叙述。
1_配置主机名和IP映射
配置主机名——我是用的是SHENYANG,替换成自己的即可:
echo HOSTNAME=SHENYANG >> /etc/sysconfig/network cat /etc/sysconfig/network reboot #重启生效
如果上述方式不成功可以试试下面这种:
hostnamectl set-hostname SHENYANG
配置主机名和IP的映射关系——使用自己当前服务器的主机名和IP:
vi /etc/hosts 192.168.193.141 SHENYANG ping SHENYANG #测试是否ok
如果在内网测试环境下最好关闭防火墙。
2_单机Kafka配置
在Kafka的config目录下存在相关的配置信息——本次我们只想让Kafka快速启动起来只关注server.properties文件即可:
cd ${KAFKA_HOME}/config ls #connect-console-sink.properties connect-file-source.properties consumer.properties server.properties #connect-console-source.properties connect-log4j.properties kraft tools-log4j.properties #connect-distributed.properties connect-mirror-maker.properties log4j.properties trogdor.conf #connect-file-sink.properties connect-standalone.properties producer.properties zookeeper.properties
打开配置文件,并主要注意以下几个配置:
vim server.properties broker.id=0 #kafka服务节点的唯一标识,这里是单机不用修改 # listeners = PLAINTEXT://your.host.name:9092 别忘了设置成自己的主机名 listeners=PLAINTEXT://SHENYANG:9092 #kafka底层监听的服务地址,注意是使用主机名,不是ip。 # log.dirs 指定的目录 kafka启动时可以自动创建,因此不要忘了让kafka可以有读写这个目录的权限。 log.dirs=/export/server/kafka/data kafka的分区以日志的形式存储在集群中(其实就是broker数据存储的目录) # The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168 #日志的留存策略,默认168小时也就是一周 # zookeeper 的连接地址 ,别忘了设置成自己的主机名,单机情况下可以使用 localhost zookeeper.connect=SHENYANG:2181
上述配置完成后就可以在单机环境下成功启动 Kafka了。
./bin/kafka-server-start.sh -daemon config/server.properties #后台启动kafka
使用 jps 查看是否成功启动kafka:
jps 34843 QuorumPeerMain 21756 Jps Kafka
单机启动完成。
3、集群安装
Kafka 集群模式是kafka最常见的使用架构,按照以下步骤执行:
- 安装JDK1.8+,配置JAVA_HOME (CentOS 64bit)
- 配置主机名和IP映射
- 关闭防火墙 | 防火墙开机自启动
- 同步时钟 ntpdate cn.pool.ntp.org | ntp[1-7].aliyun.com
- 安装&启动Zookeeper
- 安装&启动|关闭Kafka
本次使用三个节点模拟集群环境:CentOSA、CentOSB、CentOSC。
针对每个节点修改主机名称,查看当前主机的ip地址并固定,添加子网掩码,网关,DNS1。
#修改主机名 hostnamectl set-hostname node1 #修改IP地址 vim /etc/sysconfig/network-scripts/ifcfg-ens33 #不要直接copy,针对自己的进行修改 TYPE="Ethernet" PROXY_METHOD="none" BROWSER_ONLY="no" BOOTPROTO="static" DEFROUTE="yes" IPV4_FAILURE_FATAL="no" IPV6INIT="yes" IPV6_AUTOCONF="yes" IPV6_DEFROUTE="yes" IPV6_FAILURE_FATAL="no" IPV6_ADDR_GEN_MODE="stable-privacy" NAME="ens33" UUID="0d7ad579-b7c8-4b92-a94c-00d421" DEVICE="ens33" ONBOOT="yes" IPADDR="192.168.193.151" NETMASK="255.255.255.0" GETEWAY="192.168.193.2" DNS1="192.168.193.2"
按照第一节,每个节点都将软件下载并配置好环境变量和配置信息。
1_配置主机名和IP的映射关系
针对每一个节点的hosts文件添加节点的ip映射信息:
vim /etc/hosts 192.168.193.141 CentOSA 192.168.193.142 CentOSB 192.168.193.143 CentOSC ping #测试
也可以采取修改一个主机的hosts文件,通过 scp的方式进行拷贝——比如在CentOSA节点修改完成后执行:
scp /etc/hosts CentOSB:/etc/ scp /etc/hosts CentOSC:/etc/
别忘了在自行测试时关闭防火墙。
2_时钟同步
因为是在集群环境下,所以不要忘记同步三个主机节点的时钟:
yum install ntp -y ntpdate cn.pool.ntp.org | ntp[1-7].aliyun.com #两个时钟同步地址选择一个就行
3_Zookeeper配置信息
在第一节中我们编辑了 zoo.cfg 文件配置,但是如果是集群环境下,还需要添加服务节点的信息:
vim /export/server/zookeeper/conf/zoo.cfg 额外添加以下配置 server.1=CentOSA:2888:3888 #数据同步端口:领导选举时服务器监听的端口 server.2=CentOSB:2888:3888 server.3=CentOSC:2888:3888
配置完成后通过scp命令将配置文件拷贝到其他节点——假设当前配置的是A节点:
scp -r /export/server/zookeeper/conf/zoo.cfg CentOSB:/export/server/zookeeper/conf/ scp -r /export/server/zookeeper/conf/zoo.cfg CentOSC:/export/server/zookeeper/conf/
当然,也可以只安装并配置一台zookeeper,然后将整个软件拷贝过去。
接下来我们还需要在每个 zookeeper 节点的数据目录下创建对应的服务id文件与上面配置的server.id进行绑定:
# A echo 1 > /export/server/zookeeper/data/myid #在这个文件中写入自己服务的id号 # B echo 2 > /export/server/zookeeper/data/myid # C echo 3 > /export/server/zookeeper/data/myid
接下来启动所有的zookeeper节点。别忘了关防火墙!
Zookeeper的配置文件zoo.cfg中包含多个参数,每个参数都有特定的作用和意义。以下是一些主要参数及其解释:
- tickTime=2000: 通信心跳数,用于设置Zookeeper服务器与客户端之间的心跳时间间隔,单位是毫秒。这个时间间隔是Zookeeper使用的基本时间单位,用于服务器之间或客户端与服务器之间维持心跳的时间间隔。
- initLimit=10: LF初始通信时限,用于设置集群中的Follower跟随者服务器与Leader领导者服务器之间启动时能容忍的最多心跳数。如果在这个时限内(10个心跳时间)领导和根随者没有发出心跳通信,就视为失效的连接,领导和根随者彻底断开。
- syncLimit=5: LF同步通信时限,用于设置集群启动后,Leader与Follower之间的最大响应时间单位。假如响应超过这个时间(syncLimit * tick Time -> 10秒),Leader就认为Follower已经死掉,会将Follower从服务器列表中删除。
- dataDir: 数据文件目录+数据持久化路径,主要用于保存Zookeeper中的数据。
- dataLogDir: 日志文件目录,用于存储Zookeeper的日志文件。
- clientPort=2181: 客户端连接端口,用于监听客户端连接的端口。
以上是zoo.cfg文件中一些主要参数的含义和作用。
4_集群Kafka配置
集群的 server.properties配置文件与单机的有所不同:
cd ${KAFKA_HOME}/config vim server.properties broker.id=0 #kafka服务节点的唯一标识 A 0,B 1,C 2 # listeners = PLAINTEXT://your.host.name:9092 别忘了设置成自己的主机名 listeners=PLAINTEXT://CentOSA:9092 #集群中需要设置成每个节点自己的 # log.dirs 指定的目录 kafka启动时可以自动创建,因此不要忘了让kafka可以有读写这个目录的权限。 log.dirs=/export/server/kafka/data kafka的分区以日志的形式存储在集群中(其实就是broker数据存储的目录) # The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168 #日志的留存策略,默认168小时也就是一周 # zookeeper 集群的连接地址 zookeeper.connect=CentOSA:2181,CentOSB:2181,CentOSC:2181
在配置完成 A节点后,还是使用 scp命令进行传输配置文件,然后再修改broker.id和listeners:
scp -r /export/server/kafka/config/server.properties CentOSB:/export/server/kafka/config/ scp -r /export/server/kafka/config/server.properties CentOSC:/export/server/kafka/config/ 修改差异配置 cd ${KAFKA_HOME}/config vim server.properties # B节点 broker.id=1 listeners=PLAINTEXT://CentOSB:9092 # C节点 broker.id=2 listeners=PLAINTEXT://CentOSC:9092
配置完成后,kafka集群即可正常启动。集群的配置和单机的配置基本相似。
4、kafka的其他脚本命令
kafka服务还提供了除了启动外的其他命令:
./bin/kafka-server-stop.sh #关闭kafka kafka-console-consumer.sh #消费命令 kafka-console-producer.sh #生产命令 kafka-consumer-groups.sh #查看消费者组,重置消费位点等 kafka-topics.sh #查询topic状态,新建,删除,扩容 kafka-acls.sh #配置,查看kafka集群鉴权信息 kafka-configs.sh #查看,修改kafka配置 kafka-mirror-maker.sh #kafka集群间同步命令 kafka-preferred-replica-election.sh #重新选举topic分区leader kafka-producer-perf-test.sh #kafka自带生产性能测试命令 kafka-reassign-partitions.sh #kafka数据重平衡命令 kafka-run-class.sh #kafka执行脚本
重点关注前面几个,消费者,生产、消费者组和Topic管理:
创建Topic;
#副本因子数不能大于broker数,如果同一台机器上存储了多个备份就多余了 ./bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --create --topic topic02 --partitions 3 --replication-factor 3
查看:
./bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --list
详情:
./bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --describe --topic topic01 Topic:topic01 PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes= Topic: topic01 Partition: 0 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3 Topic: topic01 Partition: 1 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0 Topic: topic01 Partition: 2 Leader: 0 Replicas: 3,0,2 Isr: 0,2,3
修改:
./bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --alter --topic topic01 --partitions 2
删除:
./bin/kafka-topics.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --delete --topic topic03
消费者订阅:
# 别忘了指定消费者组 ./bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01 --group g1 --property print.key=true --property print.value=true --property key.separator=,
生产:
./bin/kafka-console-producer.sh --broker-list CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01
消费组:
./bin/kafka-consumer-groups.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --list g1 ./bin/kafka-consumer-groups.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --describe --group g1 TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID topic01 1 0 0 0 consumer-1- /192.168.52.130 consumer-1 topic01 0 0 0 0 consumer-1- /192.168.52.130 consumer-1 topic01 2 1 1 0 consumer-1- /192.168.52.130 consumer-1
5、监控工具Kafka-eagle安装
这是一个监视系统,国内开发的产品,监视您的kafka群集以及可视的使用者线程,偏移量,所有者等。当您安装Kafka Eagle时,用户可以看到当前的使用者组,对于每个组,他们正在消耗的Topic以及该组在每个主题中的偏移量,滞后,日志大小和位置。这对于了解用户从消息队列消耗的速度以及消息队列增加的速度很有用。
下载地址:https://www.kafka-eagle.org/。
安装包下载完成后还是上传到服务器的/export/software文件目录下,,下载完毕后软件解压至/export/server。
本文安装的是3.0.1版本,支持Kafka版本0.8,0.9.x,0.10.x,0.11.x,1.x,2.x
tar -zxvf kafka-eagle-bin-3.0.1.tar.gz -C ../server/
这个软件比较特殊,解压后会发现里面还有一个压缩包,需要再次解压。
cd /export/server/kafka-eagle-bin-3.0.1/ tar -zxvf efak-web-3.0.1-bin.tar.gz -C /export/server/ cd ../ ln -s efak-web-3.0.1 efak
设置环境变量——必须设置,ke.sh启动的脚本中需要这个环境变量
vim /etc/profile #添加以下内容 export KE_HOME=/export/server/efak export PATH=$PATH:$KE_HOME/bin source /etc/profile
修改配置文件system-config.properties:
cd ${KE_HOME}/conf vim system-config.properties
注意以下配置:
#zookeeper集群信息id kafka.eagle.zk.cluster.alias=cluster1 #集群配置的zookeeper节点信息 可以监测这三台机器上的kafka cluster1.zk.list=CentOSA:2181,CentOSB:2181,CentOSC:2181 #有的老版本是把偏移量存储在zookeeper中,这里是存储在kafka中的,默认可以不改 cluster1.kafka.eagle.offset.storage=kafka # 报表信息 开启后kafka必须开启JMX的服务管理端口,用于统计信息 kafka.eagle.metrics.charts=true # mysql的连接信息 eagle 需要将信息存储在数据库中,数据库eagle启动时会自行创建,配置好mysql的地址就行。 kafka.eagle.driver=com.mysql.cj.jdbc.Driver kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull kafka.eagle.username=root kafka.eagle.password=
开启JMX监控,修改kafka启动文件或在启动时添加JMX_PORT=9999参数:
vim ${KAFKA_HOME}/bin/kafka-server-start.sh if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70" export JMX_PORT="9999" #添加此行即可 #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi
重启kafka后,启动 eagle 就可以监控kafka集群信息了。
chmod u+x bin/ke.sh#如果没有文件的执行权限 $KE_HOME/bin/ke.sh start#启动程序
登录账号和密码默认为 admin/——内网测试不要忘记开放对应的访问端口或关闭防火墙。
登录成功后即可看到以上信息。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/112241.html







