Menggunakan HornetQ untuk pemprosesan mesej dalam pembangunan API Java
Dengan perkembangan pesat Internet, sejumlah besar interaksi maklumat telah muncul, dan baris gilir mesej telah menjadi cara penting untuk menyelesaikan masalah seperti keselarasan yang tinggi , ketersediaan tinggi dan pemprosesan tak segerak. HornetQ ialah perisian tengah pemesejan sumber terbuka berprestasi tinggi dan ketersediaan tinggi berdasarkan protokol JMS yang dibangunkan oleh JBoss. Artikel ini akan memperkenalkan cara menggunakan HornetQ untuk pemprosesan mesej dalam pembangunan API Java.
1. Mula Pantas
Tapak web rasmi HornetQ (http://hornetq.apache.org/downloads.html) menyediakan pakej Muat turun dalam pelbagai format, pilih HornetQ-2.4.0.Final-bin.tar.gz di sini.
Selepas muat turun selesai, nyahzip HornetQ-2.4.0.Final-bin.tar.gz ke folder setempat.
Masukkan direktori bin HornetQ dan laksanakan arahan berikut:
./run.sh
The log berikut muncul Mesej menunjukkan bahawa perkhidmatan HornetQ telah berjaya dimulakan:
11:14:21,867 INFO [ServerImpl] Memulakan Pelayan HornetQ
11:14:21,986 INFO [JournalStorageManager] Menggunakan NIO>JournalStorageManager 11:14:22,626 INFO [NettyAcceptor] Memulakan Netty Acceptor versi #{versi}
11:14:22,697 INFO [HornetQServerImpl] HornetQ Server versi #{version} [${name}] bermula
public class Publisher { public static void main(String[] args) throws Exception { // 初始化连接工厂等配置信息 ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName())); JMSContext jmsContext = connectionFactory.createContext(); // 发送消息 JMSProducer producer = jmsContext.createProducer(); Destination destination = HornetQJMSClient.createTopic("exampleTopic"); producer.send(destination, "Hello, HornetQ!"); // 关闭连接 jmsContext.close(); } }
public class Subscriber { public static void main(String[] args) throws Exception { // 初始化连接工厂等配置信息 ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName())); JMSContext jmsContext = connectionFactory.createContext(); // 创建消费者 Destination destination = HornetQJMSClient.createTopic("exampleTopic"); JMSConsumer consumer = jmsContext.createConsumer(destination); // 接收消息并打印 String message = null; do { message = consumer.receiveBody(String.class, 1000); System.out.println("Received message: " + message); } while (message != null); // 关闭连接 jmsContext.close(); } }
public class Publisher { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName())); JMSContext jmsContext = connectionFactory.createContext(); // 设定持久性 JMSProducer producer = jmsContext.createProducer(); destination = HornetQJMSClient.createTopic("exampleTopic"); producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 发送消息 producer.send(destination, "Hello, HornetQ!"); jmsContext.close(); } }
<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd"> <cluster-password>guest</cluster-password> <paging-directory>${data.dir:../data}/paging</paging-directory> <bindings-directory>${data.dir:../data}/bindings</bindings-directory> <journal-directory>${data.dir:../data}/journal</journal-directory> <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory> <journal-type>NIO</journal-type> <journal-datasync>true</journal-datasync> <journal-min-files>2</journal-min-files> <journal-pool-files>10</journal-pool-files> <journal-file-size>10240</journal-file-size> <journal-buffer-timeout>28000</journal-buffer-timeout> <journal-max-io>1</journal-max-io> <disk-scan-period>5000</disk-scan-period> <max-disk-usage>90</max-disk-usage> <critical-analyzer>true</critical-analyzer> <critical-analyzer-timeout>120000</critical-analyzer-timeout> <critical-analyzer-check-period>60000</critical-analyzer-check-period> <critical-analyzer-policy>HALT</critical-analyzer-policy> <page-sync-timeout>1628000</page-sync-timeout> <global-max-size>100Mb</global-max-size> <connectors> <connector name="netty"> <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> <param key="host" value="192.168.1.1"/> <param key="port" value="5445"/> </connector> </connectors> <acceptors> <acceptor name="netty"> <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> <param key="host" value="192.168.1.1"/> <param key="port" value="5545"/> </acceptor> </acceptors> <cluster-connections> <cluster-connection name="my-cluster"> <address>jms</address> <connector-ref>netty</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> <forward-when-no-consumers>true</forward-when-no-consumers> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> <static-connectors> <connector-ref>netty</connector-ref> </static-connectors> </cluster-connection> </cluster-connections> <ha-policy> <replication> <slave> <allow-failback>true</allow-failback> <failback-delay>5000</failback-delay> <max-saved-replicated-journals-size>1000000</max-saved-replicated-journals-size> <restart-backup>true</restart-backup> </slave> </replication> </ha-policy> </configuration>
Atas ialah kandungan terperinci Menggunakan HornetQ untuk pemprosesan mesej dalam pembangunan API Java. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!