阿里二面:RocketMQ 消费者拉取一批消息,其中部分消费失败了,偏移量怎样更新?
大家好,我是君哥。
最近有读者参加面试时被问了一个问题,如果消费者拉取了一批消息,比如 100 条,第 100 条消息消费成功了,但是第 50 条消费失败,偏移量会怎样更新?就着这个问题,今天来聊一下,如果一批消息有消费失败的情况时,偏移量怎么保存。
1 拉取消息
1.1 封装拉取请求
以 RocketMQ 推模式为例,RocketMQ 消费者启动代码如下:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
上面的 DefaultMQPushConsumer 是一个推模式的消费者,启动方法是 start。消费者启动后会触发重平衡线程(RebalanceService),这个线程的任务是在死循环中不停地进行重平衡,最终封装拉取消息的请求到 pullRequestQueue。这个过程涉及到的 UML 类图如下:
1.2 处理拉取请求
封装好拉取消息的请求 PullRequest 后,RocketMQ 就会不停地从 pullRequestQueue 获取消息拉取请求进行处理。UML 类图如下:
拉取消息的入口方法是一个死循环,代码如下:
//PullMessageService public void run(){ log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
这里拉取到消息后,提交给 PullCallback 这个回调函数进行处理。
拉取到的消息首先被 put 到 ProcessQueue 中的 msgTreeMap 上,然后被封装到 ConsumeRequest 这个线程类来处理。把代码精简后,ConsumeRequest 处理逻辑如下:
//ConsumeMessageConcurrentlyService.java public void run(){ MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { //1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) { //2.处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
2 处理消费结果
2.1 并发消息
并发消息处理消费结果的代码做精简后如下:
//ConsumeMessageConcurrentlyService.java public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ){ int ackIndex = context.getAckIndex(); switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; break; case RECONSUME_LATER: break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
从上面的代码可以看出,如果处理消息的逻辑是串行的,比如文章开头的代码使用 for 循环来处理消息,那如果在某一条消息处理失败了,直接退出循环,给 ConsumeConcurrentlyContext 的 ackIndex 变量赋值为消息列表中失败消息的位置,这样这条失败消息后面的消息就不再处理了,发送给 Broker 等待重新拉取。代码如下:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ for (int i = 0; i < msgs.size(); i++) { try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ context.setAckIndex(i); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
消费成功的消息则从 ProcessQueue 中的 msgTreeMap 中移除,并且返回 msgTreeMap 中最小的偏移量(firstKey)去更新。注意:集群模式偏移量保存在 Broker 端,更新偏移量需要发送消息到 Broker,而广播模式偏移量保存在 Consumer 端,只需要更新本地偏移量就可以。
如果处理消息的逻辑是并行的,处理消息失败后给 ackIndex 赋值是没有意义的,因为可能有多条消息失败,给 ackIndex 变量赋值并不准确。最好的方法就是给 ackIndex 赋值 0,整批消息全部重新消费,这样又可能带来冥等问题。
2.2 顺序消息
对于顺序消息,从 msgTreeMap 取出消息后,先要放到 consumingMsgOrderlyTreeMap 上面,更新偏移量时,是从 consumingMsgOrderlyTreeMap 上取最大的消息偏移量(lastKey)。
3 总结
回到开头的问题,如果一批消息按照顺序消费,是不可能出现第 100 条消息消费成功了,但第 50 条消费失败的情况,因为第 50 条消息失败的时候,应该退出循环,不再继续进行消费。
如果是并发消费,如果出现了这种情况,建议是整批消息全部重新消费,也就是给 ackIndex 赋值 0,这样必须考虑冥等问题。
以上是阿里二面:RocketMQ 消费者拉取一批消息,其中部分消费失败了,偏移量怎样更新?的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

热门话题

1.SpringBoot整合RocketMQ在SpringBoot中集成RocketMQ,只需要简单四步:1.引入相关依赖org.apache.rocketmqrocketmq-spring-boot-starter2.添加RocketMQ的相关配置rocketmq:consumer:group:springboot_consumer_group#一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值pull-batch-size:10name-server:10.5.103.6:9876pr

大家好,我是君哥。最近有读者参加面试时被问了一个问题,如果消费者拉取了一批消息,比如 100 条,第 100 条消息消费成功了,但是第 50 条消费失败,偏移量会怎样更新?就着这个问题,今天来聊一下,如果一批消息有消费失败的情况时,偏移量怎么保存。1 拉取消息1.1 封装拉取请求以 RocketMQ 推模式为例,RocketMQ 消费者启动代码如下:public static void main(String[] args) throws InterruptedException, MQClie

在Docker中安装RocketMQ的过程如下所示:创建Docker网络:在终端中执行以下命令来创建Docker网络,以便在容器之间进行通信:dockernetworkcreaterocketmq-network下载RocketMQ镜像:在终端中执行以下命令来下载RocketMQ的Docker镜像:dockerpullrocketmqinc/rocketmq启动NameServer容器:在终端中执行以下命令来启动NameServer容器:dockerrun-d--namermqnamesrv--

应用场景在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group、Topic以及selectorExpression(数据过滤、选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloudconfig进行动态切换。引入依赖org.apache.rocketmqrocketmq-spring-boot-starter2.0.4消费者代码@RocketMQMessageListener(consumerGroup=&qu

队列的生产者与消费者模式在PHP与MySQL中的实现方法随着互联网业务的快速发展,系统中处理大量任务的需求变得越来越迫切。队列是一种常见的解决方案,可以高效地处理任务。队列的生产者-消费者模式(Producer-ConsumerPattern)在PHP和MySQL中的实现方法是一种常见的解决方案,本文将介绍具体的实现方法,并提供代码示例。生产者-消费者模式

并发计算中普遍存在的同步挑战被称为生产者-消费者问题。鉴于多个线程或进程旨在在访问共享源时协调各自的操作;这个问题需要复杂的沟通任务以及平衡的执行程序。今天的讨论将有助于理解这一困难背后的概念,同时认识到它在当代计算机科学框架中的重要性-特别是在C++实现实践中。理解生产者-消费者问题定义和目的解决生产者-消费者问题带来的挑战的解决方案来自于明确划分负责生产和使用信息的人员之间的责任。当生产者自行生成新记录时,消费者通过同步他们的操作来确保它们被正确使用。人们必须小心避免竞争条件或死锁等问题,如

1消息publicclassMessageimplementsSerializable{privatestaticfinallongserialVersionUID=8445773977080406428L;//主题名字privateStringtopic;//消息扩展信息,Tag,keys,延迟级别都存在这里privateMapproperties;//消息体,字节数组privatebyte[]body;//设置消息的key,publicvoidsetKeys(Stringkeys){}//设

1、说明consumer表示消耗,接口接受通用参数t,调用accept,对参数执行一系列操作,但没有返回值。2、实例对Consumer来说,我们需要提供入参来消费。classPerson{StringfirstName;StringlastName;Person(){}Person(StringfirstName,StringlastName){this.firstName=firstName;this.lastName=lastName;}}Java可以用来干什么Java主要应用于:1.web
