KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > invalidation > bridges > JMSCacheInvalidationBridge


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.cache.invalidation.bridges;
23
24 import java.io.Serializable JavaDoc;
25
26 import javax.jms.JMSException JavaDoc;
27 import javax.jms.Message JavaDoc;
28 import javax.jms.MessageListener JavaDoc;
29 import javax.jms.ObjectMessage JavaDoc;
30 import javax.jms.Topic JavaDoc;
31 import javax.jms.TopicConnection JavaDoc;
32 import javax.jms.TopicConnectionFactory JavaDoc;
33 import javax.jms.TopicPublisher JavaDoc;
34 import javax.jms.TopicSession JavaDoc;
35 import javax.jms.TopicSubscriber JavaDoc;
36 import javax.naming.Context JavaDoc;
37 import javax.naming.InitialContext JavaDoc;
38 import javax.naming.NamingException JavaDoc;
39
40 import org.jboss.cache.invalidation.BatchInvalidation;
41 import org.jboss.cache.invalidation.InvalidationBridgeListener;
42 import org.jboss.cache.invalidation.InvalidationManager;
43 import org.jboss.system.ServiceMBeanSupport;
44
45 /**
46  * JMS implementation of a cache invalidation bridge
47  *
48  * Based on previous code of Bill Burke based on interceptors
49  *
50  * @see org.jboss.cache.invalidation.InvalidationManagerMBean
51  *
52  * @author <a HREF="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
53  * @author <a HREF="mailto:bill@jboss.org">Bill Burke</a>.
54  * @version $Revision: 37459 $
55  *
56  * <p><b>Revisions:</b>
57  *
58  * <p><b>28 September 2002 Sacha Labourey:</b>
59  * <ul>
60  * <li> First implementation </li>
61  * </ul>
62  */

63
64 public class JMSCacheInvalidationBridge
65    extends ServiceMBeanSupport
66    implements JMSCacheInvalidationBridgeMBean,
67               InvalidationBridgeListener,
68               MessageListener JavaDoc
69 {
70    // Constants -----------------------------------------------------
71

72    public static final String JavaDoc JMS_CACHE_INVALIDATION_BRIDGE = "JMS_CACHE_INVALIDATION_BRIDGE";
73
74    // Attributes ----------------------------------------------------
75

76    // JMX Attributes
77
//
78
protected org.jboss.cache.invalidation.InvalidationManagerMBean invalMgr = null;
79    protected org.jboss.cache.invalidation.BridgeInvalidationSubscription invalidationSubscription = null;
80    protected String JavaDoc invalidationManagerName = InvalidationManager.DEFAULT_JMX_SERVICE_NAME;
81
82    protected boolean publishingAuthorized = false;
83    protected String JavaDoc connectionFactoryName = "java:/ConnectionFactory";
84    protected String JavaDoc topicName = "topic/JMSCacheInvalidationBridge";
85    protected boolean transacted = true;
86    protected int acknowledgeMode = TopicSession.AUTO_ACKNOWLEDGE; // AUTO_ACK by default
87
protected int propagationMode = JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION; // IN_OUT by default
88

89    protected java.rmi.dgc.VMID JavaDoc serviceId = new java.rmi.dgc.VMID JavaDoc();
90  
91    protected TopicConnection JavaDoc conn = null;
92    protected TopicSession JavaDoc session = null;
93    protected Topic JavaDoc topic = null;
94    protected TopicSubscriber JavaDoc subscriber = null;
95    protected TopicPublisher JavaDoc pub = null;
96
97    protected String JavaDoc providerUrl = null;
98
99    // Static --------------------------------------------------------
100

101    // Constructors --------------------------------------------------
102

103    public JMSCacheInvalidationBridge () { super (); }
104    
105    // Public --------------------------------------------------------
106

107    // *MBean implementation ----------------------------------------------
108

109    public String JavaDoc getInvalidationManager ()
110    {
111       return this.invalidationManagerName;
112    }
113    
114    public void setInvalidationManager (String JavaDoc objectName)
115    {
116       this.invalidationManagerName = objectName;
117    }
118    
119    public String JavaDoc getConnectionFactoryName ()
120    {
121       return this.connectionFactoryName;
122    }
123    public void setConnectionFactoryName (String JavaDoc factoryName)
124    {
125       this.connectionFactoryName = factoryName;
126    }
127    
128    public String JavaDoc getTopicName ()
129    {
130       return this.topicName;
131    }
132    public void setTopicName (String JavaDoc topicName)
133    {
134       this.topicName = topicName;
135    }
136    
137    public String JavaDoc getProviderUrl ()
138    {
139       return providerUrl;
140    }
141
142    public void setProviderUrl (String JavaDoc providerUrl)
143    {
144       this.providerUrl = providerUrl;
145    }
146
147    public boolean isTransacted ()
148    {
149       return this.transacted;
150    }
151    public void setTransacted (boolean isTransacted)
152    {
153       this.transacted = isTransacted;
154    }
155    
156    public int getAcknowledgeMode ()
157    {
158       return this.acknowledgeMode;
159    }
160    public void setAcknowledgeMode (int ackMode)
161    {
162       if (ackMode > 3 || ackMode < 1)
163          throw new RuntimeException JavaDoc ("Value AcknowledgeMode must be between 1 and 3");
164       
165       switch (ackMode)
166       {
167          case 1: this.acknowledgeMode = TopicSession.AUTO_ACKNOWLEDGE; break;
168          case 2: this.acknowledgeMode = TopicSession.CLIENT_ACKNOWLEDGE; break;
169          case 3: this.acknowledgeMode = TopicSession.DUPS_OK_ACKNOWLEDGE; break;
170       }
171    }
172    
173    public int getPropagationMode ()
174    {
175       return this.propagationMode;
176    }
177    public void setPropagationMode (int propMode)
178    {
179       if (propMode > 3 || propMode < 1)
180          throw new RuntimeException JavaDoc ("Value PropagationMode must be between 1 and 3");
181       
182       this.propagationMode = propMode;
183    }
184
185    // MessageListener implementation ----------------------------------------------
186

187    public void onMessage(Message JavaDoc msg)
188    {
189       // just to make sure we are in the good mode
190
//
191
if (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
192             this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION)
193       {
194          try
195          {
196             ObjectMessage JavaDoc objmsg = (ObjectMessage JavaDoc)msg;
197             if (!objmsg.getJMSType().equals(JMS_CACHE_INVALIDATION_BRIDGE)) return;
198             JMSCacheInvalidationMessage content = (JMSCacheInvalidationMessage)objmsg.getObject();
199             
200             // Not very efficient as the whole message must be unserialized just to check
201
// if we were the emitter. Maybe wrapping this in a byte array would be more efficient
202
//
203
if (!content.emitter.equals (this.serviceId))
204             {
205                if(content.invalidateAllGroupName != null)
206                {
207                   invalidationSubscription.invalidateAll(content.invalidateAllGroupName);
208                }
209                else
210                {
211                   invalidationSubscription.batchInvalidate (content.getInvalidations ());
212                }
213             }
214          }
215          catch (Exception JavaDoc ex)
216          {
217             log.warn(ex.getMessage());
218          }
219       }
220    }
221
222    // InvalidationBridgeListener implementation ----------------------------------------------
223

224    public void batchInvalidate (BatchInvalidation[] invalidations, boolean asynchronous)
225    {
226       if ( (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
227             this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
228             && this.publishingAuthorized)
229       {
230          JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage (this.serviceId, invalidations);
231          this.sendJMSInvalidationEvent (msg);
232       }
233    }
234
235    public void invalidate (String JavaDoc invalidationGroupName, Serializable JavaDoc[] keys, boolean asynchronous)
236    {
237       if ( (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
238             this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
239             && this.publishingAuthorized)
240       {
241          JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage (
242                   this.serviceId,
243                   invalidationGroupName,
244                   keys);
245          this.sendJMSInvalidationEvent (msg);
246       }
247    }
248
249    public void invalidate (String JavaDoc invalidationGroupName, Serializable JavaDoc key, boolean asynchronous)
250    {
251       if ( (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
252             this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
253             && this.publishingAuthorized)
254       {
255          JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage (
256                   this.serviceId,
257                   invalidationGroupName,
258                   new Serializable JavaDoc[] {key} );
259          this.sendJMSInvalidationEvent (msg);
260       }
261    }
262
263    public void invalidateAll(String JavaDoc groupName, boolean asynchronous)
264    {
265       if ( (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
266             this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
267             && this.publishingAuthorized)
268       {
269          JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage(
270                   this.serviceId,
271                   groupName
272          );
273          this.sendJMSInvalidationEvent (msg);
274       }
275    }
276    
277    public void newGroupCreated (String JavaDoc groupInvalidationName)
278    {
279       // we don't manage groups dynamically, so we don't really care...
280
//
281
}
282    
283    public void groupIsDropped (String JavaDoc groupInvalidationName)
284    {
285       // we don't manage groups dynamically, so we don't really care...
286
//
287
}
288
289    // ServiceMBeanSupport overrides ---------------------------------------------------
290

291    protected void startService () throws Exception JavaDoc
292    {
293       log.info("Starting JMS cache invalidation bridge");
294             
295       // Deal with the InvalidationManager first..
296
//
297
this.invalMgr = (org.jboss.cache.invalidation.InvalidationManagerMBean)
298          org.jboss.system.Registry.lookup (this.invalidationManagerName);
299       
300       this.invalidationSubscription = invalMgr.registerBridgeListener (this);
301
302       // deal with JMS next
303
//
304
InitialContext JavaDoc iniCtx = getInitialContext ();
305       
306       Object JavaDoc tmp = iniCtx.lookup(this.connectionFactoryName);
307       TopicConnectionFactory JavaDoc tcf = (TopicConnectionFactory JavaDoc) tmp;
308       conn = tcf.createTopicConnection();
309       
310       topic = (Topic JavaDoc) iniCtx.lookup(this.topicName);
311       session = conn.createTopicSession(this.transacted,
312                                         this.acknowledgeMode);
313       
314       conn.start();
315       
316       // Are we publisher, subscriber, or both?
317
//
318
if (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
319             this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION)
320       {
321          this.subscriber = session.createSubscriber(topic);
322          this.subscriber.setMessageListener(this);
323       }
324       
325       if (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
326             this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
327       {
328          this.pub = session.createPublisher(topic);
329          this.publishingAuthorized = true;
330       }
331    }
332    
333    protected void stopService ()
334    {
335       log.info ("Stoping JMS cache invalidation bridge");
336       try
337       {
338          if (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
339                this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION)
340          {
341             subscriber.close();
342          }
343
344          if (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
345                this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
346          {
347             this.publishingAuthorized = false;
348             pub.close();
349          }
350
351          conn.stop();
352          session.close();
353          conn.close();
354          
355       }
356       catch (Exception JavaDoc ex)
357       {
358          log.warn("Failed to stop JMS resources associated with the JMS bridge: ", ex);
359       }
360    }
361
362    // Package protected ---------------------------------------------
363

364    // Protected -----------------------------------------------------
365

366    protected synchronized TopicSession JavaDoc getSession()
367    {
368       return this.session;
369    }
370
371    protected synchronized TopicPublisher JavaDoc getPublisher()
372    {
373       return this.pub;
374    }
375
376    protected void sendJMSInvalidationEvent(JMSCacheInvalidationMessage invalidationMsg)
377    {
378       try
379       {
380          if (log.isTraceEnabled ())
381             log.trace("sending JMS message for cache invalidation" + invalidationMsg);
382          
383          try
384          {
385             ObjectMessage JavaDoc msg = getSession().createObjectMessage();
386             msg.setJMSType(JMS_CACHE_INVALIDATION_BRIDGE);
387             msg.setObject(invalidationMsg);
388             getPublisher().publish(msg);
389          }
390          catch (JMSException JavaDoc ex)
391          {
392             log.debug("failed to publish seppuku event: ", ex);
393          }
394       }
395       catch (Exception JavaDoc ex)
396       {
397          log.warn("failed to do cluster seppuku event: " , ex);
398       }
399    }
400
401    protected InitialContext JavaDoc getInitialContext()
402       throws NamingException JavaDoc
403    {
404       if (providerUrl == null)
405       {
406          return new InitialContext JavaDoc();
407       }
408       else
409       {
410          log.debug("Using Context.PROVIDER_URL: " + providerUrl);
411
412          java.util.Properties JavaDoc props = new java.util.Properties JavaDoc(System.getProperties());
413          props.put(Context.PROVIDER_URL, providerUrl);
414          return new InitialContext JavaDoc(props);
415       }
416     }
417    
418
419    // Private -------------------------------------------------------
420

421    // Inner classes -------------------------------------------------
422

423 }
424
Popular Tags