public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L; //主题名字 private String topic; //消息扩展信息,Tag,keys,延迟级别都存在这里 private Map<String, String> properties; //消息体,字节数组 private byte[] body; //设置消息的key, public void setKeys(String keys) {} //设置topic public void setTopic(String topic) {} //延迟级别 public int setDelayTimeLevel(int level) {} //消息过滤的标记 public void setTags(String tags) {} //扩展信息存放在此 public void putUserProperty(final String name, final String value) {} }
메시지는 아이들입니다. 이 아이들은 각자의 특성도 있지만 공통점도 있습니다. 같은 부모가 보낸 두 자녀가 같은 곳으로 갈 수도 있고, 서로 다른 곳으로 갈 수도 있습니다.
우선, 각 어린이 메시지에는 위에서 언급한 topic 속성이 있는데, 대기실입니다. 아이들은 입장 후 지정된 대기실의 지정 구역으로 걸어가서(보통 기차와 고속철도는 지정된 승강장에서 타지 않나요?) 메시지 대기열 좌석에 앉았고, 그리고 그들의 여행을 기다렸다.
Broker에는 하나 이상의 주제가 있으며 메시지는 해당 주제의 메시지 대기열에 저장되어 소비되기를 기다립니다.
어린이 뉴스에도 Body 속성이 있는데, 이는 그의 능력입니다. 그는 그림을 그릴 수 있고 노래도 할 수 있으며 무엇이든 할 수 있으며 이것이 Body 속성에 기록됩니다. 밖에 나갈 때 가치가 반영되는 곳도 Body 속성이다.
Body는 메시지 본문이며 소비자는 메시지 본문을 기반으로 해당 작업을 수행합니다.
이 태그는 이전 섹션에서 언급한 마크입니다. 어떤 아이들은 화판과 카메라를 가지고 다니기도 하고, 어떤 유람선에서는 이 아이들을 특별히 찾아 데려가서 작업을 완료하기도 합니다.
메시지에 대한 태그 속성을 설정할 수 있으며, 소비자는 소비할 특정 태그 속성이 포함된 메시지를 선택할 수 있습니다.
key는 각 어린이 메시지의 이름입니다. 아이를 찾고 싶다면 이름을 불러주세요.
보낸 메시지에 키를 설정하면 향후 이 키를 기준으로 메시지를 검색할 수 있습니다. 예를 들어 메시지가 비정상적이거나 메시지가 손실된 경우 검색하는 것이 매우 편리합니다.
물론, 일부 아이들은 급하게 출발하지도 않습니다. 식사를 하기 전에 생각을 해봤기 때문에 30분을 기다린 후에 픽업을 하게 됩니다.
지연 수준을 설정하여 메시지가 소비될 수 있는 시간을 지정하세요.
자녀를 보내는 모든 부모는 자녀를 대기실에 보내고 싶어하며 자녀가 잃어버리는를 원하지 않습니다. 이때 대기실에는 몇 가지 보증 메커니즘이 필요합니다.
은 부모가 자식을 보낸 후 대기실에 들어간 후 메시지 대기열 좌석에 앉지 못하여 작동 중임을 의미합니다. 이때 직원이 다시 시도하여 좌석이 있는지 확인할 것입니다. 기본 재시도 횟수는 2회입니다. 이는 메시지 하위가 자리를 찾을 수 있는 기회가 총 3회 있다는 의미입니다.
소스코드를 보시면 특별히 코멘트를 추가해두었으니 대략 이해되실 겁니다.
//这里取到了重试的次数 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); //获取消息队列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } //发送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); ... } catch (RemotingException e) { ... continue; } catch (MQClientException e) { ... continue; } catch (MQBrokerException e) { ... continue; } catch (InterruptedException e) { //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试 throw e; } } else { break; } }
재시도 코드는 위와 같습니다. 이 sendDefaultImpl
메서드에서는 메시지 전송을 세 번 시도합니다. 실패하면 해당 오류가 발생합니다. sendDefaultImpl
方法中,会尝试发送三次消息,若是都失败,才会抛出对应的错误。
若是有多个Broker候车大厅的时候,服务人员会安排消息孩子选择一个相对不拥挤,比较容易进入的来进入。当然那些已经关闭的,停电的,没有服务能力的,我们是不会进的。
MQ Client会维护一个Broker的发送延迟信息,根据这个信息会选择一个相对延迟较低的Broker来发送消息。会主动剔除哪些已经宕机,不可用或发送延迟级别较高的Broker.
选择Broker
就是在选择message queue
브로커 대기실이 여러 개 있는 경우 서비스 직원은 어린이에게 메시지를 보내 상대적으로 덜 혼잡하고 입장하기 더 쉬운 중 하나를 선택하도록 준비합니다. 물론 폐쇄
,정전, 서비스 불가능 등은 입력하지 않습니다.
MQ 클라이언트는 브로커의 전송 지연 정보를 유지하고, 이 정보를 기반으로 상대적으로 지연이 적은 브로커를 선택하여 메시지를 보냅니다. 다운되었거나 사용할 수 없거나 전송 지연 수준이 높은 브로커를 적극적으로 제거합니다.브로커
를 선택하면 해당 코드는 다음과 같습니다.
기본적으로 꺼져 있습니다. 켜져 있으면 지연 시간이 낮은 브로커가 우선 순위를 갖습니다.
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //判断发送延迟容错开关是否开启 if (this.sendLatencyFaultEnable) { try { //选择一个延迟上可以接受,并且和上次发送相同的Broker int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //若是Broker的延迟时间可以接受,则返回这个Broker if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } //若是前边都没选中一个Broker,就随机选一个Broker return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //若是没有上次的Brokername做参考,就随机选一个 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //如果有,那么就选一个其他的Broker for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //这里判断遇上一个使用的Broker不是同一个 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } //若是上边的都没选中,那么就随机选一个 return selectOneMessageQueue(); } }
위 내용은 Java 개발 RocketMQ 생산자 고가용성 사례 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!