1 22 package org.jboss.cache.invalidation.bridges; 23 24 import java.io.Serializable ; 25 26 import javax.jms.JMSException ; 27 import javax.jms.Message ; 28 import javax.jms.MessageListener ; 29 import javax.jms.ObjectMessage ; 30 import javax.jms.Topic ; 31 import javax.jms.TopicConnection ; 32 import javax.jms.TopicConnectionFactory ; 33 import javax.jms.TopicPublisher ; 34 import javax.jms.TopicSession ; 35 import javax.jms.TopicSubscriber ; 36 import javax.naming.Context ; 37 import javax.naming.InitialContext ; 38 import javax.naming.NamingException ; 39 40 import org.jboss.cache.invalidation.BatchInvalidation; 41 import org.jboss.cache.invalidation.InvalidationBridgeListener; 42 import org.jboss.cache.invalidation.InvalidationManager; 43 import org.jboss.system.ServiceMBeanSupport; 44 45 63 64 public class JMSCacheInvalidationBridge 65 extends ServiceMBeanSupport 66 implements JMSCacheInvalidationBridgeMBean, 67 InvalidationBridgeListener, 68 MessageListener 69 { 70 72 public static final String JMS_CACHE_INVALIDATION_BRIDGE = "JMS_CACHE_INVALIDATION_BRIDGE"; 73 74 76 protected org.jboss.cache.invalidation.InvalidationManagerMBean invalMgr = null; 79 protected org.jboss.cache.invalidation.BridgeInvalidationSubscription invalidationSubscription = null; 80 protected String invalidationManagerName = InvalidationManager.DEFAULT_JMX_SERVICE_NAME; 81 82 protected boolean publishingAuthorized = false; 83 protected String connectionFactoryName = "java:/ConnectionFactory"; 84 protected String topicName = "topic/JMSCacheInvalidationBridge"; 85 protected boolean transacted = true; 86 protected int acknowledgeMode = TopicSession.AUTO_ACKNOWLEDGE; protected int propagationMode = JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION; 89 protected java.rmi.dgc.VMID serviceId = new java.rmi.dgc.VMID (); 90 91 protected TopicConnection conn = null; 92 protected TopicSession session = null; 93 protected Topic topic = null; 94 protected TopicSubscriber subscriber = null; 95 protected TopicPublisher pub = null; 96 97 protected String providerUrl = null; 98 99 101 103 public JMSCacheInvalidationBridge () { super (); } 104 105 107 109 public String getInvalidationManager () 110 { 111 return this.invalidationManagerName; 112 } 113 114 public void setInvalidationManager (String objectName) 115 { 116 this.invalidationManagerName = objectName; 117 } 118 119 public String getConnectionFactoryName () 120 { 121 return this.connectionFactoryName; 122 } 123 public void setConnectionFactoryName (String factoryName) 124 { 125 this.connectionFactoryName = factoryName; 126 } 127 128 public String getTopicName () 129 { 130 return this.topicName; 131 } 132 public void setTopicName (String topicName) 133 { 134 this.topicName = topicName; 135 } 136 137 public String getProviderUrl () 138 { 139 return providerUrl; 140 } 141 142 public void setProviderUrl (String providerUrl) 143 { 144 this.providerUrl = providerUrl; 145 } 146 147 public boolean isTransacted () 148 { 149 return this.transacted; 150 } 151 public void setTransacted (boolean isTransacted) 152 { 153 this.transacted = isTransacted; 154 } 155 156 public int getAcknowledgeMode () 157 { 158 return this.acknowledgeMode; 159 } 160 public void setAcknowledgeMode (int ackMode) 161 { 162 if (ackMode > 3 || ackMode < 1) 163 throw new RuntimeException ("Value AcknowledgeMode must be between 1 and 3"); 164 165 switch (ackMode) 166 { 167 case 1: this.acknowledgeMode = TopicSession.AUTO_ACKNOWLEDGE; break; 168 case 2: this.acknowledgeMode = TopicSession.CLIENT_ACKNOWLEDGE; break; 169 case 3: this.acknowledgeMode = TopicSession.DUPS_OK_ACKNOWLEDGE; break; 170 } 171 } 172 173 public int getPropagationMode () 174 { 175 return this.propagationMode; 176 } 177 public void setPropagationMode (int propMode) 178 { 179 if (propMode > 3 || propMode < 1) 180 throw new RuntimeException ("Value PropagationMode must be between 1 and 3"); 181 182 this.propagationMode = propMode; 183 } 184 185 187 public void onMessage(Message msg) 188 { 189 if (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || 192 this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION) 193 { 194 try 195 { 196 ObjectMessage objmsg = (ObjectMessage )msg; 197 if (!objmsg.getJMSType().equals(JMS_CACHE_INVALIDATION_BRIDGE)) return; 198 JMSCacheInvalidationMessage content = (JMSCacheInvalidationMessage)objmsg.getObject(); 199 200 if (!content.emitter.equals (this.serviceId)) 204 { 205 if(content.invalidateAllGroupName != null) 206 { 207 invalidationSubscription.invalidateAll(content.invalidateAllGroupName); 208 } 209 else 210 { 211 invalidationSubscription.batchInvalidate (content.getInvalidations ()); 212 } 213 } 214 } 215 catch (Exception ex) 216 { 217 log.warn(ex.getMessage()); 218 } 219 } 220 } 221 222 224 public void batchInvalidate (BatchInvalidation[] invalidations, boolean asynchronous) 225 { 226 if ( (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || 227 this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION) 228 && this.publishingAuthorized) 229 { 230 JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage (this.serviceId, invalidations); 231 this.sendJMSInvalidationEvent (msg); 232 } 233 } 234 235 public void invalidate (String invalidationGroupName, Serializable [] keys, boolean asynchronous) 236 { 237 if ( (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || 238 this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION) 239 && this.publishingAuthorized) 240 { 241 JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage ( 242 this.serviceId, 243 invalidationGroupName, 244 keys); 245 this.sendJMSInvalidationEvent (msg); 246 } 247 } 248 249 public void invalidate (String invalidationGroupName, Serializable key, boolean asynchronous) 250 { 251 if ( (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || 252 this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION) 253 && this.publishingAuthorized) 254 { 255 JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage ( 256 this.serviceId, 257 invalidationGroupName, 258 new Serializable [] {key} ); 259 this.sendJMSInvalidationEvent (msg); 260 } 261 } 262 263 public void invalidateAll(String groupName, boolean asynchronous) 264 { 265 if ( (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || 266 this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION) 267 && this.publishingAuthorized) 268 { 269 JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage( 270 this.serviceId, 271 groupName 272 ); 273 this.sendJMSInvalidationEvent (msg); 274 } 275 } 276 277 public void newGroupCreated (String groupInvalidationName) 278 { 279 } 282 283 public void groupIsDropped (String groupInvalidationName) 284 { 285 } 288 289 291 protected void startService () throws Exception 292 { 293 log.info("Starting JMS cache invalidation bridge"); 294 295 this.invalMgr = (org.jboss.cache.invalidation.InvalidationManagerMBean) 298 org.jboss.system.Registry.lookup (this.invalidationManagerName); 299 300 this.invalidationSubscription = invalMgr.registerBridgeListener (this); 301 302 InitialContext iniCtx = getInitialContext (); 305 306 Object tmp = iniCtx.lookup(this.connectionFactoryName); 307 TopicConnectionFactory tcf = (TopicConnectionFactory ) tmp; 308 conn = tcf.createTopicConnection(); 309 310 topic = (Topic ) iniCtx.lookup(this.topicName); 311 session = conn.createTopicSession(this.transacted, 312 this.acknowledgeMode); 313 314 conn.start(); 315 316 if (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || 319 this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION) 320 { 321 this.subscriber = session.createSubscriber(topic); 322 this.subscriber.setMessageListener(this); 323 } 324 325 if (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || 326 this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION) 327 { 328 this.pub = session.createPublisher(topic); 329 this.publishingAuthorized = true; 330 } 331 } 332 333 protected void stopService () 334 { 335 log.info ("Stoping JMS cache invalidation bridge"); 336 try 337 { 338 if (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || 339 this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION) 340 { 341 subscriber.close(); 342 } 343 344 if (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION || 345 this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION) 346 { 347 this.publishingAuthorized = false; 348 pub.close(); 349 } 350 351 conn.stop(); 352 session.close(); 353 conn.close(); 354 355 } 356 catch (Exception ex) 357 { 358 log.warn("Failed to stop JMS resources associated with the JMS bridge: ", ex); 359 } 360 } 361 362 364 366 protected synchronized TopicSession getSession() 367 { 368 return this.session; 369 } 370 371 protected synchronized TopicPublisher getPublisher() 372 { 373 return this.pub; 374 } 375 376 protected void sendJMSInvalidationEvent(JMSCacheInvalidationMessage invalidationMsg) 377 { 378 try 379 { 380 if (log.isTraceEnabled ()) 381 log.trace("sending JMS message for cache invalidation" + invalidationMsg); 382 383 try 384 { 385 ObjectMessage msg = getSession().createObjectMessage(); 386 msg.setJMSType(JMS_CACHE_INVALIDATION_BRIDGE); 387 msg.setObject(invalidationMsg); 388 getPublisher().publish(msg); 389 } 390 catch (JMSException ex) 391 { 392 log.debug("failed to publish seppuku event: ", ex); 393 } 394 } 395 catch (Exception ex) 396 { 397 log.warn("failed to do cluster seppuku event: " , ex); 398 } 399 } 400 401 protected InitialContext getInitialContext() 402 throws NamingException 403 { 404 if (providerUrl == null) 405 { 406 return new InitialContext (); 407 } 408 else 409 { 410 log.debug("Using Context.PROVIDER_URL: " + providerUrl); 411 412 java.util.Properties props = new java.util.Properties (System.getProperties()); 413 props.put(Context.PROVIDER_URL, providerUrl); 414 return new InitialContext (props); 415 } 416 } 417 418 419 421 423 } 424 | Popular Tags |