pom.xml:
<!-- Spring JMS support -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.3.7.RELEASE</version>
</dependency>
<!-- ActiveMQ connection pool -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.9.0</version>
</dependency>
spring-jms.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
                           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- ActiveMQConnectionFactory is the JMS factory class that creates connections to ActiveMQ -->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.0.224:61616"/>
    </bean>

    <!-- Connection pool -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="maxConnections" value="10"/>
    </bean>

    <!-- Spring provides several ConnectionFactory implementations, including SingleConnectionFactory and CachingConnectionFactory -->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
    </bean>

    <!-- JmsTemplate is Spring's JMS helper class for sending and receiving messages -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Refers to the Spring-provided ConnectionFactory defined above -->
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
    </bean>

    <!-- Queue destination (point-to-point) -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-queue"/>
    </bean>
</beans>
A ConnectionFactory creates connections to the JMS server. Spring provides several ConnectionFactory implementations, including SingleConnectionFactory and CachingConnectionFactory. SingleConnectionFactory returns the same Connection for every request to connect to the JMS server and ignores calls to that Connection's close() method. CachingConnectionFactory extends SingleConnectionFactory, so it has all of SingleConnectionFactory's behavior, and it adds caching of Session, MessageProducer and MessageConsumer objects. This example uses CachingConnectionFactory.
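To make the "same connection, close() ignored" behavior concrete, here is a minimal JDK-only sketch. It is a toy model under stated assumptions, not Spring's source code: `Conn`, `RealConn` and `SingleConnFactory` are hypothetical stand-ins for `javax.jms.Connection`, a provider connection and SingleConnectionFactory. It does not model the Session, MessageProducer and MessageConsumer caching that CachingConnectionFactory adds.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model (JDK only) of a factory that shares one connection and ignores close(). */
final class SharedConnectionSketch {

    /** Hypothetical stand-in for javax.jms.Connection. */
    interface Conn {
        String send(String text);
        void close();
        boolean isClosed();
    }

    /** Hypothetical "real" connection; counts how many instances were opened. */
    static final class RealConn implements Conn {
        static final AtomicInteger OPENED = new AtomicInteger();
        private boolean closed;

        RealConn() { OPENED.incrementAndGet(); }

        @Override public String send(String text) {
            if (closed) throw new IllegalStateException("connection is closed");
            return "sent: " + text;
        }
        @Override public void close() { closed = true; }
        @Override public boolean isClosed() { return closed; }
    }

    /** Hands out one shared connection wrapped in a proxy that swallows close(). */
    static final class SingleConnFactory {
        private RealConn target; // the single underlying connection, created lazily
        private Conn shared;     // the proxy returned to every caller

        synchronized Conn createConnection() {
            if (shared == null) {
                target = new RealConn();
                InvocationHandler handler = (proxy, method, args) -> {
                    if (method.getName().equals("close")) {
                        return null; // ignore close() coming from callers
                    }
                    try {
                        return method.invoke(target, args); // delegate everything else
                    } catch (InvocationTargetException e) {
                        throw e.getCause(); // rethrow the original exception
                    }
                };
                shared = (Conn) Proxy.newProxyInstance(
                        Conn.class.getClassLoader(), new Class<?>[] {Conn.class}, handler);
            }
            return shared;
        }

        /** Only the factory itself really closes the underlying connection. */
        synchronized void destroy() {
            if (target != null) target.close();
        }
    }

    public static void main(String[] args) {
        SingleConnFactory factory = new SingleConnFactory();
        Conn first = factory.createConnection();
        Conn second = factory.createConnection();
        first.close();                              // has no effect on the shared connection
        System.out.println(first == second);        // prints "true"
        System.out.println(second.send("Hello"));   // prints "sent: Hello"
        System.out.println(RealConn.OPENED.get());  // prints "1"
    }
}
```

Because callers cannot close the shared connection, code such as JmsTemplate can follow the usual open, use, close pattern on every call without paying for a new physical connection each time.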
Message producer:
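public static void main(String[] args) {
    ApplicationContext context = new ClassPathXmlApplicationContext("spring-jms.xml");
    JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
    Destination queueDestination = (Destination) context.getBean("queueDestination");
    // Send one text message to the queue
    jmsTemplate.send(queueDestination, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage("Hello spring JMS");
        }
    });
}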
Consumer:
package com.jalja.org.jms.spring;

import javax.jms.Destination;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

public class SpringJmsReceive {
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring-jms.xml");
        JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
        Destination queueDestination = (Destination) context.getBean("queueDestination");
        // Synchronous receive: blocks until a message is available on the queue
        String msg = (String) jmsTemplate.receiveAndConvert(queueDestination);
        System.out.println(msg);
    }
}
Instead of starting a consumer by hand, we can configure a listener in Spring that receives messages asynchronously. This is equivalent to configuring the consumer in Spring: the listener starts together with the application context, so no separate consumer program has to be run to receive messages.
spring-jms.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
                           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- ActiveMQConnectionFactory is the JMS factory class that creates connections to ActiveMQ -->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.0.224:61616"/>
    </bean>

    <!-- Connection pool -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="maxConnections" value="10"/>
    </bean>

    <!-- Spring provides several ConnectionFactory implementations, including SingleConnectionFactory and CachingConnectionFactory -->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
    </bean>

    <!-- JmsTemplate is Spring's JMS helper class for sending and receiving messages -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Refers to the Spring-provided ConnectionFactory defined above -->
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
    </bean>

    <!-- Queue destination (point-to-point) -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-queue"/>
    </bean>

    <!-- Message listener -->
    <bean id="myMessageListener" class="com.jalja.org.jms.spring.yb.MyMessageListener"/>

    <!-- Message listener container -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="destination" ref="queueDestination"/>
        <property name="messageListener" ref="myMessageListener"/>
    </bean>
</beans>
After the producer sends a message to a Destination, the consumer has to consume it from that destination. So how does the consumer know that a producer has sent a message there? Spring handles this with the message listener container, MessageListenerContainer. The container receives messages and dispatches each one to the actual MessageListener for processing. Each consumer needs its own MessageListenerContainer for each destination it listens to.
Besides knowing which destination to listen to, the container also needs to know where to listen, that is, which JMS server to connect to. This is achieved by injecting a ConnectionFactory into the MessageListenerContainer when configuring it. So a MessageListenerContainer has three properties that must be specified: the ConnectionFactory, which says where to listen; the Destination, which says what to listen to; and the MessageListener, which processes each message after it is received. Spring provides two types of MessageListenerContainer: SimpleMessageListenerContainer and DefaultMessageListenerContainer.
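The dispatching role of the container can be sketched with the JDK alone. This is a toy model under stated assumptions, not Spring's implementation: `Listener` and `ToyContainer` are hypothetical names, and an in-memory `BlockingQueue` stands in for both the broker connection and the destination. The worker thread runs a receive loop and hands every message to the listener, which is roughly the job a message listener container performs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model (JDK only) of a message listener container; not Spring's implementation. */
final class ListenerContainerSketch {

    /** Hypothetical stand-in for javax.jms.MessageListener. */
    interface Listener {
        void onMessage(String message);
    }

    /** Receives from one destination and hands every message to one listener. */
    static final class ToyContainer {
        private final BlockingQueue<String> destination; // where and what to listen to
        private final Listener listener;                 // who processes each message
        private final Thread worker;
        private volatile boolean running = true;

        ToyContainer(BlockingQueue<String> destination, Listener listener) {
            this.destination = destination;
            this.listener = listener;
            this.worker = new Thread(this::receiveLoop, "toy-listener-container");
            this.worker.setDaemon(true);
        }

        void start() {
            worker.start();
        }

        void stop() throws InterruptedException {
            running = false;
            worker.join();
        }

        private void receiveLoop() {
            try {
                while (running) {
                    // Wait briefly so that stop() is noticed even when no message arrives.
                    String message = destination.poll(100, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        listener.onMessage(message);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Sends the messages through a toy container and returns what the listener saw. */
    static List<String> deliver(List<String> messages) {
        BlockingQueue<String> destination = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.size());
        ToyContainer container = new ToyContainer(destination, message -> {
            received.add(message);
            done.countDown();
        });
        container.start();
        try {
            destination.addAll(messages); // the "producer" side
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listener did not receive all messages");
            }
            container.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        // prints "[Hello spring JMS, second message]"
        System.out.println(deliver(Arrays.asList("Hello spring JMS", "second message")));
    }
}
```

The producer only puts messages on the destination and never calls the listener itself. That decoupling is what lets the real container be configured purely through its connectionFactory, destination and messageListener properties.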
SimpleMessageListenerContainer: SimpleMessageListenerContainer creates its Session and MessageConsumer at startup, registers the listener with the standard JMS MessageConsumer.setMessageListener() method, and leaves it to the JMS provider to invoke the listener's callback. It does not adapt dynamically to runtime demands and cannot participate in externally managed transactions. In terms of compatibility, it stays very close to the standalone JMS specification, but it is generally not compatible with Java EE's JMS restrictions.
DefaultMessageListenerContainer: In most cases we use DefaultMessageListenerContainer. Unlike SimpleMessageListenerContainer, it adapts dynamically to runtime demands and can participate in externally managed transactions. It strikes a good balance between low requirements on the JMS provider, advanced features such as transaction participation, and compatibility with Java EE environments.
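The push style that SimpleMessageListenerContainer relies on can be illustrated with another JDK-only toy. The names `ToyConsumer` and `SimpleStyleContainer` are hypothetical, and this is a sketch of the idea rather than Spring's or any provider's code: the container registers the listener once at startup, and from then on the "provider" calls the listener back on its own. DefaultMessageListenerContainer differs in that it runs its own loop of receive calls instead of relying on this callback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model (JDK only) of push-style delivery; not Spring's or any provider's code. */
final class PushListenerSketch {

    /** Hypothetical stand-in for javax.jms.MessageListener. */
    interface Listener {
        void onMessage(String message);
    }

    /** Hypothetical stand-in for a provider-side javax.jms.MessageConsumer. */
    static final class ToyConsumer {
        private final Deque<String> pending = new ArrayDeque<>();
        private Listener listener;

        /** Registers the callback and flushes anything that arrived earlier. */
        void setMessageListener(Listener listener) {
            this.listener = listener;
            while (!pending.isEmpty()) {
                listener.onMessage(pending.poll());
            }
        }

        /** Called by the toy "provider" whenever a message arrives. */
        void deliver(String message) {
            if (listener == null) {
                pending.add(message); // nobody registered yet: keep the message
            } else {
                listener.onMessage(message); // push straight to the callback
            }
        }
    }

    /** Simple-style container: registers the listener once at start, then stays passive. */
    static final class SimpleStyleContainer {
        private final ToyConsumer consumer;
        private final Listener listener;

        SimpleStyleContainer(ToyConsumer consumer, Listener listener) {
            this.consumer = consumer;
            this.listener = listener;
        }

        void start() {
            consumer.setMessageListener(listener);
        }
    }

    /** Delivers one message before and one after start() and returns what was received. */
    static List<String> run() {
        List<String> received = new ArrayList<>();
        ToyConsumer consumer = new ToyConsumer();
        consumer.deliver("arrived before start");
        new SimpleStyleContainer(consumer, received::add).start();
        consumer.deliver("arrived after start");
        return received;
    }

    public static void main(String[] args) {
        // prints "[arrived before start, arrived after start]"
        System.out.println(run());
    }
}
```

After start() the container in this sketch does no work of its own, which mirrors why the push style cannot scale its consumers up or down at runtime.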
Message producer:
public static void main(String[] args) {
    ApplicationContext context = new ClassPathXmlApplicationContext("spring-jms.xml");
    JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
    Destination queueDestination = (Destination) context.getBean("queueDestination");
    System.out.println("Asynchronous call started");
    jmsTemplate.send(queueDestination, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage("Hello spring JMS");
        }
    });
    System.out.println("Asynchronous call finished");
}
Message listener: MyMessageListener
package com.jalja.org.jms.spring.yb;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // The producer above only sends text messages
        TextMessage msg = (TextMessage) message;
        try {
            System.out.println("Hello: " + msg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Running the message producer, which also starts the listener container defined in spring-jms.xml, typically prints:
Asynchronous call started
Asynchronous call finished
Hello: Hello spring JMS
spring-jms.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
                           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- ActiveMQConnectionFactory is the JMS factory class that creates connections to ActiveMQ -->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.0.224:61616"/>
    </bean>

    <!-- Connection pool -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="maxConnections" value="10"/>
    </bean>

    <!-- Spring provides several ConnectionFactory implementations, including SingleConnectionFactory and CachingConnectionFactory -->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
    </bean>

    <!-- JmsTemplate is Spring's JMS helper class for sending and receiving messages -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Refers to the Spring-provided ConnectionFactory defined above -->
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
    </bean>

    <!-- Topic destination (publish/subscribe) -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-Topic"/>
    </bean>
</beans>
Producer:
public static void main(String[] args) {
    ApplicationContext context = new ClassPathXmlApplicationContext("spring-jms.xml");
    JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
    Destination topicDestination = (Destination) context.getBean("topicDestination");
    // Publish one text message to the topic
    jmsTemplate.send(topicDestination, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage("Hello spring JMS topicDestination");
        }
    });
}
Consumer:
public class SpringJmsSubscriber {
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring-jms.xml");
        JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
        Destination topicDestination = (Destination) context.getBean("topicDestination");
        // Start this subscriber before the producer: a non-durable topic subscriber
        // only receives messages published while it is subscribed.
        String msg = (String) jmsTemplate.receiveAndConvert(topicDestination);
        System.out.println(msg);
    }
}