Kafka学习笔记总结

Kafka学习笔记总结本文详细探讨了 Kafka 的实现原理 包括生产者客户端实现 服务端网络层交互 数据读写机制 高可用性保障及消费者客户端拉取消息的流程

大家好,欢迎来到IT知识分享网。

目录

0 本文主要涉及

1研究的几个点

2基本概念简介

消息队列

Kafka

Kafka相关概念

Topic

Producer

Consumer

Consumer Group

Broker

Partition

Replica

Kafka数据流概览

Kafka本质

Kafka 使用

Kafka为开发者提供了四类API:

3kafka生产者客户端实现

Producer发送消息流程:

发送消息时方法调用流程:

主线程部分一些关键对象和流程说明:

ProducerInterceptor

集群元数据相关对象及更新操作

Partitioner.partition()方法

RecordAccumulator部分关键对象和流程说明:

MemoryRecords

RecordBatch

RecordAccumulator

Sender线程部分关键对象和流程说明:

发送消息的流程:

NetworkClient

相关方法:

InFlightRequests

DefaultMetadataUpdater

ClusterConnectionStates

4kafka服务端网络层实现,如何与客户端交互

工作原理:

SocketServer相关对象和方法:

SocketServer

Acceptor

Processor 

RequestChannel

KafkaRequestHandlerPool

KafkaRequestHandler

KafkaApis

请求数据从生产者发送到服务端的流转过程

5kafka服务端如何读写消息数据,如何维护日志数据

日志文件结构

偏移量offset

日志读写相关对象和方法

FileMessageSet

OffsetIndex

LogSegment

Log

LogManager

高效日志文件读写原理

6kafka服务端如何保证高可用

KafkaController机制

KafkaController相关功能主要对象和方法说明:

ControllerChannelManager

ControllerContext

状态机

PartitionStateMachine

ReplicaStateMachine

ZooKeeper Listener

副本机制

副本机制主要对象和方法:

7kafka消费者客户端拉取消息

Consumer接口中定义KafkaConsumer对外的API:

一些关键对象说明:

ConsumerNetworkClient

SubscriptionState

消息拉取

提交拉取消息的进度

事务性 / 原子性广播

Consumer Group Rebalance

服务端负载均衡以及消息进度保存相关对象和方法:

GroupCoordinator


0 本文主要涉及

主要为Kafka实现原理学习笔记,以及一些基本的使用和维护方面记录。

1研究的几个点

2基本概念简介

消息队列

Kafka

  • 以时间复杂度为 O(1) 的方式提供消息持久化能力,并保证即使对 TB 级以上数据也能保证常数时间的访问性能
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条消息的传输
  • 支持 Kafka Server 间的消息分区,及分布式消息消费,同时保证每个 Partition 内的消息顺序传输
  • 同时支持离线数据处理和实时数据处理

Kafka相关概念

Topic

Kafka中程序按照Topic分类来维护消息,Producer生产者往 Topic 里写消息,Consumer消费者从Topic读消息,可以将一种Topic 的消息理解为一个队列 Queue

Producer

Producer生产者客户端以 push 模式将消息发布 (publish) 消息到 Topic 某个分区Partition的对应服务端

Consumer

Consumer消费者客户端通过订阅 (subscribe)Topic 获取并处理 Topic 中消息,通过提交偏移量确认消费进度

Consumer Group

用于定义可重复消费同一Topic消息不同分类的一组消费者(同分组消费者数最多不能超过分区数),同一 Topic 的一条消息只能被同一个 Consumer Group 内的一个 Consumer 消费,但可以同时被多个不同Consumer Group 消费

Broker

Kafka以集群的方式运行,集群中的每一台服务器称之为一个代理 (broker),消费者向他发布消息,消费者从他拉取数据Controller Broker 
Kafka集群中的其中一个服务器,用来进行和 zookeeper 通信,管理和协调 Kafka 集群:

  • 维护更新集群元数据信息
  • 创建 topic,删除topic
    Replica副本Leader选举,维护副本状态
    Broker加入集群、崩溃等情况处理
    受控关闭


  • Controller Leader选举
  • Partition分区分配,维护分区状态

GroupCoordinator Broker &Group Leader
GroupCoordinator Broker 组协调者 为服务端的一个Broker,Group Leader为一组消费者中的一个(由GroupCoordinator Broker选定,他们合作通过组协调协议(group coordination protocol)实现消费者成员管理、消费分配方案制定 (rebalance) 以及提交位移等

Partition

Kafka集群中每个Topic中的消息会被分为一个或若干个 Partition(用于提高消息的处理效率,遇到瓶颈时,可以通过增加 Partition的数量来进行横向扩容),一个Partition是一个有序的 message 序列,每条 message 都有唯一的offset编号,单个 Partition内是保证消息有序的 

Replica

Partition的副本,保障 Partition的高可用,通过算法均匀分布在多个Broker 上(同一个 partition 的 replica 在不同的 broker,同一个 topic+partition 只有一个 leader)
Replica 副本分为 Leader Replica 和 Follower Replica
Producer和 Consumer 只跟 Leader 交互,实际都是作用在 Leader 副本上,然后再同步到其他的 Follower,
在 leader 挂了之后,Controller 会从符合一定条件的 Follower 中选取一个作为新的 Leader 提供服务ISR(is-sync replica) 
同步副本集合,有资格被选为新的Leader 的副本集合,如果 Follower 延迟过大,会被踢出集合,追赶上数据之后,重新向 leader 申请,加入 ISR 集合



Kafka数据流概览

Kafka学习笔记总结
Producers 往 Brokers 里面的指定 Topic 中写消息,Consumers 从 Brokers 里面拉去指定 Topic 的消息
两种信息队列实现流程如下,
发布 – 订阅消息的工作流程:
1,生产者定期向主题发送消息
2,Kafka Broker服务端接收消息并持久化到多个或一个分区
3,多个分组的消费者分别订阅该特定主题,定期poll轮询拉取数据,每组消费者处理消息后提交新的消费进度offset到服务端,服务端保存到特定的Topic记录,消费者可以随时回退 / 跳到所需的主题偏移量,并阅读所有后续消息。
单一队列消息工作流程:
1,生产者定期向主题发送消息
2,Kafka Broker服务端接收消息并持久化并持久化到一个分区
3,唯一的消费者订阅该特定主题,定期poll轮询拉取数据,唯一分区内的消息可保证消息发送顺序









Kafka本质

Kafka核心本质:一套分部式磁盘文件有序批量读写系统

Kafka 使用

Kafka为开发者提供了四类API:

3kafka生产者客户端实现

Producer发送消息流程:

发送消息时方法调用流程:

主线程部分一些关键对象和流程说明:

ProducerInterceptor

添加自定义预操作,功能类似于Java Web中的Filter,它可以在消息发送之前对其进行拦截或修改,也可以先于用户的Callback,对ACK响应进行预处理。自定义使用ProducerInterceptor类,只要实现ProducerInterceptor接口,创建其对象并添加到ProducerInterceptors中即可

集群元数据相关对象及更新操作

Partitioner.partition()方法

RecordAccumulator部分关键对象和流程说明:

MemoryRecords

RecordBatch

RecordAccumulator

Sender线程部分关键对象和流程说明:

发送消息的流程:

NetworkClient

相关方法:

InFlightRequests

主要作用是缓存了已经发出去但没收到响应的ClientRequest。其底层是通过一个Map<String, Deque<ClientRequest>>对象实现

DefaultMetadataUpdater

ClusterConnectionStates

NetworkClient中所有连接的状态由ClusterConnectionStates管理,它底层使用Map<String,NodeConnectionState>实现,key是NodeId,value是NodeConnectionState对象,其中使用ConnectionState枚举表示连接状态,还记录了最近一次尝试连接的时间戳

4kafka服务端网络层实现,如何与客户端交互

工作原理:

服务端和网络层相关的组件:一个接收器线程(Acceptor)、多个处理器(Processor)、一个请求通道(Requestchannel)、一个请求队列(requestoueue)、多个响应队列(responseQueue)、一个请求处理线程连接池(KafkaRequestHandlerPool)、多个请求处理线程(KafkaRequestHandler)、一个服务端请求入口(KafkaApis)

SocketServer相关对象和方法:

SocketServer

Acceptor

Processor 

RequestChannel

KafkaRequestHandlerPool

KafkaRequestHandlerPool是一个简易版的线程池,管理了所有的KafkaRequestHandler线程,其个数默认为8个,由参数num.io.threads决定

KafkaRequestHandler

KafkaRequestHandler的主要职责是从RequestChannel获取请求并调用KafkaApis. handle()方法处理请求

KafkaApis

请求数据从生产者发送到服务端的流转过程

Kafka学习笔记总结
KafkaProducer线程创建ProducerRecord后,将其缓存进RecordAccumulator。
Sender线程从RecordAccumulator中获取缓存的消息,放入KafkaChannel.send字段中等待发送,同时放入InFlightRequests队
列中等待响应。之后,客户端会通过KSelector将请求发送出去。
在服务端,Processor线程使用KSelector读取请求并暂存到stageReceives队列中,KSelector.poll()方法结束后,请求被移转移到completeReceives队列中。之后,Processor将请求进行一些解析操作后,放入RequestChannel.requestQueue队列。Handler线程会从RequestChannel.requestQueue队列中取出请求进行处理,将处理之后生成的响应放入RequestChannel.responseQueue队列。
Processor线程从其对应的RequestChannel. responseQueue队列中取出响应并放入inflightResponses队列中缓存,当响应发送出去之后会将其从inflightResponse中删除。生产者读取响应的过程与服务端读取请求的过程类似,主要的区别是生产者需要对InFlightRequest中的请求进行确认




5kafka服务端如何读写消息数据,如何维护日志数据

Kafka使用日志文件的方式保存生产者发送的消息

日志文件结构

偏移量offset

日志读写相关对象和方法

Kafka学习笔记总结

FileMessageSet

OffsetIndex

LogSegment

Log

LogManager

高效日志文件读写原理

6kafka服务端如何保证高可用

KafkaController机制

所谓瞬间瞬时节点就是读写Zookeeper模块的客户端会维护和Zookeeper集群的连接,当该连接断开的时候,通过此连接之前创建的瞬时节点都会消失,所以说该节点是瞬时的,不是永久存在的。接着当Leader状态的KafkaController离线的时候,其内部的Zookeeper客户端就会失去和Zookeeper集群的连接,那么此时其他KafkaController观察到数据节点消失之后,就会重新尝试创建节点。

Kafka学习笔记总结
Zookeeper上保存了Kafka集群的元数据信息,KafkaController通过在不同目录注册不同的回调函数来达到监测集群状态的目的,及时响应集群状态的变化,ZooKeeper中与KafkaController相关的路径以及该路径中记录的内容的含义:
/brokers/ids/[id]:记录了集群中可用Broker的id,通过监测这个目录的变化可以及时响应Broker的上下线情况等。
/brokers/topics/[topic]/partitions:记录了一个Topic中所有分区的分配信息以及AR集合信息,通过监测这个目录的变化可以及时响应Topic创建和删除的请求;
/brokers/topics/[topic]/partitions/[partition_id]/state:记录了某Partition的Leader副本所在BrokerId、lead_epoch、ISR集合、ZKVersion等信息。
/controller_epoch:记录了当前Controller Leader的年代信息。
/controller:记录了当前Controller Leader的Id,也用于Controller Leader的选举
/admin/reassign_partitions:记录了需要进行副本重新分配的分区,通过监测这个目录的变化可以及时响应Topic分区变化的请求;
/admin/preferred_replica_election:记录了需要进行“优先副本”选举的分区。“优先副本”是在创建分区时为其指定的第一个副本,通过监测这个目录的变化可以及时响应Topic分区副本变化的请求;
/admin/delete_topics:记录了待删除的Topic。
/isr_change_notification:记录了一段时间内ISR集合发生变化的分区。
/config:记录了一些配置信息

Kafka学习笔记总结
如上图KafkaController组织并封装了其他组件,对外提供API接口:
ZookeeperLeaderElector,主要用于Controller Leader的选举
ControllerContext,KafkaController的上下文信息,缓存了ZooKeeper中记录的整个集群的元信息,例如可用Broker、全部的Topic、分区、副本的信息等。
ControllerChannelManager,维护了Controller Leader与集群中其他Broker之间的网络连接,是管理整个集群的基础
TopicDeletionManager,用于对指定的Topic进行删除
PartitionStateMachine,用于管理集群中所有Partition状态的状态机
ReplicaStateMachine,用于管理集群中所有副本状态的状态机
ControllerBrokerRequestBatch,实现了向Broker批量发送请求的功能
*PartitionLeaderSelector,实现了多种Leader副本选举策略。
*Listener,是ZooKeeper上的监听器,实现了对ZooKeeper上某些节点中的数据、子节点或ZooKeeperSession状态的监听,被触发后调用相应的业务逻辑






















KafkaController相关功能主要对象和方法说明:

ControllerChannelManager

ControllerContext

状态机

PartitionStateMachine

ReplicaStateMachine

ZooKeeper Listener

副本机制

副本机制主要对象和方法:

7kafka消费者客户端拉取消息

Consumer接口中定义KafkaConsumer对外的API:

一些关键对象说明:

ConsumerNetworkClient

SubscriptionState

消息拉取

提交拉取消息的进度

事务性 / 原子性广播

Consumer Group Rebalance

Kafka学习笔记总结
通过GroupCoordinator(KafkaServer中用于管理Consumer Group的组件)以及消费者端的Group Leader以及两阶段(Join Group阶段和Synchronizing Group State阶段)式协议事项
步骤:
1,消费者通过发送ConsumerMetadataRequest(包含其Consumer Group的GroupId)到服务端Broker节点,收到请求的Broker会返回ConsumerMetadataResponse作为响应,其中
包含了管理此Consumer Group的GroupCoordinator的ConsumerMetadataResponse信息,根据ConsumerMetadataResponse中的GroupCoordinator信息,连接到GroupCoordinator并周
期性地发送HeartbeatRequest,表示有效在线
2,消费者在加入或退出Consumer Group,取消订阅,下线或Topic出现分区数量的变化,会GroupCoordinator开始Rebalance操作
3,消费者接受到的如果HeartbeatResponse中带有IllegalGeneration异常,说明GroupCoordinator发起了Rebalance操作,此时消费者发送JoinGroupRequest给服务端GroupCoordinator,通知GroupCoordinator,当前消费者要加入指定的Consumer Group。之后,服务端的GroupCoordinator收到JoinGroupRequest后会暂存消息,收集到全部消费者之后,根据JoinGroupRequest中的信息来确定Consumer Group中可用的消费者,从中选取一个消费者成为Group Leader,还会选取使用的分区分配策略,最后将这些信息封装成JoinGroupResponse返回给消费者
4,每个消费者都会收到JoinGroupResponse,但是只有Group Leader收到的JoinGroupResponse中封装了所有消费者的信息。当消费者确定自己是Group Leader后,会根据消费者的信息以及选定的分区分配策略进
行分区分配
5,接下来进入Synchronizing Group State阶段,每个消费者会发送SyncGroupRequest到GroupCoordinator,但是只有Group Leader的SyncGroupRequest请求包含了分区的分配结果,GroupCoordinator根据Group Leader的分区分配结果,形成SyncGroupResponse返回给所有Consumer,消费者就可以根据SyncGroupResponse中分配的分区开始消费数据。
5,消费者成功成为Consumer Group的成员后,又会会周期性发送HeartbeatRequest。如果HeartbeatResponse包含IllegalGeneration异常,则执行步骤3。
如果找不到对应的GroupCoordinator(HeartbeatResponse包含NotCoordinatorForGroup异常),则周期性地执行步骤1,直至成功。











服务端负载均衡以及消息进度保存相关对象和方法:

GroupCoordinator

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/135616.html

(0)
上一篇 2025-07-03 16:15
下一篇 2025-07-03 16:20

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信