目录
1 拉取消息
1.1 封装拉取请求
1.2 处理拉取请求
2 处理消费结果
2.1 并发消息
2.2 顺序消息
3 总结
首页 科技周边 人工智能 阿里二面:RocketMQ 消费者拉取一批消息,其中部分消费失败了,偏移量怎样更新?

阿里二面:RocketMQ 消费者拉取一批消息,其中部分消费失败了,偏移量怎样更新?

Apr 12, 2023 pm 11:28 PM
rocketmq 消费者 consumer

大家好,我是君哥。

最近有读者参加面试时被问了一个问题,如果消费者拉取了一批消息,比如 100 条,第 100 条消息消费成功了,但是第 50 条消费失败,偏移量会怎样更新?就着这个问题,今天来聊一下,如果一批消息有消费失败的情况时,偏移量怎么保存。

1 拉取消息

1.1 封装拉取请求

以 RocketMQ 推模式为例,RocketMQ 消费者启动代码如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
 }catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}
登录后复制

上面的 DefaultMQPushConsumer 是一个推模式的消费者,启动方法是 start。消费者启动后会触发重平衡线程(RebalanceService),这个线程的任务是在死循环中不停地进行重平衡,最终封装拉取消息的请求到 pullRequestQueue。这个过程涉及到的 UML 类图如下:

图片

1.2 处理拉取请求

封装好拉取消息的请求 PullRequest 后,RocketMQ 就会不停地从 pullRequestQueue 获取消息拉取请求进行处理。UML 类图如下:

图片

拉取消息的入口方法是一个死循环,代码如下:

//PullMessageService
public void run(){
 log.info(this.getServiceName() + " service started");

 while (!this.isStopped()) {
try {
 PullRequest pullRequest = this.pullRequestQueue.take();
 this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
 log.error("Pull Message Service Run Method exception", e);
}
 }

 log.info(this.getServiceName() + " service end");
}
登录后复制

这里拉取到消息后,提交给 PullCallback 这个回调函数进行处理。

拉取到的消息首先被 put 到 ProcessQueue 中的 msgTreeMap 上,然后被封装到 ConsumeRequest 这个线程类来处理。把代码精简后,ConsumeRequest 处理逻辑如下:

//ConsumeMessageConcurrentlyService.java
public void run(){
 MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
 ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
 ConsumeConcurrentlyStatus status = null;
 try {
//1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
 } catch (Throwable e) {
 }
 if (!processQueue.isDropped()) {
//2.处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
 } else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
 }
}
登录后复制

2 处理消费结果

2.1 并发消息

并发消息处理消费结果的代码做精简后如下:

//ConsumeMessageConcurrentlyService.java
public void processConsumeResult(
 final ConsumeConcurrentlyStatus status,
 final ConsumeConcurrentlyContext context,
 final ConsumeRequest consumeRequest
){
 int ackIndex = context.getAckIndex();
 switch (status) {
case CONSUME_SUCCESS:
 if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
 }
 int ok = ackIndex + 1;
 int failed = consumeRequest.getMsgs().size() - ok;
 break;
case RECONSUME_LATER:
 break;
default:
 break;
 }

 switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
 }
 break;
case CLUSTERING:
 List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
 msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
 msgBackFailed.add(msg);
}
 }

 if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
 }
 break;
default:
 break;
 }

 long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
 if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
 }
}
登录后复制

从上面的代码可以看出,如果处理消息的逻辑是串行的,比如文章开头的代码使用 for 循环来处理消息,那如果在某一条消息处理失败了,直接退出循环,给 ConsumeConcurrentlyContext 的 ackIndex 变量赋值为消息列表中失败消息的位置,这样这条失败消息后面的消息就不再处理了,发送给 Broker 等待重新拉取。代码如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 for (int i = 0; i < msgs.size(); i++) {
try{
 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
 context.setAckIndex(i);
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}
登录后复制

消费成功的消息则从 ProcessQueue 中的 msgTreeMap 中移除,并且返回 msgTreeMap 中最小的偏移量(firstKey)去更新。注意:集群模式偏移量保存在 Broker 端,更新偏移量需要发送消息到 Broker,而广播模式偏移量保存在 Consumer 端,只需要更新本地偏移量就可以。

如果处理消息的逻辑是并行的,处理消息失败后给 ackIndex 赋值是没有意义的,因为可能有多条消息失败,给 ackIndex 变量赋值并不准确。最好的方法就是给 ackIndex 赋值 0,整批消息全部重新消费,这样又可能带来冥等问题。

2.2 顺序消息

对于顺序消息,从 msgTreeMap 取出消息后,先要放到 consumingMsgOrderlyTreeMap 上面,更新偏移量时,是从 consumingMsgOrderlyTreeMap 上取最大的消息偏移量(lastKey)。

3 总结

回到开头的问题,如果一批消息按照顺序消费,是不可能出现第 100 条消息消费成功了,但第 50 条消费失败的情况,因为第 50 条消息失败的时候,应该退出循环,不再继续进行消费。

如果是并发消费,如果出现了这种情况,建议是整批消息全部重新消费,也就是给 ackIndex 赋值 0,这样必须考虑冥等问题。

以上是阿里二面:RocketMQ 消费者拉取一批消息,其中部分消费失败了,偏移量怎样更新?的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
1 个月前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
1 个月前 By 尊渡假赌尊渡假赌尊渡假赌
威尔R.E.P.O.有交叉游戏吗?
1 个月前 By 尊渡假赌尊渡假赌尊渡假赌

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

SpringBoot整合RocketMQ的方法是什么 SpringBoot整合RocketMQ的方法是什么 May 14, 2023 am 10:19 AM

1.SpringBoot整合RocketMQ在SpringBoot中集成RocketMQ,只需要简单四步:1.引入相关依赖org.apache.rocketmqrocketmq-spring-boot-starter2.添加RocketMQ的相关配置rocketmq:consumer:group:springboot_consumer_group#一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值pull-batch-size:10name-server:10.5.103.6:9876pr

阿里二面:RocketMQ 消费者拉取一批消息,其中部分消费失败了,偏移量怎样更新? 阿里二面:RocketMQ 消费者拉取一批消息,其中部分消费失败了,偏移量怎样更新? Apr 12, 2023 pm 11:28 PM

大家好,我是君哥。最近有读者参加面试时被问了一个问题,如果消费者拉取了一批消息,比如 100 条,第 100 条消息消费成功了,但是第 50 条消费失败,偏移量会怎样更新?就着这个问题,今天来聊一下,如果一批消息有消费失败的情况时,偏移量怎么保存。1 拉取消息1.1 封装拉取请求以 RocketMQ 推模式为例,RocketMQ 消费者启动代码如下:public static void main(String[] args) throws InterruptedException, MQClie

常用环境部署—Docker安装RocketMQ教程! 常用环境部署—Docker安装RocketMQ教程! Mar 07, 2024 am 09:30 AM

在Docker中安装RocketMQ的过程如下所示:创建Docker网络:在终端中执行以下命令来创建Docker网络,以便在容器之间进行通信:dockernetworkcreaterocketmq-network下载RocketMQ镜像:在终端中执行以下命令来下载RocketMQ的Docker镜像:dockerpullrocketmqinc/rocketmq启动NameServer容器:在终端中执行以下命令来启动NameServer容器:dockerrun-d--namermqnamesrv--

SpringBoot整合RocketMQ遇到的坑怎么解决 SpringBoot整合RocketMQ遇到的坑怎么解决 May 19, 2023 am 11:25 AM

应用场景在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group、Topic以及selectorExpression(数据过滤、选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloudconfig进行动态切换。引入依赖org.apache.rocketmqrocketmq-spring-boot-starter2.0.4消费者代码@RocketMQMessageListener(consumerGroup=&qu

队列的生产者与消费者模式在PHP与MySQL中的实现方法 队列的生产者与消费者模式在PHP与MySQL中的实现方法 Oct 15, 2023 pm 02:33 PM

队列的生产者与消费者模式在PHP与MySQL中的实现方法随着互联网业务的快速发展,系统中处理大量任务的需求变得越来越迫切。队列是一种常见的解决方案,可以高效地处理任务。队列的生产者-消费者模式(Producer-ConsumerPattern)在PHP和MySQL中的实现方法是一种常见的解决方案,本文将介绍具体的实现方法,并提供代码示例。生产者-消费者模式

生产者-消费者问题及其在C++中的实现 生产者-消费者问题及其在C++中的实现 Sep 17, 2023 pm 11:09 PM

并发计算中普遍存在的同步挑战被称为生产者-消费者问题。鉴于多个线程或进程旨在在访问共享源时协调各自的操作;这个问题需要复杂的沟通任务以及平衡的执行程序。今天的讨论将有助于理解这一困难背后的概念,同时认识到它在当代计算机科学框架中的重要性-特别是在C++实现实践中。理解生产者-消费者问题定义和目的解决生产者-消费者问题带来的挑战的解决方案来自于明确划分负责生产和使用信息的人员之间的责任。当生产者自行生成新记录时,消费者通过同步他们的操作来确保它们被正确使用。人们必须小心避免竞争条件或死锁等问题,如

分析Java开发RocketMQ生产者高可用示例 分析Java开发RocketMQ生产者高可用示例 Apr 23, 2023 pm 11:28 PM

1消息publicclassMessageimplementsSerializable{privatestaticfinallongserialVersionUID=8445773977080406428L;//主题名字privateStringtopic;//消息扩展信息,Tag,keys,延迟级别都存在这里privateMapproperties;//消息体,字节数组privatebyte[]body;//设置消息的key,publicvoidsetKeys(Stringkeys){}//设

java Consumer接口有什么作用 java Consumer接口有什么作用 Apr 26, 2023 am 11:34 AM

1、说明consumer表示消耗,接口接受通用参数t,调用accept,对参数执行一系列操作,但没有返回值。2、实例对Consumer来说,我们需要提供入参来消费。classPerson{StringfirstName;StringlastName;Person(){}Person(StringfirstName,StringlastName){this.firstName=firstName;this.lastName=lastName;}}Java可以用来干什么Java主要应用于:1.web

See all articles