1 22 package org.jboss.mq.il.ha.examples; 23 24 import javax.jms.ExceptionListener ; 25 import javax.jms.JMSException ; 26 import javax.jms.Message ; 27 import javax.jms.MessageListener ; 28 import javax.jms.Queue ; 29 import javax.jms.QueueConnection ; 30 import javax.jms.QueueConnectionFactory ; 31 import javax.jms.QueueReceiver ; 32 import javax.jms.QueueSender ; 33 import javax.jms.QueueSession ; 34 import javax.jms.TextMessage ; 35 import javax.jms.Topic ; 36 import javax.jms.TopicConnection ; 37 import javax.jms.TopicConnectionFactory ; 38 import javax.jms.TopicPublisher ; 39 import javax.jms.TopicSession ; 40 import javax.jms.TopicSubscriber ; 41 import javax.naming.InitialContext ; 42 import javax.naming.NamingException ; 43 44 import org.jboss.logging.Logger; 45 import org.jboss.system.ServiceMBeanSupport; 46 47 54 public class HAJMSClient extends ServiceMBeanSupport 55 implements MessageListener , ExceptionListener , HAJMSClientMBean 56 { 57 58 61 protected void startService() throws Exception 62 { 63 connect(); 64 } 65 66 70 protected void stopService() throws Exception 71 { 72 disconnect(); 73 } 74 75 79 public void onException(JMSException connEx) 80 { 81 log.info("Notification received by ExceptionListener. Singleton Probably Moved."); 82 try 83 { 84 reconnect(); 85 } 86 catch (Exception e) 87 { 88 e.printStackTrace(); 89 } 90 finally 91 { 92 connectionException_ = connEx; 93 } 94 } 95 96 protected void reconnect() throws NamingException , JMSException 97 { 98 log.info("Reconnecting"); 99 try 100 { 101 disconnect(); 102 } 103 finally 104 { 105 connect(); 106 } 107 } 108 109 public void connect() throws NamingException , JMSException 110 { 111 log.info("Connecting"); 112 113 InitialContext iniCtx = new InitialContext (); 114 Object tmp = iniCtx.lookup("HAILXAConnectionFactory"); 115 116 TopicConnectionFactory tcf = (TopicConnectionFactory )tmp; 117 topicConn_ = tcf.createTopicConnection(); 118 topic_ = (Topic )iniCtx.lookup("topic/testTopic"); 119 topicSession_ = topicConn_.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); 120 topicConn_.setExceptionListener(this); 121 topicSub_ = topicSession_.createSubscriber(topic_); 122 topicSub_.setMessageListener( this ); 123 topicPub_ = topicSession_.createPublisher(topic_); 124 topicConn_.start(); 125 126 QueueConnectionFactory qcf = (QueueConnectionFactory )tmp; 127 qConn_ = qcf.createQueueConnection(); 128 q_ = (Queue )iniCtx.lookup("queue/testQueue"); 129 qSession_ = qConn_.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); 130 qRecv_ = qSession_.createReceiver(q_); 131 qRecv_.setMessageListener( this ); 132 qSend_ = qSession_.createSender(q_); 133 qConn_.start(); 134 135 log.info("Connected"); 136 } 137 138 public void disconnect() throws JMSException 139 { 140 if (topicConn_ == null) return; 141 142 log.info("Disconnecting"); 143 144 connectionException_ = null; 145 146 try 147 { 148 topicConn_.setExceptionListener(null); 149 150 topicSub_.close(); 151 topicPub_.close(); 152 topicConn_.stop(); 153 topicSession_.close(); 154 155 qRecv_.close(); 156 qSend_.close(); 157 qConn_.stop(); 158 qSession_.close(); 159 } 160 finally 161 { 162 try 163 { 164 topicConn_.close(); 165 } 166 finally 167 { 168 topicConn_ = null; 169 try 170 { 171 qConn_.close(); 172 } 173 finally 174 { 175 qConn_ = null; 176 } 177 } 178 179 } 180 log.info("Disconnected"); 181 } 182 183 188 public void onMessage(Message msg) 189 { 190 lastMessage_ = (TextMessage )msg; 191 log.info("Message received: " + msg); 192 } 193 194 public String getLastMessage() throws JMSException 195 { 196 if (lastMessage_ == null) return null; 197 return lastMessage_.getText(); 198 } 199 200 public String getConnectionException() 201 { 202 if (connectionException_ == null) return null; 203 return connectionException_.toString(); 204 } 205 206 public void publishMessageToTopic(String text) throws JMSException 207 { 208 TextMessage msg = topicSession_.createTextMessage(text); 209 topicPub_.publish(msg); 210 log.info("HA JMS message published to topic: " + text); 211 } 212 213 public void sendMessageToQueue(String text) throws JMSException 214 { 215 TextMessage msg = qSession_.createTextMessage(text); 216 qSend_.send(msg); 217 log.info("HA JMS message sent to queue: " + text); 218 } 219 220 private Topic topic_; 221 private TopicSession topicSession_; 222 private TopicConnection topicConn_; 223 private JMSException connectionException_; 224 private TopicSubscriber topicSub_; 225 private TopicPublisher topicPub_; 226 private TextMessage lastMessage_; 227 228 private Queue q_; 229 private QueueConnection qConn_; 230 private QueueSession qSession_; 231 private QueueReceiver qRecv_; 232 private QueueSender qSend_; 233 234 private static Logger log = Logger.getLogger(HAJMSClient.class); 235 236 } 237 | Popular Tags |