In some projects, running ActiveMQ as a separate process makes deployment cumbersome. Instead, we can embed ActiveMQ in Tomcat so that the broker starts together with Tomcat. Doing so requires three pieces of knowledge:
BrokerService in ActiveMQ
Auto-start Servlet configuration
Use jconsole to understand the running status of embedded ActiveMQ
Add the ActiveMQ dependency to pom.xml. This example uses version 5.7.0. Note that activemq-core alone is enough.
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>
When writing the BrokerService code, pay attention to three main points:
Whether monitoring information should be visible in jconsole: broker.setUseJmx(true)
How to set the connection user name and password with the authentication plug-in
Whether messages are persisted, and where the persistent data is stored
Suppose we need a broker that listens on tcp://localhost:61616, accepts the user name admin with the password admin, persists its data under /activemq, and can be monitored with jconsole. The BrokerService code is as follows:
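// author:herbert qq:464884492
import java.io.File;
import java.util.ArrayList;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.SimpleAuthenticationPlugin;

BrokerService broker = new BrokerService();
broker.setUseJmx(true); // enable JMX monitoring
broker.setPersistent(true); // enable persistence
broker.setBrokerName("Test");
SimpleAuthenticationPlugin sap = new SimpleAuthenticationPlugin();
AuthenticationUser au = new AuthenticationUser("admin", "admin", "users");
ArrayList<AuthenticationUser> d = new ArrayList<AuthenticationUser>();
d.add(au);
sap.setUsers(d); // user authentication
broker.setPlugins(new BrokerPlugin[] { sap });
String mqDataPath = "/activemq"; // storage location
broker.getPersistenceAdapter().setDirectory(new File(mqDataPath));
broker.addConnector("tcp://localhost:61616"); // connection address
broker.start();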
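The broker code above stores its data under /activemq, which may not exist or may not be writable for the account that runs Tomcat. The following is a minimal sketch, using only the JDK, of checking the directory before the broker starts. The class MqDataDir and its prepare method are hypothetical helpers introduced for illustration and are not part of ActiveMQ.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not an ActiveMQ class): prepares the broker data directory.
final class MqDataDir {

    // Creates the directory (and any missing parents) and verifies it is writable.
    static Path prepare(String dir) {
        Path path = Paths.get(dir).toAbsolutePath().normalize();
        try {
            Files.createDirectories(path); // no-op if the directory already exists
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot create broker data directory: " + path, e);
        }
        if (!Files.isWritable(path)) {
            throw new IllegalStateException("Broker data directory is not writable: " + path);
        }
        return path;
    }

    public static void main(String[] args) {
        // Assumption: a directory under java.io.tmpdir stands in for "/activemq".
        Path dataDir = prepare(System.getProperty("java.io.tmpdir") + "/activemq-data");
        System.out.println("Broker data directory: " + dataDir);
        // In the article's code the result would be passed on as:
        // broker.getPersistenceAdapter().setDirectory(dataDir.toFile());
    }
}
```

Failing early with a clear message is usually easier to diagnose than a persistence error raised during broker.start().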
ActiveMQ offers two general message delivery models:
queue: supports message persistence, so unconsumed messages are still available after a restart. If there are multiple consumers, and each consumer takes one message at a time, the consumers share the messages in the queue evenly.
topic: by default (without durable subscriptions) messages are not persisted, so unconsumed messages are lost after a restart. If there are multiple consumers, each consumer receives every message in the topic.
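The difference between the two models can be illustrated with a small in-memory model. This is a sketch that does not use ActiveMQ or JMS at all: the class DeliveryModes and its methods are hypothetical, and the strict round-robin for queues is an idealization of what happens when each consumer takes one message at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the two delivery modes; it does not talk to a broker.
final class DeliveryModes {

    // Queue: each message is delivered to exactly one consumer (round-robin here).
    static Map<String, List<String>> deliverQueue(List<String> messages, List<String> consumers) {
        Map<String, List<String>> received = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            received.get(consumer).add(messages.get(i));
        }
        return received;
    }

    // Topic: every subscriber receives its own copy of every message.
    static Map<String, List<String>> deliverTopic(List<String> messages, List<String> subscribers) {
        Map<String, List<String>> received = emptyInboxes(subscribers);
        for (String message : messages) {
            for (String subscriber : subscribers) {
                received.get(subscriber).add(message);
            }
        }
        return received;
    }

    // One empty inbox per consumer, kept in insertion order for readable output.
    private static Map<String, List<String>> emptyInboxes(List<String> names) {
        Map<String, List<String>> inboxes = new LinkedHashMap<>();
        for (String name : names) {
            inboxes.put(name, new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m0", "m1", "m2", "m3");
        List<String> consumers = Arrays.asList("A", "B");
        System.out.println("queue: " + deliverQueue(messages, consumers));
        // prints: queue: {A=[m0, m2], B=[m1, m3]}
        System.out.println("topic: " + deliverTopic(messages, consumers));
        // prints: topic: {A=[m0, m1, m2, m3], B=[m0, m1, m2, m3]}
    }
}
```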
Whether you are writing producer or consumer code, there are four main steps:
Establish a connection, using the failover:() transport so that the client reconnects automatically after a disconnect
Establish a Session and get the sending or receiving Destination, specifying whether it is a queue (session.createQueue(queueName)) or a topic (session.createTopic(topicName))
Get the producer or consumer through the Session
Produce or consume messages
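The failover connection string used in step one is plain string composition, so it can be factored into a small helper. The sketch below uses only the JDK. The class FailoverUrl, its build method, and the host name backup-host are hypothetical. The options initialReconnectDelay and maxReconnectDelay are the ones used in this article's code, and the failover transport accepts several comma-separated broker URIs inside the parentheses.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that assembles a failover: connection string.
final class FailoverUrl {

    // Joins the broker URIs with commas and appends the two reconnect options.
    static String build(List<String> brokerUrls, long initialReconnectDelayMs, long maxReconnectDelayMs) {
        if (brokerUrls == null || brokerUrls.isEmpty()) {
            throw new IllegalArgumentException("At least one broker URL is required");
        }
        String joined = brokerUrls.stream()
                .map(String::trim)
                .collect(Collectors.joining(","));
        return "failover:(" + joined + ")"
                + "?initialReconnectDelay=" + initialReconnectDelayMs
                + "&maxReconnectDelay=" + maxReconnectDelayMs;
    }

    public static void main(String[] args) {
        // Same URL as in the article's producer and consumer code.
        System.out.println(build(Arrays.asList("tcp://localhost:61616"), 1000, 30000));
        // prints: failover:(tcp://localhost:61616)?initialReconnectDelay=1000&maxReconnectDelay=30000

        // "backup-host" is a made-up second broker, shown only to illustrate the list form.
        System.out.println(build(Arrays.asList("tcp://localhost:61616", " tcp://backup-host:61616 "), 1000, 30000));
    }
}
```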
Now we write a producer that sends 10 messages in a loop:
// author:herbert qq:464884492
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

String mqConnUrl = "tcp://localhost:61616";
String connUrl = "failover:(" + mqConnUrl.trim() + ")?initialReconnectDelay=1000&maxReconnectDelay=30000";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", connUrl);
javax.jms.Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("system");
MessageProducer messageProducer = session.createProducer(destination);
for (int i = 0; i < 10; i++) {
    javax.jms.TextMessage message = session.createTextMessage("Message sent by ActiveMQ " + i);
    System.out.println("Sending message: " + "Message sent by ActiveMQ " + i);
    messageProducer.send(message);
}
connection.close(); // closing the connection also closes its session and producer
Then write a consumer to consume those 10 messages:
// author:herbert qq:464884492
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;

String mqConnUrl = "tcp://localhost:61616";
String connUrl = "failover:(" + mqConnUrl.trim() + ")?initialReconnectDelay=1000&maxReconnectDelay=30000";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", connUrl);
javax.jms.Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("system");
MessageConsumer messageConsumer = session.createConsumer(destination);
// The listener is called asynchronously for each message that arrives
messageConsumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(javax.jms.Message message) {
        ActiveMQTextMessage m = (ActiveMQTextMessage) message;
        try {
            System.out.println("Received: " + m.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});
Result
The 10 messages sent by the producer are all received and processed by the consumer.
For embedded ActiveMQ, call broker.setUseJmx(true) before starting the BrokerService. Then go to the bin directory under JAVA_HOME and run the jconsole command.
After jconsole starts, select the process in which ActiveMQ is running (here, the Tomcat process). After connecting, select the MBeans tab. The attributes of the queue show the number of messages that have been consumed and the number that have entered MQ. Under Operations, find sendTextMessage, which can also be used to send a message to this queue.
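The same MBeans that jconsole displays can also be listed from code running inside the Tomcat JVM. The sketch below uses only the JDK's javax.management API. It assumes the embedded broker registers its MBeans in the platform MBean server under the domain org.apache.activemq (the usual default, but worth verifying for your version). The class MBeanLister is hypothetical. When run on its own, with no broker in the JVM, it simply prints an empty set.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper that lists the MBean names registered under a JMX domain.
final class MBeanLister {

    // Returns the canonical names of all MBeans in the given domain, sorted.
    static Set<String> listNames(String domain) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            Set<ObjectName> names = server.queryNames(new ObjectName(domain + ":*"), null);
            Set<String> sorted = new TreeSet<>();
            for (ObjectName name : names) {
                sorted.add(name.getCanonicalName());
            }
            return sorted;
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid JMX domain: " + domain, e);
        }
    }

    public static void main(String[] args) {
        // Assumption: the embedded broker uses the domain "org.apache.activemq".
        System.out.println("ActiveMQ MBeans: " + listNames("org.apache.activemq"));
        // The JVM's own MBeans are always present, which shows the query itself works.
        System.out.println("JVM MBeans: " + listNames("java.lang").size());
    }
}
```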
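For Tomcat 7 and later, a servlet can be configured to start automatically through the loadOnStartup attribute of the @WebServlet annotation. A value >= 0 means the servlet is loaded when the application starts. The smaller the value, the higher the priority.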
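// author:herbert qq:464884492
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;

@WebServlet(urlPatterns = "/initmq", loadOnStartup = 1)
public class InitMqServlet extends HttpServlet {
    @Override
    public void init(ServletConfig config) throws ServletException {
        super.init(config);
        // Write the code that starts ActiveMQ here
    }
}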
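The loadOnStartup rule can be modeled in a few lines of plain Java. This is only an illustration of the ordering rule, not how Tomcat is implemented: the class StartupOrder and the servlet names ReportServlet and LazyServlet are hypothetical, and the order among servlets with equal values is left to the container in practice.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the loadOnStartup rule; it does not use the Servlet API.
final class StartupOrder {

    // Returns the servlets that are loaded at deployment, lowest value first.
    static List<String> eagerStartOrder(Map<String, Integer> loadOnStartup) {
        List<Map.Entry<String, Integer>> eager = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : loadOnStartup.entrySet()) {
            if (entry.getValue() >= 0) { // negative values mean "load lazily"
                eager.add(entry);
            }
        }
        eager.sort(Map.Entry.comparingByValue()); // stable sort, smaller value first
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : eager) {
            names.add(entry.getKey());
        }
        return names;
    }

    public static void main(String[] args) {
        Map<String, Integer> servlets = new LinkedHashMap<>();
        servlets.put("ReportServlet", 5);   // hypothetical servlet
        servlets.put("LazyServlet", -1);    // hypothetical servlet, not started eagerly
        servlets.put("InitMqServlet", 1);   // the servlet from this article
        System.out.println(eagerStartOrder(servlets));
        // prints: [InitMqServlet, ReportServlet]
    }
}
```

Giving InitMqServlet a small value therefore helps the broker start before other eagerly loaded servlets that may need it.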
With activemq-broker as the dependency, clients could connect but messages could not be sent. Replacing it with activemq-all caused an slf4j logging conflict that exclusions could not resolve. In the end, depending only on activemq-core solved all of these problems.