运维学习————kafka(1)

运维学习————kafka(1)kafka 中文文档 kafka 是由 apache 软件基金会开发的一个开源流处理框架 由 JAVA 和 scala 语言编写

大家好,欢迎来到IT知识分享网。

关于消息队列的概念,三个作用(解耦、异步、削峰),使用场景,消息队列的两种模式,常见的MQ的框架等等,可以看我之前的RabbitMQ的文章,那里有详细介绍,这里就不过多赘述了……..直接就kafka走起

一、Kafka简介 

中文网址:kafka中文文档

Kafka官网:http://kafka.apache.org/

kafka是由apache软件基金会开发的一个开源流处理框架,由JAVA和scala语言编写。是一个高吞吐量的分布式的发布和订阅消息的一个系统。Kafka用于构建实时的数据管道和流式的app.它可以水平扩展,高可用,速度快,并且已经运行在数千家公司的生产环境。

二、基本术语 

topic(话题):kafka将消息分门别类,每一类的消息称之话题,是逻辑上的一个概念,如果是真正到磁盘上,映射的是一个partition分区的一个目录。

生产者(producer):发布消息的对象称之为生产者,只负责数据的产生,生产的来源,可以不在kafka集群上,而是来自其他的业务系统。

消费者(consumer):订阅消息并处理发布消息的对象,称为消费者。

消费者组(consumerGroup):多个消费者可以构成消费者组,同一个消费者组的消费者,只能消费一个topic数据,不能重复消费。

broker :kafka本身可以是一个集群,集群中的每一个服务器都是一个代理,这个代理称为broker。只负责消息的存储,不管生产者和消费者,和他们没有任何关系。在集群中每个broker有唯一个ID,不能重复。

三、集群搭建

以下操作需要同时在三台机子进行,不想的话,可以先操作一台,然后把这一台的配置全部发送到另外两台,这个时候就需要在第一台机子上做免密登录的操作,保证第一个机子可以发送文件到另外两台

kafka_2.12-2.7.0.tar:kafka-2.12-2.7.0.tar

#解压缩 tar -xzvf /software/kafka_2.12-2.7.0.tgz -C /usr/ #修改名称 mv /usr/kafka_2.12-2.7.0/  /usr/kafka #配置环境变量 vi /etc/profile #添加以下代码 export KAFKA_HOME=/usr/kafka export PATH=$PATH:$ZK_HOME/bin:$KAFKA_HOME/bin #使配置文件生效 source /etc/profile #测试,是否生效 echo $KAFKA_HOME

运维学习————kafka(1)

kafka是使用scala 语言编写 2.12是该语言的版本 2.7.0这是kafka版本

#进入kafka目录 cd /usr/kafka #创建目录(存放消息),为后面配置做准备 mkdir logs #修改配置server.properties文件 vim /usr/kafka/config/server.properties

server.properties配置

#broker的全局唯一编号,不能重复 broker.id=0 #是否允许删除topic delete.topic.enable=true #处理网络请求和响应的线程数量 num.network.threads=3 #用来处理磁盘IO的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes= #接收套接字的缓冲区大小 socket.receive.buffer.bytes= #请求套接字的最大缓冲区大小 socket.request.max.bytes= #kafka运行日志存放的路径 log.dirs=/usr/kafka/logs #topic在当前broker上的分区个数 num.partitions=1 #用来恢复和清理data下数据的线程数量 num.recovery.threads.per.data.dir=1 # Internal Topic Settings # # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 #以下配置控制日志段的处理。可以将策略设置为在一段时间后或在给定大小累积后删除段。只要满足这些条件中的*任一*项,就会删除段。删除总是从日志的末尾开始 #segment文件保留的最长时间,超时将被删除,单位小时,默认是168小时,也就是7天 log.retention.hours=168 #基于大小的日志保留策略。除非剩余的段低于log.retention.bytes,否则将从日志中删除段。独立于log.retention.hours的功能 #log.retention.bytes= #日志段文件的最大大小。当达到此大小时,将创建一个新的日志段。 log.segment.bytes= #检查日志段以查看是否可以根据保留策略删除日志段的间隔 log.retention.check.interval.ms= # Zookeeper # #配置连接Zookeeper集群地址 zookeeper.connect=cluster1:2181,cluster2:2181,cluster3:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=18000 # Group Coordinator Settings # group.initial.rebalance.delay.ms=0 

集群配置那里,需要先在三个机子的/etc/hosts下添加这三个机子的ip配置

192.168.37.152 cluster1

192.168.37.153 cluster2

192.168.37.154 cluster3

 producer.properties

#修改producer.properties vim /usr/kafka/config/producer.properties #修改这一行就行 bootstrap.servers=cluster1:9092,cluster2:9092,cluster3:9092

consumer.properties

#修改consumer.properties vim /usr/kafka/config/consumer.properties #修改这一行就行 bootstrap.servers=cluster1:9092,cluster2:9092,cluster3:9092

免密登录

#发送配置好的kafka到另外两台机子(先做免密登录): ssh-keygen -t rsa ssh-copy-id cluster2 ssh-copy-id cluster3 scp -r /usr/kafka/  cluster2:/usr/ scp -r /usr/kafka/  cluster3:/usr/ #检查发送是否成功,在all session执行: ls /usr #修改broker.id(切记) 在cluster2和cluster3上修改broker.id vim /usr/kafka/config/server.properties broker.id=2 broker.id=3 #发送环境变量配置文件: scp -r /etc/profile cluster2:/etc/ scp -r /etc/profile cluster3:/etc/ #在all session执行: source /etc/profile echo $KAFKA_HOME #发送hosts配置文件: scp -r /etc/hosts cluster2:/etc/ scp -r /etc/hosts cluster3:/etc/ #测试是否成功:在all session执行: cat /etc/hosts

四、集群的启动和关闭

启动kafka之前一定要保证zk在启动,并且可用

启动zookeeper

启动zk ,关于zk脚本同时启动,可以参考我的另一个文章

运维学习————kafka(1)

zk启动之后的效果:

运维学习————kafka(1)

启动kafka

在all session中执行:

#启动kafka kafka-server-start.sh -daemon /usr/kafka/config/server.properties #检查是否运行成 jps ps -ef | grep java #停止kafka kafka-server-stop.sh 

五、常用命令

#查看当前服务器中的所有topic主题: kafka-topics.sh --zookeeper cluster1:2181 --list #如果是zk集群可以使用这样的命令: kafka-topics.sh --zookeeper cluster1:2181,cluster2:2181,cluster3:2181 --list #创建topic: list kafka-topics.sh --zookeeper cluster2:2181 --create --replication-factor 3 --partitions 5 --topic ordertopic kafka-topics.sh --zookeeper cluster2:2181 --create --replication-factor 2 --partitions 2 --topic goodstopic #删除topic kafka-topics.sh --zookeeper cluster1:2181 --delete --topic goodstopic #生产消息 kafka-console-producer.sh --broker-list cluster2:9092 --topic goodstopic #消费消息 kafka-console-consumer.sh  --bootstrap-server cluster2:9092  --from-beginning  --topic  goodstopic #同组消费者消费消息(多个窗口) kafka-console-consumer.sh --bootstrap-server kafka1:9092 --consumer-property group.id=gtest --from-beginning --topic goodstopic #查看一个topic详情 kafka-topics.sh --zookeeper  cluster2:2181,cluster1:2181 --describe --topic tp1 

参数说明:

–zookeeper    链接zk

–replication-factor   指定副本数目(副本数目不能大于总的brokers数目)

–partitions  指定分区数

–topic  指定topic名称

运维学习————kafka(1)

partitioncount   分区总数量

replicationfactor    副本数量

partition 分区

leader  每个分区有3个副本,每个副本都有leader

replicas   所有副本节点,不管leader follower

isr: 正在服务中的节点

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/99726.html

(0)
上一篇 2026-01-31 13:20
下一篇 2022-12-16 16:40

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信