Java 개발 RocketMQ 생산자 고가용성 사례 분석
1 메시지
public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L; //主题名字 private String topic; //消息扩展信息,Tag,keys,延迟级别都存在这里 private Map<String, String> properties; //消息体,字节数组 private byte[] body; //设置消息的key, public void setKeys(String keys) {} //设置topic public void setTopic(String topic) {} //延迟级别 public int setDelayTimeLevel(int level) {} //消息过滤的标记 public void setTags(String tags) {} //扩展信息存放在此 public void putUserProperty(final String name, final String value) {} }
메시지는 아이들입니다. 이 아이들은 각자의 특성도 있지만 공통점도 있습니다. 같은 부모가 보낸 두 자녀가 같은 곳으로 갈 수도 있고, 서로 다른 곳으로 갈 수도 있습니다.
1.1 topic
우선, 각 어린이 메시지에는 위에서 언급한 topic 속성이 있는데, 대기실입니다. 아이들은 입장 후 지정된 대기실의 지정 구역으로 걸어가서(보통 기차와 고속철도는 지정된 승강장에서 타지 않나요?) 메시지 대기열 좌석에 앉았고, 그리고 그들의 여행을 기다렸다.
Broker에는 하나 이상의 주제가 있으며 메시지는 해당 주제의 메시지 대기열에 저장되어 소비되기를 기다립니다.
1.2 Body
어린이 뉴스에도 Body 속성이 있는데, 이는 그의 능력입니다. 그는 그림을 그릴 수 있고 노래도 할 수 있으며 무엇이든 할 수 있으며 이것이 Body 속성에 기록됩니다. 밖에 나갈 때 가치가 반영되는 곳도 Body 속성이다.
Body는 메시지 본문이며 소비자는 메시지 본문을 기반으로 해당 작업을 수행합니다.
1.3 태그
이 태그는 이전 섹션에서 언급한 마크입니다. 어떤 아이들은 화판과 카메라를 가지고 다니기도 하고, 어떤 유람선에서는 이 아이들을 특별히 찾아 데려가서 작업을 완료하기도 합니다.
메시지에 대한 태그 속성을 설정할 수 있으며, 소비자는 소비할 특정 태그 속성이 포함된 메시지를 선택할 수 있습니다.
1.4 key
key는 각 어린이 메시지의 이름입니다. 아이를 찾고 싶다면 이름을 불러주세요.
보낸 메시지에 키를 설정하면 향후 이 키를 기준으로 메시지를 검색할 수 있습니다. 예를 들어 메시지가 비정상적이거나 메시지가 손실된 경우 검색하는 것이 매우 편리합니다.
1.5 지연 수준
물론, 일부 아이들은 급하게 출발하지도 않습니다. 식사를 하기 전에 생각을 해봤기 때문에 30분을 기다린 후에 픽업을 하게 됩니다.
지연 수준을 설정하여 메시지가 소비될 수 있는 시간을 지정하세요.
2 생산자 고가용성
자녀를 보내는 모든 부모는 자녀를 대기실에 보내고 싶어하며 자녀가 잃어버리는를 원하지 않습니다. 이때 대기실에는 몇 가지 보증 메커니즘이 필요합니다.
2.1 클라이언트는 생산자의 고가용성을 보장합니다.
2.1.1 재시도 메커니즘
은 부모가 자식을 보낸 후 대기실에 들어간 후 메시지 대기열 좌석에 앉지 못하여 작동 중임을 의미합니다. 이때 직원이 다시 시도하여 좌석이 있는지 확인할 것입니다. 기본 재시도 횟수는 2회입니다. 이는 메시지 하위가 자리를 찾을 수 있는 기회가 총 3회 있다는 의미입니다.
소스코드를 보시면 특별히 코멘트를 추가해두었으니 대략 이해되실 겁니다.
//这里取到了重试的次数 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); //获取消息队列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } //发送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); ... } catch (RemotingException e) { ... continue; } catch (MQClientException e) { ... continue; } catch (MQBrokerException e) { ... continue; } catch (InterruptedException e) { //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试 throw e; } } else { break; } }
재시도 코드는 위와 같습니다. 이 sendDefaultImpl
메서드에서는 메시지 전송을 세 번 시도합니다. 실패하면 해당 오류가 발생합니다. sendDefaultImpl
方法中,会尝试发送三次消息,若是都失败,才会抛出对应的错误。
2.1.2 客户端容错
若是有多个Broker候车大厅的时候,服务人员会安排消息孩子选择一个相对不拥挤,比较容易进入的来进入。当然那些已经关闭的,停电的,没有服务能力的,我们是不会进的。
MQ Client会维护一个Broker的发送延迟信息,根据这个信息会选择一个相对延迟较低的Broker来发送消息。会主动剔除哪些已经宕机,不可用或发送延迟级别较高的Broker.
选择Broker
就是在选择message queue
브로커 대기실이 여러 개 있는 경우 서비스 직원은 어린이에게 메시지를 보내 상대적으로 덜 혼잡하고 입장하기 더 쉬운 중 하나를 선택하도록 준비합니다. 물론 폐쇄
,정전, 서비스 불가능 등은 입력하지 않습니다.
MQ 클라이언트는 브로커의 전송 지연 정보를 유지하고, 이 정보를 기반으로 상대적으로 지연이 적은 브로커를 선택하여 메시지를 보냅니다. 다운되었거나 사용할 수 없거나 전송 지연 수준이 높은 브로커를 적극적으로 제거합니다.브로커
를 선택하면 해당 코드는 다음과 같습니다.
지연 허용 스위치
가 켜져 있는지 확인합니다. 이 스위치는기본적으로 꺼져 있습니다. 켜져 있으면 지연 시간이 낮은 브로커가 우선 순위를 갖습니다.
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //判断发送延迟容错开关是否开启 if (this.sendLatencyFaultEnable) { try { //选择一个延迟上可以接受,并且和上次发送相同的Broker int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //若是Broker的延迟时间可以接受,则返回这个Broker if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } //若是前边都没选中一个Broker,就随机选一个Broker return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //若是没有上次的Brokername做参考,就随机选一个 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //如果有,那么就选一个其他的Broker for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //这里判断遇上一个使用的Broker不是同一个 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } //若是上边的都没选中,那么就随机选一个 return selectOneMessageQueue(); } }
위 내용은 Java 개발 RocketMQ 생산자 고가용성 사례 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

AI Hentai Generator
AI Hentai를 무료로 생성하십시오.

인기 기사

뜨거운 도구

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

드림위버 CS6
시각적 웹 개발 도구

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

뜨거운 주제











Java의 Weka 가이드. 여기에서는 소개, weka java 사용 방법, 플랫폼 유형 및 장점을 예제와 함께 설명합니다.

Java의 Smith Number 가이드. 여기서는 정의, Java에서 스미스 번호를 확인하는 방법에 대해 논의합니다. 코드 구현의 예.

이 기사에서는 가장 많이 묻는 Java Spring 면접 질문과 자세한 답변을 보관했습니다. 그래야 면접에 합격할 수 있습니다.

Java 8은 스트림 API를 소개하여 데이터 컬렉션을 처리하는 강력하고 표현적인 방법을 제공합니다. 그러나 스트림을 사용할 때 일반적인 질문은 다음과 같은 것입니다. 기존 루프는 조기 중단 또는 반환을 허용하지만 스트림의 Foreach 메소드는이 방법을 직접 지원하지 않습니다. 이 기사는 이유를 설명하고 스트림 처리 시스템에서 조기 종료를 구현하기위한 대체 방법을 탐색합니다. 추가 읽기 : Java Stream API 개선 스트림 foreach를 이해하십시오 Foreach 메소드는 스트림의 각 요소에서 하나의 작업을 수행하는 터미널 작동입니다. 디자인 의도입니다

Java의 TimeStamp to Date 안내. 여기서는 소개와 예제와 함께 Java에서 타임스탬프를 날짜로 변환하는 방법에 대해서도 설명합니다.

캡슐은 3 차원 기하학적 그림이며, 양쪽 끝에 실린더와 반구로 구성됩니다. 캡슐의 부피는 실린더의 부피와 양쪽 끝에 반구의 부피를 첨가하여 계산할 수 있습니다. 이 튜토리얼은 다른 방법을 사용하여 Java에서 주어진 캡슐의 부피를 계산하는 방법에 대해 논의합니다. 캡슐 볼륨 공식 캡슐 볼륨에 대한 공식은 다음과 같습니다. 캡슐 부피 = 원통형 볼륨 2 반구 볼륨 안에, R : 반구의 반경. H : 실린더의 높이 (반구 제외). 예 1 입력하다 반경 = 5 단위 높이 = 10 단위 산출 볼륨 = 1570.8 입방 단위 설명하다 공식을 사용하여 볼륨 계산 : 부피 = π × r2 × h (4

Spring Boot는 강력하고 확장 가능하며 생산 가능한 Java 응용 프로그램의 생성을 단순화하여 Java 개발에 혁명을 일으킨다. Spring Ecosystem에 내재 된 "구성에 대한 협약"접근 방식은 수동 설정, Allo를 최소화합니다.
