public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L; //主题名字 private String topic; //消息扩展信息,Tag,keys,延迟级别都存在这里 private Map<String, String> properties; //消息体,字节数组 private byte[] body; //设置消息的key, public void setKeys(String keys) {} //设置topic public void setTopic(String topic) {} //延迟级别 public int setDelayTimeLevel(int level) {} //消息过滤的标记 public void setTags(String tags) {} //扩展信息存放在此 public void putUserProperty(final String name, final String value) {} }
メッセージは子供たちです。子供たちは独自の特徴を持っていますが、共通点もあります。同じ親から送られた 2 人の子供は、同じ場所に行くことも、別の場所に行くこともできます。
まず、各子メッセージには属性 topic があり、これは上で説明した待機ホールです。子どもたちは入ってきた後、指定された待合室の 指定エリア まで歩き (外出するときも指定のホームから高速鉄道に乗りますよね)、# の席に座ります。 ##メッセージキュー席 待って、旅行を待ちます。
ブローカーには 1 つ以上のトピックがあり、メッセージはトピック内のメッセージ キューに保存され、消費されるのを待ちます。 1.2 ボディ子供のニュース、ボディ属性もあります。これは彼の 能力で、彼は絵を描くことができ、歌うことができ、彼は歌うことができます。何を行っても、Body 属性に記録されます。外出時に値が反映される場所もBody属性です。
Body はメッセージ本文であり、コンシューマはメッセージ本文に基づいて対応する操作を実行します。 1.3 タグこのタグは前のセクションで説明しました。これはマーク です。お絵かきボードやカメラを背中に背負っている子供もいますし、クルーズ船もいます。 特別に見つけてください。子供たちは を引き離し、任務を完了します。
メッセージのタグ属性を設定でき、コンシューマは特定のタグ属性を含むメッセージを選択して消費できます。 1.4 キーキーは、各子供のメッセージのname です。子供を見つけたいなら、名前を呼んでください。
送信メッセージのキーを設定すると、後でこのキーに基づいてメッセージを検索できます。たとえば、メッセージに異常がある場合やメッセージが失われた場合に、検索するのに非常に便利です。 1.5 遅延レベルもちろん、来てもすぐに帰らない子もいます。よく考えてから来ます。食事には 30 分かかるので、来たら待ってます。30分後に迎えに来ます。 遅延レベルを設定すると、メッセージを消費できる時間を指定できます。 2 プロデューサーの高可用性子供を送り出す親は皆、子供を待合室に送り届けたいと考えており、子供が迷子になることを望んでいません。現時点では、待合室には何らかの保証メカニズムが必要です。 2.1 クライアントはプロデューサーの高可用性を保証します
に座れなかった場合、スタッフは空席があるかどうかを確認するために再度挑戦するよう手配します。デフォルトの再試行回数は 2 回です。つまり、メッセージの子には座席を見つける機会が合計 3 回あります。 ソースコードを見て、大まかに理解できるように特別にコメントを追加しました。
//这里取到了重试的次数 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); //获取消息队列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } //发送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); ... } catch (RemotingException e) { ... continue; } catch (MQClientException e) { ... continue; } catch (MQBrokerException e) { ... continue; } catch (InterruptedException e) { //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试 throw e; } } else { break; } }
再試行コードは上記のとおりです。この
sendDefaultImpl メソッドでは、メッセージの送信を 3 回試行します。失敗すると、対応するエラーがスローされます。 2.1.2 クライアントのフォールト トレランス
を入力します。もちろん、 が閉鎖され、 が停電し、 がサービス機能を持たない は入力されません。 MQ クライアントは、ブローカーの送信遅延情報を維持し、この情報に基づいて、メッセージを送信する比較的遅延の少ないブローカーを選択します。ダウンしている、利用できない、または送信遅延レベルが高いブローカーを積極的に排除します。 Broker を選択すると、
message queueが選択されます。対応するコードは次のとおりです。
ここでは、まず 遅延フォールト トレランス スイッチ
がオンになっているかどうかを判断します。このスイッチは デフォルトではオフになっています
。オンになっている場合は、
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //判断发送延迟容错开关是否开启 if (this.sendLatencyFaultEnable) { try { //选择一个延迟上可以接受,并且和上次发送相同的Broker int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //若是Broker的延迟时间可以接受,则返回这个Broker if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } //若是前边都没选中一个Broker,就随机选一个Broker return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
A 異なるブローカー が選択されます。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //若是没有上次的Brokername做参考,就随机选一个 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //如果有,那么就选一个其他的Broker for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //这里判断遇上一个使用的Broker不是同一个 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } //若是上边的都没选中,那么就随机选一个 return selectOneMessageQueue(); } }
メッセージの子を正確に受信するために、少なくとも 2 つのホールがあり、1 つはホールになります。 メインホール
副ホールに入り、しばらくするとカードが機械を信じてビジー状態になるはずです(シャドウクローンの技術) そして、クローンを副ホールに入らせて、メインホールが停電して機能しなくなっても、副ホールのクローンはタスクを完了する限り大丈夫です。一般的には、子供が仕事を完了するために船に乗りに行くという本殿からのメッセージです。
以上がJava 開発 RocketMQ プロデューサーの高可用性の例の分析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。