> 백엔드 개발 > C#.Net 튜토리얼 > C#은 비동기 메시지 큐를 구현합니다.

C#은 비동기 메시지 큐를 구현합니다.

黄舟
풀어 주다: 2016-12-27 14:31:43
원래의
2116명이 탐색했습니다.

메시지 큐

메시지 큐(영어: Message queue)는 프로세스 간 통신 또는 동일한 프로세스의 다른 스레드 간의 통신 방법을 처리하는 데 사용됩니다. a 일반적으로 사용자로부터의 일련의 입력입니다. 메시지 큐는 비동기 통신 프로토콜을 제공합니다. 큐의 각 레코드에는 발생 시간, 입력 장치 유형 및 특정 입력 매개변수를 포함한 자세한 정보가 포함되어 있습니다. 동시에 메시지 큐와 상호 작용합니다. 메시지는 수신자가 검색할 때까지 대기열에 남아 있습니다.

간단히 말하면 대기열은 처리해야 하는 명령을 저장하지만 처리 결과를 제때 얻지 못합니다.

구현

실제로 메시지 대기열은 연결 목록 구조로 저장되는 경우가 많습니다. 권한이 있는 프로세스는 메시지 큐에서 메시지를 쓰거나 읽을 수 있습니다.

현재 JBoss Messaging, JORAM, Apache ActiveMQ, Sun Open Message Queue, Apache Qpid 및 HTTPSQS를 포함하여 메시지 대기열의 오픈 소스 구현이 많이 있습니다.

장점, 단점

메시지 대기열 자체는 비동기식이므로 메시지가 전송된 후에도 오랫동안 수신자가 메시지를 검색할 수 있으며 이는 대부분의 통신 프로토콜과 다릅니다. 예를 들어, WWW에서 사용되는 HTTP 프로토콜은 클라이언트가 요청한 후 서버의 응답을 기다려야 하기 때문에 동기식입니다. 그러나 비동기 통신 프로토콜이 필요한 상황이 많이 있습니다. 예를 들어, 한 프로세스는 이벤트가 발생했음을 다른 프로세스에 알리지만 응답을 기다릴 필요는 없습니다. 그러나 메시지 대기열의 비동기적 특성으로 인해 수신자가 최신 메시지를 수신하려면 메시지 대기열을 폴링해야 한다는 단점도 있습니다.

신호에 비해 메시지 대기열은 더 많은 정보를 전송할 수 있습니다. 파이프와 비교하여 메시지 큐는 형식화된 데이터를 제공하므로 개발자 작업량을 줄일 수 있습니다. 그러나 메시지 대기열에는 여전히 크기 제한이 있습니다.

큐 메시지 읽기

주로 두 가지 유형이 있습니다. (1) 서버에 의해 푸시됨 (2) 클라이언트에 의해 풀림

풀: 주로 클라이언트가 정기적으로 폴링합니다. 메시지 처리 받기;

푸시: 처리를 위해 이벤트 구독을 통해 구독자에게 적극적으로 알림

메시지 저장

간단한 저장은 메모리 연결 목록을 통해 달성됩니다. Redis와 같은 DB는 로컬 파일에 유지될 수도 있습니다.

비동기 처리의 일관성을 보장하는 방법

큐의 주요 목적은 메시지를 저장하는 것이지만 호출도 비동기화합니다. 및 구현. 하지만 메시지 처리의 일관성을 원한다면 마스터-슬레이브 DB를 운영하는 등 업무 처리 순서를 구분해 마스터가 쓰기를 담당하고 슬레이브가 읽기를 담당하지 않는 것이 좋은 방법이다. 작성 후 즉시 데이터베이스를 읽어서 원하는 결과를 얻을 수 있는 기회 동시에 여러 중간 상태가 호출 결과를 충족하면 업무 시간에 처리됩니다. "예외 메시지"는 다음 작업까지 유지됩니다.

위 코드

메시지 지원 핵심 대기열 설정

이벤트 처리
{    public delegate void MessageQueueEventNotifyHandler(Message.BaseMessage message);
 
    public class MessageQueue:Queue<BaseMessage>
    {
        public static MessageQueue GlobalQueue = new MessageQueue();
 
        private Timer timer = new Timer();
        public MessageQueue() {
            this.timer.Interval = 5000;
            this.timer.Elapsed += Notify;
            this.timer.Enabled = true;
        }
        private void Notify(object sender, ElapsedEventArgs e) {
            lock (this) {
                if (this.Count > 0) {
                    //this.messageNotifyEvent.GetInvocationList()[0].DynamicInvoke(this.Dequeue());
                    var message = this.Dequeue();
                    this.messageNotifyEvent(message);
                }
            }
        }
 
        private MessageQueueEventNotifyHandler messageNotifyEvent;
        public event MessageQueueEventNotifyHandler MessageNotifyEvent {
            add {
                this.messageNotifyEvent += value;
            }
 
            remove {
                if (this.messageNotifyEvent != null) {
                    this.messageNotifyEvent -= value;
                }
            }
        }
    }
}
로그인 후 복사

public const string OrderCodePrefix = "P";        public void Submit(Message.BaseMessage message)
        {
            Order order = message.Body as Order;
 
            if (order.OrderCode.StartsWith(OrderCodePrefix))
            {
                System.Console.WriteLine("这个是个正确的以({0})开头的订单:{1}", OrderCodePrefix,order.OrderCode);
            }
            else {
                System.Console.WriteLine("这个是个错误的订单,没有以({0})开头:{1}",OrderCodePrefix,order.OrderCode);
            }
        }
로그인 후 복사
특정 비즈니스에 따라 개인화 가능;

프록시를 통해 대기열에 메시지 추가

클라이언트 호출
public class OrderServiceProxy:IOrderService    {
        public void Submit(Message.BaseMessage message)
        {
            MessageQueue.MessageQueue.GlobalQueue.Enqueue(message);
        }
    }
로그인 후 복사

이것은 이벤트 바인딩 및 개인화 처리 트리거의 요구 사항을 충족하는 동시에 메시지 비동기를 달성합니다. 변형의 목적을 위해 더 자세히 확장하여 향후 프로젝트에서 사용하기를 바랍니다.
OrderService orderService = new OrderService();            MessageQueue.MessageQueue.GlobalQueue.MessageNotifyEvent += orderService.Submit;
 
            var orders = new List<Order>() {
                new Order(){OrderCode="P001"},
                new Order(){OrderCode="P002"},
                new Order(){OrderCode="B003"}
            };
 
            OrderServiceProxy proxy = new OrderServiceProxy();
            orders.ForEach(order => proxy.Submit(new Message.BaseMessage() { Body=order}));
 
            Console.ReadLine();
로그인 후 복사

위 내용은 비동기 메시지 큐를 구현하기 위한 C# 내용입니다. 더 많은 관련 내용은 PHP 중국어 홈페이지(www.php.cn)를 참고해주세요!


관련 라벨:
원천:php.cn
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿