阿里二面:RocketMQ 消費者拉取一批訊息,其中部分消費失敗了,偏移量怎麼更新?
大家好,我是君哥。
最近有讀者參加面試時被問了一個問題,如果消費者拉取了一批訊息,例如100 條,第100 條訊息消費成功了,但是第50 條消費失敗,偏移量會怎樣更新?就著這個問題,今天來聊聊一下,如果一批訊息有消費失敗的狀況時,偏移量怎麼保存。
1 拉取訊息
1.1 封裝拉取請求
#以RocketMQ 推模式為例,RocketMQ 消費者啟動程式碼如下:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
上面的DefaultMQPushConsumer 是一個推模式的消費者,啟動方法是start。消費者啟動後會觸發重平衡線程(RebalanceService),這個線程的任務是在死循環中不停地進行重平衡,最終封裝拉取訊息的請求到 pullRequestQueue。這個過程涉及的UML 類別圖如下:
1.2 處理拉取請求
封裝好拉取訊息的請求PullRequest 後,RocketMQ 就會不停地從pullRequestQueue 取得訊息拉取請求進行處理。 UML 類別圖如下:
拉取訊息的入口方法是一個死循環,程式碼如下:
//PullMessageService public void run(){ log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
這裡拉取到訊息後,提交給PullCallback 這個回呼函數進行處理。
拉取到的訊息先被 put 到 ProcessQueue 中的 msgTreeMap 上,然後被封裝到 ConsumeRequest 這個執行緒類別來處理。把程式碼精簡後,ConsumeRequest 處理邏輯如下:
//ConsumeMessageConcurrentlyService.java public void run(){ MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { //1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) { //2.处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
2 處理消費結果
2.1 並發訊息
並發訊息處理消費結果的程式碼做精簡後如下:
//ConsumeMessageConcurrentlyService.java public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ){ int ackIndex = context.getAckIndex(); switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; break; case RECONSUME_LATER: break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
從上面的程式碼可以看出,如果處理訊息的邏輯是串列的,例如文章開頭的程式碼使用for 迴圈來處理訊息,那如果在某一訊息處理失敗了,直接退出循環,給ConsumeConcurrentlyContext 的ackIndex 變數賦值為訊息清單中失敗訊息的位置,這樣這條失敗訊息後面的訊息就不再處理了,發送給Broker 等待重新拉取。程式碼如下:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ for (int i = 0; i < msgs.size(); i++) { try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ context.setAckIndex(i); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
消費成功的訊息則從 ProcessQueue 中的 msgTreeMap 中移除,並且傳回 msgTreeMap 中最小的偏移量(firstKey)去更新。注意:叢集模式偏移量保存在 Broker 端,更新偏移量需要傳送訊息到 Broker,而廣播模式偏移量保存在 Consumer 端,只需要更新本地偏移量就可以。
如果處理訊息的邏輯是並行的,處理訊息失敗後給 ackIndex 賦值是沒有意義的,因為可能有多條訊息失敗,給 ackIndex 變數賦值並不準確。最好的方法就是給 ackIndex 賦值 0,整批訊息全部重新消費,這樣又可能帶來冥等問題。
2.2 順序訊息
對於順序訊息,從msgTreeMap 取出訊息後,先要放到consumingMsgOrderlyTreeMap 上面,更新偏移量時,是從consumingMsgOrderlyTreeMap 上取最大的訊息偏移量( lastKey)。
3 總結
回到開頭的問題,如果一批訊息按照順序消費,是不可能出現第100 條訊息消費成功了,但第50 條消費失敗的情況,因為第50 條訊息失敗的時候,應該退出循環,不再繼續進行消費。
如果是並發消費,如果出現了這種情況,建議是整批訊息全部重新消費,也就是給 ackIndex 賦值 0,這樣就必須考慮冥等問題。
以上是阿里二面:RocketMQ 消費者拉取一批訊息,其中部分消費失敗了,偏移量怎麼更新?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

1.SpringBoot整合RocketMQ在SpringBoot中集成RocketMQ,只需要簡單四步:1.引入相關依賴org.apache.rocketmqrocketmq-spring-boot-starter2.添加RocketMQ的相關配置rocketmq:consumer:group:springboot_consumer_group#一次拉取訊息最大值,注意是拉取訊息的最大值而非消費最大值pull-batch-size:10name-server:10.5.103.6:9876pr

大家好,我是君哥。最近有讀者參加面試時被問了一個問題,如果消費者拉取了一批訊息,例如 100 條,第 100 條訊息消費成功了,但是第 50 條消費失敗,偏移量會怎樣更新?就著這個問題,今天來聊聊一下,如果一批訊息有消費失敗的狀況時,偏移量怎麼保存。 1 拉取訊息1.1 封裝拉取請求以 RocketMQ 推模式為例,RocketMQ 消費者啟動程式碼如下:public static void main(String[] args) throws InterruptedException, MQClie

隊列的生產者與消費者模式在PHP與MySQL中的實作方法隨著網路業務的快速發展,系統中處理大量任務的需求變得越來越迫切。隊列是一種常見的解決方案,可以有效率地處理任務。隊列的生產者-消費者模式(Producer-ConsumerPattern)在PHP和MySQL中的實作方法是常見的解決方案,本文將介紹具體的實作方法,並提供程式碼範例。生產者-消費者模式

應用場景在實現RocketMQ消費時,一般會用到@RocketMQMessageListener註解定義Group、Topic以及selectorExpression(資料過濾、選擇的規則)為了能支援動態篩選數據,一般都會使用表達式,然後透過apollo或cloudconfig進行動態切換。引入依賴org.apache.rocketmqrocketmq-spring-boot-starter2.0.4消費者代碼@RocketMQMessageListener(consumerGroup=&qu

在Docker中安裝RocketMQ的過程如下所示:建立Docker網路:在終端中執行以下命令來建立Docker網絡,以便在容器之間進行通訊:dockernetworkcreaterocketmq-network下載RocketMQ鏡像:在終端機中執行以下命令來下載RocketMQ的Docker映像:dockerpullrocketmqinc/rocketmq啟動NameServer容器:在終端機中執行以下指令來啟動NameServer容器:dockerrun-d--namermqnamesrv--

並發計算中普遍存在的同步挑戰被稱為生產者-消費者問題。鑑於多個執行緒或進程旨在在存取共享來源時協調各自的操作;這個問題需要複雜的溝通任務以及平衡的執行程序。今天的討論將有助於理解這一困難背後的概念,同時認識到它在當代計算機科學框架中的重要性-特別是在C++實現實踐中。理解生產者-消費者問題定義和目的解決生產者-消費者問題所帶來的挑戰的解決方案來自於明確劃分負責生產和使用資訊的人員之間的責任。當生產者自行產生新記錄時,消費者會透過同步他們的操作來確保它們被正確使用。人們必須小心避免競爭條件或死鎖等問題,如

1訊息publicclassMessageimplementsSerializable{privatestaticfinallongserialVersionUID=8445773977080406428L;//主題名字privateStringtopic;///訊息擴充訊息,Tag,keys,延遲層級都存在這裡privateMapproperties;///訊息擴充訊息,Tag,keys,延遲層級都存在這裡privateMapproperties;/訊息體,訊息擴展訊息組/pri ,publicvoidsetKeys(Stringkeys){}//設

1.說明consumer表示消耗,介面接受通用參數t,呼叫accept,對參數執行一系列操作,但沒有回傳值。 2、實例對Consumer來說,我們需要提供入參來消費。 classPerson{StringfirstName;StringlastName;Person(){}Person(StringfirstName,StringlastName){this.firstName=firstName;this.lastName=lastName;}}Java可以用來幹嘛Java主要套用在:1.web
