ActiveMQ は言うまでもなく良いものです。 ActiveMQ は、Java、C、C++、C#、Ruby、Perl、Python、PHP などの複数の言語サポートを提供します。 Windows で GUI を開発しているので、C++ と C# に興味があります。C# の ActiveMQ は、.Net 開発をサポートする NMS (.Net Messaging Service) を提供します。以下の手順だけで簡単な実装を作成できます。 C++の応用は割と面倒なので、後ほど紹介記事を書きます。
1. ActiveMQ 公式 Web サイトにアクセスして、ActiveMQ の最新バージョンをダウンロードしました。現在、5.3.2 がリリースされています。
2. ActiveMQ の公式 Web サイトにアクセスして、Apache.NMS の最新バージョンをダウンロードします。ソース コードに興味がある場合は、Apache.NMS と Apache.NMS.ActiveMQ の 2 つの bin パッケージをダウンロードする必要があります。 src パッケージもダウンロードします。ここで注意していただきたいのは、NMS.ActiveMQ の 1.2.0 バージョンをダウンロードすると、Apache.NMS.ActiveMQ.dll には実際の使用時にバグがあるということです。つまり、ActiveMQ アプリケーションを停止するときに WaitOne 関数例外がスローされます。 src パッケージ内のソース コードを確認すると、Apache.NMS.ActiveMQ-1.2.0-srcsrcmaincsharpTransportInactivityMonitor.cs のコードが原因であることがわかります。ソース コードを変更して再コンパイルしてください。最新バージョン 1.3.0 ではこの不具合が修正されていることが確認できたので、最新バージョンをダウンロードしてください。
private void StopMonitorThreads() { lock(monitor) { if(monitorStarted.CompareAndSet(true, false)) { AutoResetEvent shutdownEvent = new AutoResetEvent(false); // Attempt to wait for the Timers to shutdown, but don't wait // forever, if they don't shutdown after two seconds, just quit. this.readCheckTimer.Dispose(shutdownEvent); shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000)); this.writeCheckTimer.Dispose(shutdownEvent); shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000)); //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext) this.asyncTasks.Shutdown(); this.asyncTasks = null; this.asyncWriteTask = null; this.asyncErrorTask = null; } } } private void StopMonitorThreads() { lock(monitor) { if(monitorStarted.CompareAndSet(true, false)) { AutoResetEvent shutdownEvent = new AutoResetEvent(false); // Attempt to wait for the Timers to shutdown, but don't wait // forever, if they don't shutdown after two seconds, just quit. this.readCheckTimer.Dispose(shutdownEvent); shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000)); this.writeCheckTimer.Dispose(shutdownEvent); shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000)); //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext) this.asyncTasks.Shutdown(); this.asyncTasks = null; this.asyncWriteTask = null; this.asyncErrorTask = null; } } }
3. ActiveMQ を実行し、ActiveMQ の解凍後に bin フォルダーを見つけます: ...apache-activemq-5.3.1bin。activemq.bat バッチ ファイルを実行して、ActiveMQ サーバーを起動します。デフォルトのポートは 61616 です。ファイル内で設定および変更されます。
4. ActiveMQ の簡単なアプリケーションを実装する C# プログラムを作成します。 WinForm または Console プログラムのいずれかの新しい C# プロジェクト (Product プロジェクトと Consumer プロジェクト) を作成し、Apache.NMS.dll および Apache.NMS.ActiveMQ.dll への参照を追加します。簡単なプロデューサーとコンシューマーの実装コードは次のとおりです:
プロデューサー:
using System; using System.Collections.Generic; using System.Text; using Apache.NMS; using Apache.NMS.ActiveMQ; using System.IO; using System.Xml.Serialization; using System.Runtime.Serialization.Formatters.Binary; namespace Publish { class Program { static void Main(string[] args) { try { //Create the Connection Factory IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/"); using (IConnection connection = factory.CreateConnection()) { //Create the Session using (ISession session = connection.CreateSession()) { //Create the Producer for the topic/queue IMessageProducer prod = session.CreateProducer( new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing")); //Send Messages int i = 0; while (!Console.KeyAvailable) { ITextMessage msg = prod.CreateTextMessage(); msg.Text = i.ToString(); Console.WriteLine("Sending: " + i.ToString()); prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue); System.Threading.Thread.Sleep(5000); i++; } } } Console.ReadLine(); } catch (System.Exception e) { Console.WriteLine("{0}",e.Message); Console.ReadLine(); } } } }
コンシューマー:
using System; using System.Collections.Generic; using System.Text; using Apache.NMS; using Apache.NMS.ActiveMQ; using System.IO; using System.Xml.Serialization; using System.Runtime.Serialization.Formatters.Binary; namespace Subscribe { class Program { static void Main(string[] args) { try { //Create the Connection factory IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/"); //Create the connection using (IConnection connection = factory.CreateConnection()) { connection.ClientId = "testing listener"; connection.Start(); //Create the Session using (ISession session = connection.CreateSession()) { //Create the Consumer IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false); consumer.Listener += new MessageListener(consumer_Listener); Console.ReadLine(); } connection.Stop(); connection.Close(); } } catch (System.Exception e) { Console.WriteLine(e.Message); } } static void consumer_Listener(IMessage message) { try { ITextMessage msg = (ITextMessage)message; Console.WriteLine("Receive: " + msg.Text); } catch (System.Exception e) { Console.WriteLine(e.Message); } } } }
プログラムによって実装される関数: プロデューサーは testing という名前のトピックを作成し、トピックにメッセージを送信します。コンシューマは 5 秒ごとにテスト トピックをサブスクライブするため、プロデューサがテスト トピックに関するメッセージを ActiveMQ サーバーに送信する限り、サーバーはテスト トピックをサブスクライブしているコンシューマにメッセージを送信します。
Producer.exe と Consumer.exe をコンパイルして生成し、2 つの EXE を実行してメッセージの送受信を確認します。
この例はトピック (トピック) です。ActiveMQ は別のメソッド、つまり P2P もサポートしています。この 2 つの違いは何ですか?違いは、トピックがブロードキャストであることです。つまり、トピックが複数のコンシューマによってサブスクライブされている場合、メッセージがサーバーに到達する限り、サーバーはメッセージをすべてのコンシューマに送信しますが、キューはポイントツーポイントであるということです。つまり、1 つのメッセージは 1 つのコンシューマにのみ送信できます。キューが複数のコンシューマによってサブスクライブされている場合、
msg1-->consumer A
などの特別な状況がない限り、メッセージは異なるコンシューマに 1 つずつ送信されます。msg2-->コンシューマ B
msg3-->コンシューマ C
msg4-->コンシューマ A
msg5-->コンシューマ B
msg6-->コンシューマ C
特殊なケースは以下を指します: ActiveMQ はフィルタリング メカニズムをサポートしています。つまり、プロデューサは、コンシューマ側のセレクタに対応するメッセージのプロパティ (プロパティ) を設定できます。コンシューマによって設定されたセレクタがメッセージのプロパティと一致する場合にのみ、メッセージは、消費者に送られます。トピックとキューはどちらもセレクターをサポートしています。
プロパティとセレクターを設定するには?次のコードを参照してください:
プロデューサー:
public void SetProperties() { ITextMessage msg = prod.CreateTextMessage(); msg.Text = i.ToString(); msg.Properties.SetString("myFilter", "test1"); Console.WriteLine("Sending: " + i.ToString()); prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue); ITextMessage msg = prod.CreateTextMessage(); msg.Text = i.ToString(); msg.Properties.SetString("myFilter", "test1"); Console.WriteLine("Sending: " + i.ToString()); prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue); }
コンシューマー:
public void SetSelector() { //生成consumer时通过参数设置Selector IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'"); //生成consumer时通过参数设置Selector IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'"); }
以上がC#でのActiveMQの応用を詳しく解説の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。