누구나 RabbitMq에 익숙합니다. 이 기사에서는 주로 RabbitMQ를 학습한 후 RabbitMQ.Client의 캡슐화를 공유합니다. 기사 마지막 부분에서는 캡슐화된 구성 요소와 데모를 소개하겠습니다.
아래 그림에서 볼 수 있듯이 게시자(Publisher)는 먼저 메시지를 Exchange(Exchange)로 보낸 다음 Exchange에서 지정된 큐(Queue)로 메시지를 보냅니다. 고객(Customer)은 을 통해 을 구독하거나 가 소비를 위해 지정된 큐 메시지 를 적극적으로 검색합니다.
그렇다면 방금 언급한 구독과 능동 검색은 푸시(수동)와 풀(능동)로 이해될 수 있습니다.
푸시, 메시지가 대기열에 추가되는 한 유휴 소비자는 소비하라는 알림을 받습니다. (내가 너를 찾지 않으면 네가 나를 찾을 때까지 기다릴게 관찰자 모드)
Pull을 수행하면 소비자에게 알림이 전달되지 않지만 소비자는 라운드 로빈 방식이나 정기적인 간격으로 대기열 메시지를 가져오는 작업을 주도하게 됩니다. (필요할 때만 찾아가요)사용 시나리오의 예를 들어보겠습니다. 주문 시스템과 배송 시스템이라는 두 가지 시스템이 있다고 가정합니다. 배송 메시지 지침은 적시에 상품을 배송하기 위해 필요합니다. 대기열을 구독하고 지침이 있는 한 처리합니다.
그러나 프로그램에서 때때로 네트워크 또는 DB 시간 초과와 같은 예외가 발생하고 메시지가 실패 대기열에 던져집니다. 이 경우 재전송 메커니즘이 필요합니다. 하지만 while(IsPostSuccess == True) 은 하고 싶지 않습니다. 왜냐하면 예외가 발생하는 한 일정 시간 내에 예외가 발생하게 되고 이러한 재시도는 의미가 없기 때문입니다.
이때, 실패한 큐 메시지를 정기적으로 또는 몇 분(실패 횟수 * 간격 분)마다 가져와서 다시 보내는 JOB이 있습니다.
패키지 게시
으아아아
코드 보기
다음은 기본 테스트의 게시 속도에 대한 스크린샷입니다.
4.2W/S는 안정적인 속도이며 역직렬화(ToJson)가 약간 더 빠릅니다.
구독 패키지
으아아아
코드 보기
다음은 기본 테스트의 게시 속도에 대한 스크린샷입니다.
빠르면 1.9K/S, 느리면 1.7K/S입니다.
패키지 가져오기
으아아아
코드 보기
빠르면 1.8K/s, 안정적일 때는 1.5K/S입니다.
首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:
1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常
2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。
/// <summary> /// RPC客户端 /// </summary> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="routingKey"></param> /// <param name="body"></param> /// <param name="isProperties"></param> /// <returns></returns> public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false) { var channel = GetModel(exchange, queue, routingKey, isProperties); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue, true, consumer); try { var correlationId = Guid.NewGuid().ToString(); var basicProperties = channel.CreateBasicProperties(); basicProperties.ReplyTo = queue; basicProperties.CorrelationId = correlationId; channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8()); var sw = Stopwatch.StartNew(); while (true) { var ea = consumer.Queue.Dequeue(); if (ea.BasicProperties.CorrelationId == correlationId) { return ea.Body.DeserializeUtf8(); } if (sw.ElapsedMilliseconds > 30000) throw new Exception("等待响应超时"); } } catch (Exception ex) { throw ex.GetInnestException(); } } /// <summary> /// RPC服务端 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="isProperties"></param> /// <param name="handler"></param> /// <param name="isDeadLetter"></param> public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter) { //队列声明 var channel = GetModel(queue, isProperties); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var msgStr = body.DeserializeUtf8(); var msg = msgStr.FromJson<T>(); var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { msg = handler(msg); } catch (Exception ex) { ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq"); } finally { channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8()); channel.BasicAck(ea.DeliveryTag, false); } }; channel.BasicConsume(queue, false, consumer); }
View Code
可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。
本篇文章,没有过多的写RabbitMq的知识点,因为园子的学习笔记实在太多了。下面把我的代码奉上 http://www.php.cn/ 。如果有发现写得不对的地方麻烦在评论指出,我会及时修改以免误导别人。
如果本篇文章您有用,请点击一下推荐,谢谢大家阅读。