首頁 > 資料庫 > mysql教程 > ActiveMQ发布订阅模式中常用工具类

ActiveMQ发布订阅模式中常用工具类

WBOY
發布: 2016-06-07 16:10:02
原創
1726 人瀏覽過

package com.jms;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import javax.jms.BytesMessage;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;impo

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

package com.jms;

 

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

 

import javax.jms.BytesMessage;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageConsumer;

import javax.jms.MessageProducer;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

 

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

import org.clapper.util.logging.Logger;

 

import com.pzoom.dsa.common.util.Log;

import com.pzoom.dsa.nerd.mysql.DBQueryHelper;

 

public class Jms

{

  static ConnectionFactory connectionFactory;

  static Connection connection = null;

  static Session session;

  static Map<String, MessageProducer> sendQueues = new ConcurrentHashMap<String, MessageProducer>();

 

  static Map<String, MessageConsumer> getQueues = new ConcurrentHashMap<String, MessageConsumer>();

 

  static Log log=Log.getLogger(DBQueryHelper.class);

 

  static {

    connectionFactory = new ActiveMQConnectionFactory(

      ActiveMQConnection.DEFAULT_USER,

      ActiveMQConnection.DEFAULT_PASSWORD,

      "tcp://10.100.100.100:61616?wireFormat.maxInactivityDuration=0");

    try

    {

      connection = connectionFactory.createConnection();

 

      connection.start();

 

      session = connection.createSession(Boolean.FALSE.booleanValue(),

        1);

    }

    catch (Exception e) {

      e.printStackTrace();

    }

  }

 

  static MessageProducer getMessageProducer(String name) {

    if (sendQueues.containsKey(name))

      return ((MessageProducer)sendQueues.get(name));

    try

    {

      Destination destination = session.createQueue(name);

      MessageProducer producer = session.createProducer(destination);

      sendQueues.put(name, producer);

      return producer;

    } catch (JMSException e) {

      e.printStackTrace();

    }

 

    return ((MessageProducer)sendQueues.get(name));

  }

 

  static MessageConsumer getMessageConsumer(String name) {

    if (getQueues.containsKey(name))

      return ((MessageConsumer)getQueues.get(name));

    try

    {

      Destination destination = session.createQueue(name);

      MessageConsumer consumer = session.createConsumer(destination);

      getQueues.put(name, consumer);

      return consumer;

    } catch (JMSException e) {

      e.printStackTrace();

    }

 

    return ((MessageConsumer)getQueues.get(name));

  }

 

  public static void sendMessage(String queue, String text) {

    try {

      TextMessage message = session.createTextMessage(text);

      getMessageProducer(queue).send(message);

     // log.info("sendMessage " + queue + "\t\t" + text);

    }

    catch (JMSException e) {

      e.printStackTrace();

    }

  }

   

  

   

  public static String getMessage(String queue)

  {

    try {

      TextMessage message = (TextMessage)getMessageConsumer(queue).receive(10000L);

      if (message != null)

      return message.getText();

    } catch (JMSException e) {

      e.printStackTrace();

    }

    return null;

  }

 

  public static void close() {

    try {

      session.close();

    } catch (JMSException e) {

      e.printStackTrace();

    }

    try {

      connection.close();

    } catch (JMSException e) {

      e.printStackTrace();

    }

  }

}

登入後複製

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板