KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > web > tomcat > tc5 > session > JBossCacheManager


1 /*
2  * JBoss, the OpenSource WebOS
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7 package org.jboss.web.tomcat.tc5.session;
8
9 import org.apache.catalina.Session;
10 import org.apache.catalina.LifecycleException;
11 import org.apache.catalina.Context;
12 import org.apache.catalina.deploy.FilterDef;
13 import org.apache.catalina.deploy.FilterMap;
14 import org.jboss.metadata.WebMetaData;
15 import org.jboss.util.NestedRuntimeException;
16
17 import javax.transaction.TransactionManager JavaDoc;
18 import javax.transaction.HeuristicMixedException JavaDoc;
19 import javax.transaction.HeuristicRollbackException JavaDoc;
20 import javax.transaction.RollbackException JavaDoc;
21 import javax.transaction.Status JavaDoc;
22 import javax.transaction.SystemException JavaDoc;
23 import javax.transaction.Transaction JavaDoc;
24 import javax.transaction.TransactionRequiredException JavaDoc;
25 import javax.transaction.TransactionRolledbackException JavaDoc;
26
27 import javax.transaction.UserTransaction JavaDoc;
28
29 import javax.naming.NamingException JavaDoc;
30 import javax.naming.InitialContext JavaDoc;
31 import java.util.List JavaDoc;
32
33 /**
34  * Implementation of a clustered session manager for
35  * catalina using JBossCache replication.
36  *
37  * @author Ben Wang
38  * @version $Revision: 1.3.2.11 $
39  */

40 public class JBossCacheManager
41    extends JBossManager
42 {
43
44    /**
45     * Informational name for this Catalina component
46     */

47    private static final String JavaDoc info_ = "JBossCacheManager/1.0";
48
49    // -- Class attributes ---------------------------------
50
// For JNDI lookup
51
private InitialContext JavaDoc initialContext_;
52
53    /**
54     * Proxy-object for the JBossCacheService
55     */

56    private JBossCacheService proxy_;
57
58    /**
59     * If set to true, will add a JvmRouteFilter to the request.
60     */

61    protected boolean useJK_ = false;
62
63    /**
64     * The transaction manager.
65     */

66    protected TransactionManager JavaDoc tm;
67
68    public JBossCacheManager()
69    {
70    }
71
72    public void init(String JavaDoc name, WebMetaData webMetaData, boolean useJK, boolean useLocalCache)
73       throws ClusteringNotSupportedException
74    {
75       super.init(name, webMetaData, useJK, useLocalCache);
76       this.useJK_ = useJK;
77       try
78       {
79          proxy_ = (JBossCacheService) new JBossCacheService();
80       }
81       catch (Throwable JavaDoc t)
82       {
83          String JavaDoc str = "JBossCacheService to Tomcat clustering not found";
84          log_.error(str);
85          throw new ClusteringNotSupportedException(str);
86       }
87    }
88
89    public JBossCacheService getCacheService()
90    {
91       return proxy_;
92    }
93
94    // Manager-methods -------------------------------------
95
/**
96     * Start this Manager
97     *
98     * @throws org.apache.catalina.LifecycleException
99     *
100     */

101    public void start() throws LifecycleException
102    {
103       super.start();
104
105       // Obtain the transaction manager
106
try
107       {
108          tm =(TransactionManager JavaDoc)new InitialContext JavaDoc().lookup("java:/TransactionManager");
109       }
110       catch (Exception JavaDoc e)
111       {
112          log_.error("Cannot get a reference to the transaction manager", e);
113          throw new LifecycleException(e);
114       }
115
116       // Adding JvmRouteFilter if needed
117
if (useJK_)
118       {
119          boolean hasFilterCreated = false;
120          Context context = (Context) container_;
121          // Need to set this so filter instance can get hold of me later.
122
context.getServletContext().setAttribute("AbstractJBossManager", this);
123
124          String JavaDoc filterName = "JvmRouteFilter";
125          if (log_.isDebugEnabled())
126          {
127             log_.debug("start(): we are using mod_jk(2) for load-balancing. Will add " + filterName);
128          }
129
130          FilterDef def = new FilterDef();
131          def.setFilterName(filterName);
132          def.setDescription("Filter to re-package the session id with jvmroute if failing-over under mod_jk(2)");
133          def.setFilterClass(org.jboss.web.tomcat.tc5.JvmRouteFilter.class.getName());
134          // Just to make sure we don't create duplicate
135
FilterDef[] defs = context.findFilterDefs();
136          for (int i = 0; i < defs.length; i++)
137          {
138             FilterDef d = defs[i];
139             if (d.getFilterName().equals(filterName))
140             {
141                hasFilterCreated = true;
142                break;
143             }
144          }
145          if (!hasFilterCreated)
146             context.addFilterDef(def);
147
148          FilterMap map = new FilterMap();
149          map.setFilterName(filterName);
150          map.setURLPattern("/*");
151          context.addFilterMap(map);
152       }
153       // Find JBossCacheService
154
// Will need to pass the classloader that is associated with this web app so de-serialization will work correctly.
155
ClassLoader JavaDoc tcl = super.getContainer().getLoader().getClassLoader();
156       proxy_.start(tcl, this);
157       if (log_.isDebugEnabled())
158       {
159          log_.debug("start(): JBossCacheService started");
160       }
161    }
162
163    public void stop() throws LifecycleException
164    {
165       super.stop();
166       proxy_.stop();
167       tm = null;
168    }
169
170    /**
171     * Create a new session. Note this does not mean the session is active yet.
172     */

173    public Session createSession()
174    {
175        return createSession(null);
176    }
177
178        
179    /**
180     * Create a new session. Note this does not mean the session is active yet.
181     */

182    public Session createSession(String JavaDoc sessionId)
183    {
184       ClusteredSession session = null;
185       if (replicationGranularity_ == WebMetaData.REPLICATION_GRANULARITY_ATTRIBUTE)
186       {
187          session = (ClusteredSession) new AttributeBasedClusteredSession(this);
188       }
189       else if (replicationGranularity_ == WebMetaData.REPLICATION_GRANULARITY_SESSION)
190       {
191          session = (ClusteredSession) new SessionBasedClusteredSession(this);
192       }
193
194       session.setNew(true);
195       session.setCreationTime(System.currentTimeMillis());
196       session.setMaxInactiveInterval(this.maxInactiveInterval_);
197
198       if (sessionId == null)
199       {
200           sessionId = this.getNextId();
201           
202           // We are using mod_jk for load balancing. Append the JvmRoute.
203
if (useJK_)
204           {
205               if (log_.isDebugEnabled())
206               {
207                   log_.debug("createSession(): useJK is true. Will append JvmRoute: " + this.getJvmRoute());
208               }
209               sessionId += "." + this.getJvmRoute();
210           }
211       }
212
213       session.setValid(true);
214       session.setId(sessionId);
215       if (log_.isDebugEnabled())
216       {
217          log_.debug("Creating an ClusteredSession with id: " + session.getId());
218       }
219
220       createdCounter_++;
221       return session;
222    }
223
224    public boolean storeSession(Session baseSession)
225    {
226       // is this session of the right type?
227
if (!(baseSession instanceof ClusteredSession))
228       {
229          throw new IllegalArgumentException JavaDoc("You can only add ClusteredSessions to this Manager");
230       }
231
232       ClusteredSession session = (ClusteredSession) baseSession;
233       if (session == null)
234       {
235          return false;
236       }
237
238       if (session.isValid())
239       {
240          // put it in the local store as well.
241
sessions_.put(getRealId(session.getId()), session);
242
243          String JavaDoc id = session.getId();
244          long beginPassivate = System.currentTimeMillis();
245          // Notify all session attributes that they get serialized (SRV 7.7.2)
246
session.passivate();
247          long endPassivate = System.currentTimeMillis();
248          stats_.updatePassivationStats(id, (endPassivate - beginPassivate));
249
250          if (log_.isDebugEnabled())
251          {
252             log_.debug("check to see if needs to store and replicate session with id " + id);
253          }
254
255          long beginReplication = System.currentTimeMillis();
256          processSessionRepl(session);
257          long endReplication = System.currentTimeMillis();
258          stats_.updateReplicationStats(id, (endReplication - beginReplication));
259          return true;
260       }
261       else
262       {
263          return false;
264       }
265    }
266
267    public void add(Session session)
268    {
269       if (session == null)
270          return;
271
272       if (!session.isValid())
273       {
274          log_.error("Cannot add session with id=" + session.getId() + " because it is invalid");
275          return;
276       }
277
278       // maxActive_ -1 is unlimited
279
if (maxActive_ != -1 && activeCounter_ >= maxActive_)
280       {
281          // Exceeds limit. We will need to reject it.
282
rejectedCounter_++;
283          // Catalina api does not specify what heppens but we will throw a runtime exception for now.
284
throw new IllegalStateException JavaDoc("JBossCacheManager.add(): number of active sessions exceeds the maximum limit: " +
285             maxActive_ + " when trying to add session id " + session.getId());
286       }
287
288       if (storeSession((ClusteredSession) session))
289       {
290          activeCounter_++;
291          if (log_.isDebugEnabled())
292          {
293             log_.debug("Session with id=" + session.getId() + " added. Current active sessions " + activeCounter_);
294          }
295       }
296    }
297
298    public Session createEmptySession()
299    {
300       // We will simply return new ClusteredSession instanc enow.
301
ClusteredSession session = null;
302       if (replicationGranularity_ == WebMetaData.REPLICATION_GRANULARITY_ATTRIBUTE)
303       {
304          session = (ClusteredSession) new AttributeBasedClusteredSession(this);
305       }
306       else if (replicationGranularity_ == WebMetaData.REPLICATION_GRANULARITY_SESSION)
307       {
308          session = (ClusteredSession) new SessionBasedClusteredSession(this);
309       }
310
311       if (log_.isDebugEnabled())
312       {
313          log_.debug("Creating an empty ClusteredSession: " + session);
314       }
315
316       createdCounter_++;
317       return session;
318    }
319
320    public Session findSession(String JavaDoc id)
321    {
322       String JavaDoc realId = getRealId(id);
323       ClusteredSession session = findLocalSession(realId);
324       // Find it from the local store first.
325
if (session != null && !session.isOutdated() )
326       {
327          return session;
328       }
329       else
330       {
331          return loadSession(realId);
332       }
333    }
334
335    /**
336     * Return the sessions. We will find it from the in-memory local ones first. If not found, we search for the
337     * underlying store as well just to be sure.
338     * @return
339     */

340    public Session[] findSessions()
341    {
342       Session[] sessions;
343
344       // Will need to find from the underlying store
345
List JavaDoc ids = proxy_.getNewSessionsInStore();
346       if(ids.size() ==0)
347       {
348          Session[] sess = new Session[0];
349          sess = (Session[]) sessions_.values().toArray(sess);
350          return sess;
351       }
352
353       if(log_.isDebugEnabled()) {
354          log_.debug("findSessions: find ids from cache store: " + ids);
355       }
356
357
358       // Is there a better way to do this?
359
for(int i=0; i < ids.size(); i++) {
360          Session session = loadSession((String JavaDoc)ids.get(i));
361          if( session == null )
362          {
363             // This can happen now if the node has been removed before it has been called.
364

365             // Something is wrong with session. Should not have id but null session!
366
// log_.warn("Has session id: " +ids.get(i) + " but session is null. Will remove this from map");
367
// sessions_.remove(ids.get(i));
368
// proxy_.removeSession((String)ids.get(i));
369
continue;
370          }
371          sessions_.put(ids.get(i), session); // Populate local copy as well.
372
}
373
374       Session[] sess = new Session[0];
375       sess = (Session[]) sessions_.values().toArray(sess);
376       return sess;
377    }
378
379    public ClusteredSession[] findLocalSessions()
380    {
381       ClusteredSession[] sess = new ClusteredSession[0];
382
383       sess = (ClusteredSession[]) sessions_.values().toArray(sess);
384       return sess;
385    }
386
387    public ClusteredSession findLocalSession(String JavaDoc realId)
388    {
389       ClusteredSession session = (ClusteredSession) sessions_.get(realId);
390       return session;
391    }
392
393    public void remove(Session session)
394    {
395       String JavaDoc id = session.getId();
396       if (id == null) return;
397       // Let's do it in brute force.
398
if (log_.isDebugEnabled())
399       {
400          log_.debug("Removing session from store with id: " + id);
401       }
402       ((ClusteredSession) session).removeMyself();
403       sessions_.remove(getRealId(session.getId()));
404       activeCounter_--;
405    }
406
407    public void removeLocal(Session session)
408    {
409       String JavaDoc id = session.getId();
410       if (id == null) return;
411       // Let's do it in brute force.
412
if (log_.isDebugEnabled())
413       {
414          log_.debug("Removing session from local store with id: " + id);
415       }
416       ((ClusteredSession) session).removeMyselfLocal();
417       sessions_.remove(getRealId(session.getId()));
418       // It's a bit ad-hoc to do it here. But since we currently call this when session expires ...
419
expiredCounter_++;
420       activeCounter_--;
421    }
422
423    /**
424     * Load a session from the distributed store
425     *
426     * @param realId The session-id for the session to load without JvmRoute
427     * @return the session or null if the session cannot be found in the distributed store
428     */

429    protected Session loadSession(String JavaDoc realId)
430    {
431       ClusteredSession session = null;
432
433       if (realId == null)
434       {
435          return null;
436       }
437
438       // TODO We will need to determine if we want stats for loading since there will be a lot of them
439
// long begin = System.currentTimeMillis();
440
session = (ClusteredSession) proxy_.getSession(realId);
441       // Will need to initialize.
442
if (session != null)
443       {
444          session.initAfterLoad(this);
445          /* Put the session into the local map or else other calls to find
446          the session after the request completes will wipe out the dirty state
447          due to any mods made by the web app.
448          */

449          sessions_.put(getRealId(session.getId()), session);
450 // long end = System.currentTimeMillis();
451
// stats_.updateLoadStats(id, (end - begin));
452
}
453
454       if (log_.isDebugEnabled())
455       {
456          log_.debug("loadSession(): id= " + realId + ", session=" + session);
457       }
458
459       return session;
460    }
461
462    protected void processSessionRepl(ClusteredSession session)
463    {
464       boolean doTx = false;
465       try
466       {
467          // We need transaction so all the replication are sent in batch.
468
// Don't do anything if there is already transaction context associated with this thread.
469
if(tm.getTransaction() == null)
470             doTx = true;
471
472          if(doTx)
473             tm.begin();
474
475          session.processSessionRepl();
476       }
477       catch (Exception JavaDoc ex)
478       {
479          log_.error("processSessionRepl: failed with exception: " + ex);
480          try
481          {
482 // if(doTx)
483
// Let's set it no matter what.
484
tm.setRollbackOnly();
485          }
486          catch (Exception JavaDoc exn)
487          {
488             exn.printStackTrace();
489          }
490          // We will need to alert Tomcat of this exception.
491
throw new NestedRuntimeException("JBossCacheManager.processSessionRepl(): failed to replicate session.", ex);
492       }
493       finally
494       {
495          if(doTx)
496             endTransaction();
497       }
498    }
499
500    protected void endTransaction()
501    {
502       try {
503          if(tm.getTransaction().getStatus() != Status.STATUS_MARKED_ROLLBACK)
504          {
505             tm.commit();
506          } else
507          {
508             tm.rollback();
509          }
510       } catch (Exception JavaDoc e) {
511          e.printStackTrace();
512          throw new NestedRuntimeException("TreeCacheAop.endTransaction(): ", e);
513       }
514    }
515
516    /**
517     * Go through all sessions and look if they have expired. Note this overrides the method in JBossManager.
518     */

519    protected void processExpires()
520    {
521       if (log_.isTraceEnabled())
522       {
523          log_.trace("Looking for sessions that have expired ...");
524       }
525
526       // Get all sessions
527
// Does not really need tx. But just to comform with the cache usage.
528
try
529       {
530          Session sessions[] = findSessions();
531          for (int i = 0; i < sessions.length; ++i)
532          {
533             ClusteredSession session = (ClusteredSession) sessions[i];
534             if(session == null)
535             {
536                log_.warn("processExpires(): processing null session at index " +i);
537                continue;
538             }
539
540             // We only look at valid sessions. This will remove session if not valid already.
541
boolean doTx = false;
542             try
543             {
544                // We need transaction so all the replication are sent in batch.
545
// Don't do anything if there is already transaction context associated with this thread.
546
if(tm.getTransaction() == null)
547                   doTx = true;
548
549                if(doTx)
550                   tm.begin();
551
552                if (!session.isValid()) continue;
553             }
554             catch (Exception JavaDoc ex)
555             {
556                log_.error("processSessionExpire: failed with exception: " + ex);
557                try
558                {
559 // if(doTx)
560
// Let's set it no matter what.
561
tm.setRollbackOnly();
562                }
563                catch (Exception JavaDoc exn)
564                {
565                   exn.printStackTrace();
566                }
567                // We will need to alert Tomcat of this exception.
568
throw new NestedRuntimeException("JBossCacheManager.processSessionExpire(): failed to expire session.", ex);
569             }
570             finally
571             {
572                if(doTx)
573                   endTransaction();
574             }
575          }
576       }
577       catch (Exception JavaDoc ex)
578       {
579          log_.error("processExpires: failed with exception: " + ex);
580          ex.printStackTrace();
581       }
582    }
583
584    // Get the id without JvmRoute.
585
private String JavaDoc getRealId(String JavaDoc id)
586    {
587       // TODO May need optimization
588
if (!useJK_) return id;
589
590       int index = id.indexOf(".");
591       if (index > 0)
592       {
593          return id.substring(0, id.indexOf("."));
594       }
595       else
596       {
597          return id;
598       }
599    }
600 }
601
Popular Tags