大家好,欢迎来到IT知识分享网。
前言
- Pull:拉模式,消费者自行拉取消息、上报消费结果
- Push:推模式,消费者主动拉取消息,看起来像是 Broker 主动推送消息,主动上报消费结果
Pop 消费模式可以看作是 Push 的升级版,Push 消费模式存在以下痛点:
- 客户端负责队列重平衡、消息拉取、消费位点上报、消费失败重试等功能,逻辑太重了,不利于多语言客户端发展
- 队列数/消费者数量变更会触发重平衡操作,期间消息无法消费,容易造成消息堆积
- 队列和消费者强绑定,消费者数量超过队列数后,无法再水平扩容
- 队列和消费者强绑定,消费者僵死状态下导致消息堆积
Pop 消费模式就是要解决这些痛点的,它的设计目标:
- 消费者只管消息拉取、消息消费、上报 ACK,客户端 SDK 轻量级
- 队列不再绑定消费者,消费者可以消费所有队列消息
- 消费者可以很方便的水平扩容
要实现这个目标还是有不小的挑战,看看 RocketMQ 是如何做到的吧。
设计难点
- Broker 消息投递处理简单,根据消费者请求的拉取位点投递
- Broker 消费位点管理简单,无需记录消息是否被消费,只需要记录消费位点,位点前的都消费了,位点后的都没消费
2、Broker 要记录哪些消息消费成功了,哪些消息消费失败了,不能单纯记录消费位点了
因为队列可以被多个消费者消费,大家都在上报队列粒度的消费位点,Broker 没法管理了。
你看,为了可以让队列被多个消费者消费,Broker 已经不能再按队列维度去管理信息了,必须精确到消息粒度,这就需要额外的数据结构来支撑。
设计实现
取消客户端队列 Rebalance
客户端重平衡,不仅限制了消费者的消费能力,还增加了客户端复杂度,重平衡期间消息无法被消费,还容易造成消息堆积。Pop 消费模式下,消费者可以消费所有队列,也就不再需要客户端重平衡了。消费者查询给自己分配的队列时,Proxy 返回的是 Broker 维度且 queueId=-1 的逻辑队列,Broker 端会投递所有队列的消息。
invisibleTime
Pop 消费模式下,消息投递后会有一个invisibleTime
的概念,即消息的不可见时间,默认是 60 秒。比如 M1 投递给消费者A后,在不可见时间段内,其它消费者是无法消费这条消息的。
另外 Broker 还提供了changeInvisibleTime()
接口修改单条消息的不可见时间,比如消息消费失败后,会根据重试次数来设置新的不可见时间。
CK & ACK 消息
RocketMQ 为了记录消息是否被消费成功,引入了 CK 和 ACK 消息,以及一个专属的 Topic:rmq_sys_REVIVE_LOG_{clusterName}
。
消费者在拉取消息时,Broker 端会给拉取到的这一批消息发一个 CK 消息,CK 消息记录了各消息的偏移量可以定位到具体消息,同时用位图记录了各消息的 ACK 情况。Consumer 消费成功后会调用ackMessage
接口,Broker 会发送一个对应的 ACK 消息;消费失败后会调用changeInvisibleTime
接口,延长消息不可见时间,底层是先发一个旧消息的 ACK 消息,再发一个新消息的 CK 消息。
上述流程的运转是基于消费者正常处理消费结果的前提下的,如果消费者挂了,既不发 ack 也不发 nack,Broker 又该怎么处理这些消息呢?其实也能正常处理,因为 CK 和 ACK 消息均是延时消息,延迟的时间即消息的不可见时间,CK 消息会提前一秒消费,目的是匹配 ACK 消息。
RocketMQ 会启动八个线程消费 REVIVE Topic 对应的八个队列,匹配 CK 消息里还有哪些消息没被 ack,再将这些没被 ack 的消息发送到 RETRY Topic,消费者就可以重新消费了。
源码
ProcessQueue
PushConsumer 启动后,会定时向 Proxy 查询分配的队列:
protected void startUp() throws Exception {
...... scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {
try {
// 扫描分配的队列 scanAssignments(); } catch (Throwable t) {
log.error("Exception raised while scanning the load assignments, clientId={}", clientId, t); } }, 1, 5, TimeUnit.SECONDS); ...... }
- 队列不再分配给自己,停止拉取消息,移除队列
- 新分配的队列,立即拉取消息
对于新分配的队列,消费者会创建ProcessQueue
对象开始拉取队列消息:
private void receiveMessageImmediately() {
// Broker端点列表 gRPC负载均衡调用 final Endpoints endpoints = mq.getBroker().getEndpoints(); // 构建请求 final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression); // 发请求 final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq, consumer.getPushConsumerSettings().getLongPollingTimeout()); Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
@Override public void onSuccess(ReceiveMessageResult result) {
// 处理消息 onReceiveMessageResult(result); } } }
PopMessageProcessor
Proxy 端会开启 gRPC 服务,GrpcMessagingApplication 用来处理客户端的请求,拉取消息对应的处理方法是receiveMessage
,最终会交给 Broker 的 PopMessageProcessor 处理,主要步骤:
- 校验请求参数、队列写权限等等
- 构建消息过滤器 ExpressionMessageFilter
- 按照 1/5 的概率先尝试从重试队列里拉取消息
- 遍历所有队列拉取消息,直到拉取最大消息数
- 如果普通队列拉取的消息数没达到最大消息数,再尝试拉取重试队列
- 如果还有剩余消息,通知其它被长轮询挂起的请求继续拉取
- 返回结果
private RemotingCommand processRequest(final Channel channel, RemotingCommand request) throws RemotingCommandException {
......// 参数、权限校验 // Topic配置 TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); // 消费组订阅配置 SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); // 消息过滤 ExpressionMessageFilter messageFilter = ...... // POP模式下,消费者要遍历所有队列,随机一个下标开始读取,都从0开始读的话,存在热点队列,竞争锁 int randomQ = random.nextInt(100); int reviveQid; // REVEIE队列ID 顺序消息999 普通消息:0~7轮询 if (requestHeader.isOrder()) {
reviveQid = KeyBuilder.POP_ORDER_REVIVE_QUEUE; } else {
reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum()); } int commercialSizePerMsg = this.brokerController.getBrokerConfig().getCommercialSizePerMsg(); GetMessageResult getMessageResult = new GetMessageResult(commercialSizePerMsg); long restNum = 0; // 剩余消息数 // 平均每5次请求,读取一下重试队列 boolean needRetry = randomQ % 5 == 0; if (needRetry && !requestHeader.isOrder()) {
restNum = popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo); } // POP模式下,请求的queueId=-1,读所有队列 if (requestHeader.getQueueId() < 0) {
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) % topicConfig.getReadQueueNums(); restNum = popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo); } } // 拉取到的消息数量不满足消费者期望的数量,接着拉取重试队列:%RETRY%{consumerGroup}_{topic} ...... if (!getMessageResult.getMessageBufferList().isEmpty()) {
// 拉取到消息,且队列里面还有消息,通知其它拉取请求 if (restNum > 0) {
notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId()); } } else {
// 没有消息,进入长轮询状态 挂起 int pollingResult = polling(channel, request, requestHeader); } ...... }
通过队列获取消息的方法是popMsgFromQueue
,虽然队列可以被多个消费者消费,但是同一个消费组下,针对同一个队列拉取消息的行为必须保证串行,所以 Broker 首先会构建一个topic@group@queueId
格式的字符串作为 lockKey 保证加锁成功,然后再获取消息拉取位点 offset,调用 MessageStore 获取消息,最终给这批消息记录 CK 消息。
private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult, PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid, Channel channel, long popTime, ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo, StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()) : requestHeader.getTopic(); // 给队列加锁 同一消费组下的队列串行读取 topic@group@queueId String lockKey = topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId; // 加锁 if (!queueLockManager.tryLock(lockKey)) {
restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum; return restNum; } // 拉取位点 offset = getPopOffset(topic, requestHeader, queueId, true, lockKey); // 获取消息 GetMessageResult getMessageTmpResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup() , topic, queueId, offset, requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter); if (!getMessageTmpResult.getMessageMapedList().isEmpty()) {
// 追加CheckPoint appendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName()); } ...... queueLockManager.unLock(lockKey); }
这里的锁实现很简单,通过 AtomicBoolean CAS 修改上锁标记位来判断是否加锁成功:
static class TimedLock {
private final AtomicBoolean lock; private volatile long lockTime; }
加锁成功后,Broker 得知道该从哪里开始给消费者投递消息,这就是拉取位点的获取:
- 首先取已提交的消费位点
- 如果 CK 缓冲区有已投递的 PopCheckPoint,则取缓冲区的拉取位点,避免已经投递过的消息重复投递
private long getPopOffset(String topic, PopMessageRequestHeader requestHeader, int queueId, boolean init, String lockKey) {
// 已提交的消费位点 long offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), topic, queueId); if (offset < 0) {
// 还没消费过 判断是从最旧还是最新的开始消费 if (ConsumeInitMode.MIN == requestHeader.getInitMode()) {
offset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId); } else {
// pop last one,then commit offset. offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1; // max & no consumer offset if (offset < 0) {
offset = 0; } if (init) {
this.brokerController.getConsumerOffsetManager().commitOffset("getPopOffset", requestHeader.getConsumerGroup(), topic, queueId, offset); } } } // CK缓冲区的位点,这些是已投递、还没提交消费的位点 long bufferOffset = this.popBufferMergeService.getLatestOffset(lockKey); if (bufferOffset < 0) {
return offset; } else {
return bufferOffset > offset ? bufferOffset : offset; } }
有了拉取位点,接下来就是通过 MessageStore 查询消息了,底层会读取 consumequeue 和 commitlog 文件,这里不赘述。拉取到消息列表后,Broker 会给这一批消息记录一个 CK 消息,用于后续匹配 ACK 消息。
PopCheckPoint
CK 消息对应的类是 PopCheckPoint,主要记录了:消息拉取时间、消息偏移量、不可见时间、消息 ACK 位图等等。
public class PopCheckPoint {
// 起始偏移量 @JSONField(name = "so") private long startOffset; // 消息拉取时间 @JSONField(name = "pt") private long popTime; @JSONField(name = "it") private long invisibleTime; // 位图 收到ACK消息则把对应位设为1 @JSONField(name = "bm") private int bitMap; // 消息数量 @JSONField(name = "n") private byte num; @JSONField(name = "q") private byte queueId; @JSONField(name = "t") private String topic; // 消费组 @JSONField(name = "c") private String cid; @JSONField(name = "ro") private long reviveOffset; // 消息增量偏移量 @JSONField(name = "d") private List<Integer> queueOffsetDiff; @JSONField(name = "bn") String brokerName; }
构建好 PopCheckPoint 对象,Broker 会把它作为一个普通消息写入 commitlog 持久化,消息内容:
topic: rmq_sys_REVIVE_LOG_{clusterName} queueId: 0~7轮询 tag: ck body: json(PopCheckPoint) deliverTimeMs: invisibleTime-1s
CK 消息存储完毕后,就可以正常返回消息了。需要注意的是,消息在返回给消费者前,Broker 会给消息设置一个很重要的属性:POP_CK
。它是消息关联的 CK 消息的句柄字符串,消费者基于该属性来 ack 消息。
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(key), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(), messageExt.getTopic(), messageQueue.getBrokerName(), messageExt.getQueueId(), msgQueueOffset) );
POP_CK
由以下八个属性构成,空格连接,通过该属性可以快速定位到 CK 消息。
{ckQueueOffset} {popTime} {invisibleTime} {reviveQid} {
0/1} {brokerName} {queueId} {msgQueueOffset} # 示例值 2 35 60000 0 0 broker-a 3 2
AckMessageProcessor
消费者消费完消息后,会调用eraseMessage()
擦除消息,也就是根据消费结果判断是 ack 还是 nack。
public void eraseMessage(MessageViewImpl messageView, ConsumeResult consumeResult) {
statsConsumptionResult(consumeResult); ListenableFuture<Void> future = ConsumeResult.SUCCESS.equals(consumeResult) ? ackMessage(messageView) : nackMessage(messageView); future.addListener(() -> evictCache(messageView), MoreExecutors.directExecutor()); }
如果消费成功,则调用ackMessage
接口,Broker 的处理方式也很简单,就是构建一个 AckMsg 对象,然后把它作为消息体发一个 ACK 消息。
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException {
...... AckMsg ackMsg = new AckMsg(); ackMsg.setAckOffset(requestHeader.getOffset()); ackMsg.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfo)); ackMsg.setConsumerGroup(requestHeader.getConsumerGroup()); ackMsg.setTopic(requestHeader.getTopic()); ackMsg.setQueueId(requestHeader.getQueueId()); ackMsg.setPopTime(ExtraInfoUtil.getPopTime(extraInfo)); ackMsg.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo)); if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
顺序消息处理 } // 构建消息存储 MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(reviveTopic); msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset)); msgInner.setQueueId(rqId); msgInner.setTags(PopAckConstants.ACK_TAG); msgInner.setBornTimestamp(System.currentTimeMillis()); msgInner.setBornHost(this.brokerController.getStoreHost()); msgInner.setStoreHost(this.brokerController.getStoreHost()); // 延时消息 拉取时间+不可见时间 msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + ExtraInfoUtil.getInvisibleTime(extraInfo)); msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg)); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); return response; }
ACK 消息内容如下:
topic: rmq_sys_REVIVE_LOG_{clusterName} queueId: 0~7轮询 tag: ack body: json(AckMsg) deliverTimeMs: invisibleTime
注意:CK & ACK 消息同属一个Topic
ChangeInvisibleTimeProcessor
如果消费失败,会根据重试次数调用changeInvisibleDuration
接口延长消息不可见时间。Broker 最终会由 ChangeInvisibleTimeProcessor 处理请求,因为消息是按照紧凑的方式顺序写入 commitlog,所以写入后就不支持修改了,所谓的修改消息不可见时间,其实是先发一个新的 CK 消息,再发一个旧消息的 ACK 消息。
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException {
...... // add new ck long now = System.currentTimeMillis(); PutMessageResult ckResult = appendCheckPoint(requestHeader, ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(), requestHeader.getOffset(), now, ExtraInfoUtil.getBrokerName(extraInfo)); // ack old msg ackOrigin(requestHeader, extraInfo); }
所以,无论消费者是调用 ack 还是 nack,消息一定会被 ack 掉。另外还有一种场景就是消费者异常,没有上报消费结果,Broker 在消息不可见时间到期后会把这些没被 ack 掉的消息发到重试队列里,让其它消费者消费。
PopReviveService
public void run() {
...... ConsumeReviveObj consumeReviveObj = new ConsumeReviveObj(); // 消费Revive队列消息,构建PopCheckPoint放入map consumeReviveMessage(consumeReviveObj); // 处理PopCheckPoint mergeAndRevive(consumeReviveObj); }
protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
HashMap<String, PopCheckPoint> map = consumeReviveObj.map; // 消费位点 long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(PopAckConstants.REVIVE_GROUP, reviveTopic, queueId); while (true) {
// 查询REVIVE队列消息 List<MessageExt> messageExts = getReviveMessage(offset, queueId); for (MessageExt messageExt : messageExts) {
//ck消息 if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) {
String raw = new String(messageExt.getBody(), DataConverter.charset); PopCheckPoint point = JSON.parseObject(raw, PopCheckPoint.class); map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime(), point); } else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {
// ack消息 String raw = new String(messageExt.getBody(), DataConverter.charset); AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class); PopCheckPoint point = map.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime()); // 设置CheckPoint ACK位图 int indexOfAck = point.indexOfAck(ackMsg.getAckOffset()); if (indexOfAck > -1) {
point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true)); } else {
POP_LOGGER.error("invalid ack index, {}, {}", ackMsg, point); } } } } }
CK 消息到期后,会触发reviveMsgFromCk
恢复没有被 ack 的消息,处理方式是构建新消息发到重试队列。
private void reviveMsgFromCk(PopCheckPoint popCheckPoint) throws Throwable {
for (int j = 0; j < popCheckPoint.getNum(); j++) {
if (DataConverter.getBit(popCheckPoint.getBitMap(), j)) {
// 已经ack了,跳过 continue; } // 查询实际消息 long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j); MessageExt messageExt = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName()); if (messageExt == null) {
POP_LOGGER.warn("reviveQueueId={},can not get biz msg topic is {}, offset is {} , then continue ", queueId, popCheckPoint.getTopic(), msgOffset); continue; } //skip ck from last epoch if (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {
POP_LOGGER.warn("reviveQueueId={},skip ck from last epoch {}", queueId, popCheckPoint); continue; } // 重试 构建一条消息重新Put到commitlog topic:%RETRY%{consumerGroup}_{topic} reviveRetry(popCheckPoint, messageExt); } }
PopBufferMergeService
public boolean addCk(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) {
// key: point.getT() + point.getC() + point.getQ() + point.getSo() + point.getPt() if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {
// 启用内存匹配 return false; } if (!serving) {
return false; } long now = System.currentTimeMillis(); if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.warn("[PopBuffer]add ck, timeout, {}, {}", point, now); } return false; } if (this.counter.get() > brokerController.getBrokerConfig().getPopCkMaxBufferSize()) {
POP_LOGGER.warn("[PopBuffer]add ck, max size, {}, {}", point, this.counter.get()); return false; } PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset); if (!checkQueueOk(pointWrapper)) {
// 队列默认不超过20000个 return false; } // 添加到commitOffsets putOffsetQueue(pointWrapper); // 添加到缓冲区 this.buffer.put(pointWrapper.getMergeKey(), pointWrapper); this.counter.incrementAndGet(); if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]add ck, {}", pointWrapper); } return true; }
同样的,Broker 在接收到消息 ack 请求时,也会优先只改缓冲区的 PopCheckPoint ack 位图,无需存储 ACK 消息。
public boolean addAk(int reviveQid, AckMsg ackMsg) {
if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {
return false; } if (!serving) {
return false; } try {
PopCheckPointWrapper pointWrapper = this.buffer.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName()); if (pointWrapper == null) {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, no ck, {}", reviveQid, ackMsg); } return false; } if (pointWrapper.isJustOffset()) {
return false; } PopCheckPoint point = pointWrapper.getCk(); long now = System.currentTimeMillis(); if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, almost timeout for revive, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now); } return false; } if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() - 1500) {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, stay too long, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now); } return false; } // 直接更改内存里的位图 int indexOfAck = point.indexOfAck(ackMsg.getAckOffset()); if (indexOfAck > -1) {
markBitCAS(pointWrapper.getBits(), indexOfAck); } else {
POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point); return true; } if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]add ack, rqId={}, {}, {}", reviveQid, pointWrapper, ackMsg); } return true; } catch (Throwable e) {
POP_LOGGER.error("[PopBuffer]add ack error, rqId=" + reviveQid + ", " + ackMsg, e); } return false; }
内存资源有限,一直往里堆 PopCheckPoint 也不行啊,这时候就需要做两件事:
- 把内存里已经匹配完 ack 的 PopCheckPoint 移除掉
- 隔太久还没 ack 掉的 PopCheckPoint,必须要落盘存储了
- PopCheckPoint 消息已经持久化了
- PopCheckPoint 在内存里就已经全被 ack 掉了
还没有完全被 ack 掉的 PopCheckPoint 移除前需要做两件事:
- PopCheckPoint 消息持久化
- 已经被 ack 掉的消息也要持久化
private void scan() {
long startTime = System.currentTimeMillis(); int count = 0, countCk = 0; // 迭代缓冲区的PopCheckPoint Iterator<Map.Entry<String, PopCheckPointWrapper>> iterator = buffer.entrySet().iterator(); while (iterator.hasNext()) {
Map.Entry<String, PopCheckPointWrapper> entry = iterator.next(); PopCheckPointWrapper pointWrapper = entry.getValue(); // CheckPoint已持久化,或已被ACK,从缓冲区删除 // just process offset(already stored at pull thread), or buffer ck(not stored and ack finish) if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() || isCkDone(pointWrapper) || isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]ck done, {}", pointWrapper); } iterator.remove(); counter.decrementAndGet(); continue; } // 把超时或停留时间超10s的CheckPoint从缓冲区删除,删除前要先持久化 PopCheckPoint point = pointWrapper.getCk(); long now = System.currentTimeMillis(); boolean removeCk = !this.serving; // ck will be timeout if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {
removeCk = true; } // 内存停留时间超过10秒,也要移除掉 if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime()) {
removeCk = true; } if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() * 2L) {
POP_LOGGER.warn("[PopBuffer]ck finish fail, stay too long, {}", pointWrapper); } // double check if (isCkDone(pointWrapper)) {
continue; } else if (pointWrapper.isJustOffset()) {
// just offset should be in store. if (pointWrapper.getReviveQueueOffset() < 0) {
// reviveQueueOffset<0代表还没存储,要先存储 putCkToStore(pointWrapper, false); countCk++; } continue; } else if (removeCk) {
// put buffer ak to store if (pointWrapper.getReviveQueueOffset() < 0) {
putCkToStore(pointWrapper, false); countCk++; } if (!pointWrapper.isCkStored()) {
continue; } for (byte i = 0; i < point.getNum(); i++) {
// reput buffer ak to store // 存储ack消息 if (DataConverter.getBit(pointWrapper.getBits().get(), i) && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
if (putAckToStore(pointWrapper, i)) {
count++; markBitCAS(pointWrapper.getToStoreBits(), i); } } } if (isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]ck finish, {}", pointWrapper); } iterator.remove(); counter.decrementAndGet(); continue; } } } // 扫描commitOffsets,提交消费位点 int offsetBufferSize = scanCommitOffset(); long eclipse = System.currentTimeMillis() - startTime; if (eclipse > brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() - 1000) {
POP_LOGGER.warn("[PopBuffer]scan stop, because eclipse too long, PopBufferEclipse={}, " + "PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}", eclipse, count, countCk, counter.get(), offsetBufferSize); this.serving = false; } else {
if (scanTimes % countOfSecond1 == 0) {
POP_LOGGER.info("[PopBuffer]scan, PopBufferEclipse={}, " + "PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}", eclipse, count, countCk, counter.get(), offsetBufferSize); } } scanTimes++; if (scanTimes >= countOfMinute1) {
counter.set(this.buffer.size()); scanTimes = 0; } }
内存里的 PopCheckPoint 除了会根据唯一键,构建一个 Map,还会根据topic@cid@queueId
构建一个 QueueWithTime 队列连接起来。
ConcurrentHashMap<String/*topic@cid@queueId*/, QueueWithTime<PopCheckPointWrapper>> commitOffsets = new ConcurrentHashMap<>();
QueueWithTime 队列的用途有两个:
- 内存里的 PopCheckPoint 处理完毕后,提交消费位点
- 消息拉取时,获取拉取位点,避免已经投递的消息重复投递
内存里的 PopCheckPoint 只要持久化了或者全被 ack 掉了,就可以提交消费位点了。因为这些消息要么被成功消费了,要么后续在处理 CK 消息时也会被发送到重试队列里。提交消费位点的方法是commitOffset()
private boolean commitOffset(final PopCheckPointWrapper wrapper) {
if (wrapper.getNextBeginOffset() < 0) {
return true; } final PopCheckPoint popCheckPoint = wrapper.getCk(); final String lockKey = wrapper.getLockKey(); // 加锁 if (!queueLockManager.tryLock(lockKey)) {
return false; } try {
// 旧的消费位点 final long offset = brokerController.getConsumerOffsetManager().queryOffset(popCheckPoint.getCId(), popCheckPoint.getTopic(), popCheckPoint.getQueueId()); if (wrapper.getNextBeginOffset() > offset) {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("Commit offset, {}, {}", wrapper, offset); } } else {
// maybe store offset is not correct. POP_LOGGER.warn("Commit offset, consumer offset less than store, {}, {}", wrapper, offset); } // 提交消费位点 brokerController.getConsumerOffsetManager().commitOffset(getServiceName(), popCheckPoint.getCId(), popCheckPoint.getTopic(), popCheckPoint.getQueueId(), wrapper.getNextBeginOffset()); } finally {
queueLockManager.unLock(lockKey); } return true; }
尾巴
RocketMQ 5.0 的 Pop 消费模式是 Push 模式的升级版,它解决了原先 Push 模式下队列只能由一个消费者消费的问题、去除了客户端繁重的重平衡逻辑、降低了消息堆积的风险。核心逻辑是 Broker 给每次拉取的一批消息发一个 CK 延时消息,客户端 ack 时再发一个 ACK 延时消息,消息到期后对 CK 消息做 ACK 匹配,把未被 ack 掉的消息发到重试独立里。这种模式下,不可避免的会带来额外开销,所以 RocketMQ 也支持优先在内存里完成匹配,CK 和 ACK 消息就不用存储了。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/134465.html