大家好,我是君哥。
最近有讀者參加面試時被問了一個問題,如果消費者拉取了一批訊息,例如100 條,第100 條訊息消費成功了,但是第50 條消費失敗,偏移量會怎樣更新?就著這個問題,今天來聊聊一下,如果一批訊息有消費失敗的狀況時,偏移量怎麼保存。
#以RocketMQ 推模式為例,RocketMQ 消費者啟動程式碼如下:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
上面的DefaultMQPushConsumer 是一個推模式的消費者,啟動方法是start。消費者啟動後會觸發重平衡線程(RebalanceService),這個線程的任務是在死循環中不停地進行重平衡,最終封裝拉取訊息的請求到 pullRequestQueue。這個過程涉及的UML 類別圖如下:
封裝好拉取訊息的請求PullRequest 後,RocketMQ 就會不停地從pullRequestQueue 取得訊息拉取請求進行處理。 UML 類別圖如下:
拉取訊息的入口方法是一個死循環,程式碼如下:
//PullMessageService public void run(){ log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
這裡拉取到訊息後,提交給PullCallback 這個回呼函數進行處理。
拉取到的訊息先被 put 到 ProcessQueue 中的 msgTreeMap 上,然後被封裝到 ConsumeRequest 這個執行緒類別來處理。把程式碼精簡後,ConsumeRequest 處理邏輯如下:
//ConsumeMessageConcurrentlyService.java public void run(){ MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { //1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) { //2.处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
並發訊息處理消費結果的程式碼做精簡後如下:
//ConsumeMessageConcurrentlyService.java public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ){ int ackIndex = context.getAckIndex(); switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; break; case RECONSUME_LATER: break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
從上面的程式碼可以看出,如果處理訊息的邏輯是串列的,例如文章開頭的程式碼使用for 迴圈來處理訊息,那如果在某一訊息處理失敗了,直接退出循環,給ConsumeConcurrentlyContext 的ackIndex 變數賦值為訊息清單中失敗訊息的位置,這樣這條失敗訊息後面的訊息就不再處理了,發送給Broker 等待重新拉取。程式碼如下:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ for (int i = 0; i < msgs.size(); i++) { try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ context.setAckIndex(i); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
消費成功的訊息則從 ProcessQueue 中的 msgTreeMap 中移除,並且傳回 msgTreeMap 中最小的偏移量(firstKey)去更新。注意:叢集模式偏移量保存在 Broker 端,更新偏移量需要傳送訊息到 Broker,而廣播模式偏移量保存在 Consumer 端,只需要更新本地偏移量就可以。
如果處理訊息的邏輯是並行的,處理訊息失敗後給 ackIndex 賦值是沒有意義的,因為可能有多條訊息失敗,給 ackIndex 變數賦值並不準確。最好的方法就是給 ackIndex 賦值 0,整批訊息全部重新消費,這樣又可能帶來冥等問題。
對於順序訊息,從msgTreeMap 取出訊息後,先要放到consumingMsgOrderlyTreeMap 上面,更新偏移量時,是從consumingMsgOrderlyTreeMap 上取最大的訊息偏移量( lastKey)。
回到開頭的問題,如果一批訊息按照順序消費,是不可能出現第100 條訊息消費成功了,但第50 條消費失敗的情況,因為第50 條訊息失敗的時候,應該退出循環,不再繼續進行消費。
如果是並發消費,如果出現了這種情況,建議是整批訊息全部重新消費,也就是給 ackIndex 賦值 0,這樣就必須考慮冥等問題。
以上是阿里二面:RocketMQ 消費者拉取一批訊息,其中部分消費失敗了,偏移量怎麼更新?的詳細內容。更多資訊請關注PHP中文網其他相關文章!