1 56 package org.objectstyle.cayenne.event; 57 58 import java.io.Serializable ; 59 60 import javax.jms.Message ; 61 import javax.jms.MessageFormatException ; 62 import javax.jms.MessageListener ; 63 import javax.jms.ObjectMessage ; 64 import javax.jms.Session ; 65 import javax.jms.Topic ; 66 import javax.jms.TopicConnection ; 67 import javax.jms.TopicConnectionFactory ; 68 import javax.jms.TopicPublisher ; 69 import javax.jms.TopicSession ; 70 import javax.jms.TopicSubscriber ; 71 import javax.naming.Context ; 72 import javax.naming.InitialContext ; 73 import javax.naming.NameNotFoundException ; 74 import javax.naming.NamingException ; 75 76 import org.apache.log4j.Logger; 77 78 86 public class JMSBridge extends EventBridge implements MessageListener { 87 private static Logger logObj = Logger.getLogger(JMSBridge.class); 88 89 protected String topicConnectionFactoryName; 90 91 protected TopicConnection sendConnection; 92 protected TopicSession sendSession; 93 protected TopicConnection receivedConnection; 94 protected TopicPublisher publisher; 95 protected TopicSubscriber subscriber; 96 97 public JMSBridge(EventSubject localSubject, String externalSubject) { 98 super(localSubject, externalSubject); 99 } 100 101 105 public void onMessage(Message message) { 106 107 try { 108 Object vmID = message.getObjectProperty(VM_ID_PROPERRTY); 109 if (VM_ID.equals(vmID)) { 110 logObj.debug("Message from same VM ignoring."); 111 return; 112 } 113 114 if (!(message instanceof ObjectMessage )) { 115 if (logObj.isDebugEnabled()) { 116 logObj.debug("Unsupported message, ignoring: " + vmID); 117 } 118 119 return; 120 } 121 122 ObjectMessage objectMessage = (ObjectMessage ) message; 123 CayenneEvent event = messageObjectToEvent(objectMessage.getObject()); 124 if (event != null) { 125 if (logObj.isDebugEnabled()) { 126 logObj.debug( 127 "Received CayenneEvent: " 128 + event.getClass().getName() 129 + ", id: " 130 + vmID); 131 } 132 133 onExternalEvent(event); 134 } 135 136 } catch (MessageFormatException mfex) { 137 Exception linkedException = mfex.getLinkedException(); 138 Exception logException = (linkedException != null) ? linkedException : mfex; 139 logObj.info("Message Format Exception: ", logException); 140 } catch (Exception ex) { 141 logObj.info("Exception while processing message: ", ex); 142 } 143 } 144 145 148 public String getTopicConnectionFactoryName() { 149 return topicConnectionFactoryName; 150 } 151 152 public void setTopicConnectionFactoryName(String name) { 153 this.topicConnectionFactoryName = name; 154 } 155 156 159 protected void startupExternal() throws Exception { 160 Context jndiContext = new InitialContext (); 161 TopicConnectionFactory connectionFactory = 162 (TopicConnectionFactory ) jndiContext.lookup(topicConnectionFactoryName); 163 164 Topic topic = null; 165 166 try { 167 topic = (Topic ) jndiContext.lookup(externalSubject); 168 } catch (NameNotFoundException ex) { 169 topic = topicNotFound(jndiContext, ex); 171 172 if (topic == null) { 173 throw ex; 174 } 175 } 176 177 if (receivesLocalEvents()) { 179 this.sendConnection = connectionFactory.createTopicConnection(); 180 this.sendSession = 181 sendConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 182 this.publisher = sendSession.createPublisher(topic); 183 } 184 185 if (receivesExternalEvents()) { 187 this.receivedConnection = connectionFactory.createTopicConnection(); 188 this.subscriber = 189 receivedConnection.createTopicSession( 190 false, 191 Session.AUTO_ACKNOWLEDGE).createSubscriber( 192 topic); 193 this.subscriber.setMessageListener(this); 194 this.receivedConnection.start(); 195 } 196 } 197 198 202 protected Topic topicNotFound(Context jndiContext, NamingException ex) 203 throws Exception { 204 throw ex; 205 } 206 207 210 protected void shutdownExternal() throws Exception { 211 Exception lastException = null; 212 213 if (publisher != null) { 214 try { 215 publisher.close(); 216 } catch (Exception ex) { 217 lastException = ex; 218 } 219 } 220 221 if (subscriber != null) { 222 try { 223 subscriber.close(); 224 } catch (Exception ex) { 225 lastException = ex; 226 } 227 } 228 229 if (receivedConnection != null) { 230 try { 231 receivedConnection.close(); 232 } catch (Exception ex) { 233 lastException = ex; 234 } 235 } 236 237 if (sendSession != null) { 238 try { 239 sendSession.close(); 240 } catch (Exception ex) { 241 lastException = ex; 242 } 243 } 244 245 if (sendConnection != null) { 246 try { 247 sendConnection.close(); 248 } catch (Exception ex) { 249 lastException = ex; 250 } 251 } 252 253 publisher = null; 254 subscriber = null; 255 receivedConnection = null; 256 sendConnection = null; 257 sendSession = null; 258 259 if (lastException != null) { 260 throw lastException; 261 } 262 } 263 264 protected void sendExternalEvent(CayenneEvent localEvent) throws Exception { 265 logObj.debug("Sending event remotely: " + localEvent); 266 ObjectMessage message = 267 sendSession.createObjectMessage(eventToMessageObject(localEvent)); 268 message.setObjectProperty(VM_ID_PROPERRTY, VM_ID); 269 publisher.publish(message); 270 } 271 272 277 protected Serializable eventToMessageObject(CayenneEvent event) throws Exception { 278 return event; 279 } 280 281 286 protected CayenneEvent messageObjectToEvent(Serializable object) throws Exception { 287 return (object instanceof CayenneEvent) ? (CayenneEvent) object : null; 288 } 289 } 290 | Popular Tags |