1 19 20 package org.apache.cayenne.event; 21 22 import java.io.Serializable ; 23 import java.util.Collection ; 24 25 import javax.jms.Message ; 26 import javax.jms.MessageFormatException ; 27 import javax.jms.MessageListener ; 28 import javax.jms.ObjectMessage ; 29 import javax.jms.Session ; 30 import javax.jms.Topic ; 31 import javax.jms.TopicConnection ; 32 import javax.jms.TopicConnectionFactory ; 33 import javax.jms.TopicPublisher ; 34 import javax.jms.TopicSession ; 35 import javax.jms.TopicSubscriber ; 36 import javax.naming.Context ; 37 import javax.naming.InitialContext ; 38 import javax.naming.NameNotFoundException ; 39 import javax.naming.NamingException ; 40 41 import org.apache.cayenne.util.IDUtil; 42 43 51 public class JMSBridge extends EventBridge implements MessageListener { 52 53 static final String VM_ID = new String (IDUtil.pseudoUniqueByteSequence16()); 54 static final String VM_ID_PROPERRTY = "VM_ID"; 55 56 protected String topicConnectionFactoryName; 57 58 protected TopicConnection sendConnection; 59 protected TopicSession sendSession; 60 protected TopicConnection receivedConnection; 61 protected TopicPublisher publisher; 62 protected TopicSubscriber subscriber; 63 64 public JMSBridge(EventSubject localSubject, String externalSubject) { 65 super(localSubject, externalSubject); 66 } 67 68 71 public JMSBridge(Collection localSubjects, String externalSubject) { 72 super(localSubjects, externalSubject); 73 } 74 75 79 public void onMessage(Message message) { 80 81 try { 82 Object vmID = message.getObjectProperty(JMSBridge.VM_ID_PROPERRTY); 83 if (JMSBridge.VM_ID.equals(vmID)) { 84 return; 85 } 86 87 if (!(message instanceof ObjectMessage )) { 88 return; 89 } 90 91 ObjectMessage objectMessage = (ObjectMessage ) message; 92 CayenneEvent event = messageObjectToEvent(objectMessage.getObject()); 93 if (event != null) { 94 onExternalEvent(event); 95 } 96 97 } 98 catch (MessageFormatException mfex) { 99 } 102 catch (Exception ex) { 103 } 106 } 107 108 111 public String getTopicConnectionFactoryName() { 112 return topicConnectionFactoryName; 113 } 114 115 public void setTopicConnectionFactoryName(String name) { 116 this.topicConnectionFactoryName = name; 117 } 118 119 122 protected void startupExternal() throws Exception { 123 Context jndiContext = new InitialContext (); 124 TopicConnectionFactory connectionFactory = (TopicConnectionFactory ) jndiContext 125 .lookup(topicConnectionFactoryName); 126 127 Topic topic = null; 128 129 try { 130 topic = (Topic ) jndiContext.lookup(externalSubject); 131 } 132 catch (NameNotFoundException ex) { 133 topic = topicNotFound(jndiContext, ex); 135 136 if (topic == null) { 137 throw ex; 138 } 139 } 140 141 if (receivesLocalEvents()) { 143 this.sendConnection = connectionFactory.createTopicConnection(); 144 this.sendSession = sendConnection.createTopicSession( 145 false, 146 Session.AUTO_ACKNOWLEDGE); 147 this.publisher = sendSession.createPublisher(topic); 148 } 149 150 if (receivesExternalEvents()) { 152 this.receivedConnection = connectionFactory.createTopicConnection(); 153 this.subscriber = receivedConnection.createTopicSession( 154 false, 155 Session.AUTO_ACKNOWLEDGE).createSubscriber(topic); 156 this.subscriber.setMessageListener(this); 157 this.receivedConnection.start(); 158 } 159 } 160 161 166 protected Topic topicNotFound(Context jndiContext, NamingException ex) 167 throws Exception { 168 throw ex; 169 } 170 171 174 protected void shutdownExternal() throws Exception { 175 Exception lastException = null; 176 177 if (publisher != null) { 178 try { 179 publisher.close(); 180 } 181 catch (Exception ex) { 182 lastException = ex; 183 } 184 } 185 186 if (subscriber != null) { 187 try { 188 subscriber.close(); 189 } 190 catch (Exception ex) { 191 lastException = ex; 192 } 193 } 194 195 if (receivedConnection != null) { 196 try { 197 receivedConnection.close(); 198 } 199 catch (Exception ex) { 200 lastException = ex; 201 } 202 } 203 204 if (sendSession != null) { 205 try { 206 sendSession.close(); 207 } 208 catch (Exception ex) { 209 lastException = ex; 210 } 211 } 212 213 if (sendConnection != null) { 214 try { 215 sendConnection.close(); 216 } 217 catch (Exception ex) { 218 lastException = ex; 219 } 220 } 221 222 publisher = null; 223 subscriber = null; 224 receivedConnection = null; 225 sendConnection = null; 226 sendSession = null; 227 228 if (lastException != null) { 229 throw lastException; 230 } 231 } 232 233 protected void sendExternalEvent(CayenneEvent localEvent) throws Exception { 234 ObjectMessage message = sendSession 235 .createObjectMessage(eventToMessageObject(localEvent)); 236 message.setObjectProperty(JMSBridge.VM_ID_PROPERRTY, JMSBridge.VM_ID); 237 publisher.publish(message); 238 } 239 240 245 protected Serializable eventToMessageObject(CayenneEvent event) throws Exception { 246 return event; 247 } 248 249 254 protected CayenneEvent messageObjectToEvent(Serializable object) throws Exception { 255 return (object instanceof CayenneEvent) ? (CayenneEvent) object : null; 256 } 257 } 258 | Popular Tags |