KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > web > tomcat > tc6 > session > JBossCacheManager


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.web.tomcat.tc6.session;
23
24 import java.util.Collection JavaDoc;
25 import java.util.Date JavaDoc;
26 import java.util.HashMap JavaDoc;
27 import java.util.HashSet JavaDoc;
28 import java.util.Iterator JavaDoc;
29 import java.util.Map JavaDoc;
30 import java.util.Set JavaDoc;
31 import java.util.Map.Entry;
32
33 import javax.management.MBeanServer JavaDoc;
34 import javax.management.MalformedObjectNameException JavaDoc;
35 import javax.management.ObjectName JavaDoc;
36 import javax.transaction.Status JavaDoc;
37 import javax.transaction.TransactionManager JavaDoc;
38 import javax.transaction.RollbackException JavaDoc;
39
40 import org.apache.catalina.Context;
41 import org.apache.catalina.Host;
42 import org.apache.catalina.LifecycleException;
43 import org.apache.catalina.Session;
44 import org.apache.catalina.Valve;
45 import org.apache.catalina.core.ContainerBase;
46 import org.jboss.cache.CacheException;
47 import org.jboss.metadata.WebMetaData;
48 import org.jboss.mx.util.MBeanServerLocator;
49
50 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
51
52 /**
53  * Implementation of a clustered session manager for
54  * catalina using JBossCache replication.
55  *
56  * @author Ben Wang
57  * @author Brian Stansberry
58  * @author Hany Mesha
59  * @version $Revision: 58587 $
60  */

61 public class JBossCacheManager
62    extends JBossManager
63    implements JBossCacheManagerMBean
64 {
65
66    /**
67     * Informational name for this Catalina component
68     */

69    static final String JavaDoc info_ = "JBossCacheManager/1.0";
70
71    // -- Class attributes ---------------------------------
72

73    /**
74     * The transaction manager.
75     */

76    private TransactionManager JavaDoc tm;
77
78    /**
79     * Proxy-object for the JBossCacheService
80     */

81    private JBossCacheService proxy_;
82
83    /**
84     * Id/timestamp of sessions in cache that we haven't loaded
85     */

86    private Map JavaDoc unloadedSessions_ = new ConcurrentHashMap();
87
88    /** Our TreeCache's ObjectName */
89    private String JavaDoc cacheObjectNameString_ = "jboss.cache:service=TomcatClusteringCache";
90
91    /**
92     * If set to true, will add a JvmRouteFilter to the request.
93     */

94    private boolean useJK_ = false;
95
96    /** Are we running embedded in JBoss? */
97    private boolean embedded_ = false;
98
99    /** Our JMX Server */
100    private MBeanServer JavaDoc mserver_ = null;
101
102    /** Our ClusteredSessionValve's snapshot mode. */
103    private String JavaDoc snapshotMode_ = null;
104
105    /** Our ClusteredSessionValve's snapshot interval. */
106    private int snapshotInterval_ = 0;
107
108    /** String form of invalidateSessionPolicy_ */
109    private String JavaDoc replTriggerString_ = null;
110
111    /** String form of replGranularityString_ */
112    private String JavaDoc replGranularityString_ = null;
113
114    /**
115     * Whether we use batch mode replication for field level granularity.
116     * We store this in a Boolean rather than a primitive so JBossCacheCluster
117     * can determine if this was set via a <Manager> element.
118     */

119    private Boolean JavaDoc replicationFieldBatchMode_;
120    
121    /** Class loader for this web app. */
122    private ClassLoader JavaDoc tcl_;
123    
124    /**
125     * The snapshot manager we are using.
126     */

127    private SnapshotManager snapshotManager_;
128
129    // ---------------------------------------------------------- Constructors
130

131    public JBossCacheManager()
132    {
133       super();
134    }
135
136    /**
137     * Initializes this Manager when running in embedded mode.
138     * <p>
139     * <strong>NOTE:</strong> This method should not be called when
140     * running unembedded.
141     * </p>
142     */

143    public void init(String JavaDoc name, WebMetaData webMetaData,
144                     boolean useJK, boolean useLocalCache)
145       throws ClusteringNotSupportedException
146    {
147       super.init(name, webMetaData, useJK, useLocalCache);
148       this.useJK_ = useJK;
149       this.replicationFieldBatchMode_ =
150          webMetaData.getReplicationFieldBatchMode() ? Boolean.TRUE : Boolean.FALSE;
151       
152       proxy_ = new JBossCacheService(cacheObjectNameString_);
153
154       // Confirm our replication granularity is compatible with the cache
155
// Throws ISE if not
156
validateFieldMarshalling();
157
158       embedded_ = true;
159    }
160
161    // ------------------------------------------------------------- Properties
162

163    /**
164     * Gets the <code>JBossCacheService</code> through which we interact
165     * with the <code>TreeCache</code>.
166     */

167    public JBossCacheService getCacheService()
168    {
169       return proxy_;
170    }
171
172    /**
173     * Gets a String representation of the JMX <code>ObjectName</code> under
174     * which our <code>TreeCache</code> is registered.
175     */

176    public String JavaDoc getCacheObjectNameString()
177    {
178       return cacheObjectNameString_;
179    }
180
181    /**
182     * Sets the JMX <code>ObjectName</code> under which our
183     * <code>TreeCache</code> is registered.
184     */

185    public void setCacheObjectNameString(String JavaDoc treeCacheObjectName)
186    {
187       this.cacheObjectNameString_ = treeCacheObjectName;
188    }
189
190    /**
191     * Gets when sessions are replicated to the other nodes.
192     * The default value, "instant", synchronously replicates changes
193     * to the other nodes. In this case, the "SnapshotInterval" attribute
194     * is not used.
195     * The "interval" mode, in association with the "SnapshotInterval"
196     * attribute, indicates that Tomcat will only replicate modified
197     * sessions every "SnapshotInterval" miliseconds at most.
198     *
199     * @see #getSnapshotInterval()
200     */

201    public String JavaDoc getSnapshotMode()
202    {
203       return snapshotMode_;
204    }
205
206    /**
207     * Sets when sessions are replicated to the other nodes. Valid values are:
208     * <ul>
209     * <li>instant</li>
210     * <li>interval</li>
211     * </ul>
212     */

213    public void setSnapshotMode(String JavaDoc snapshotMode)
214    {
215       this.snapshotMode_ = snapshotMode;
216    }
217
218    /**
219     * Gets how often session changes should be replicated to other nodes.
220     * Only relevant if property {@link #getSnapshotMode() snapshotMode} is
221     * set to <code>interval</code>.
222     *
223     * @return the number of milliseconds between session replications.
224     */

225    public int getSnapshotInterval()
226    {
227       return snapshotInterval_;
228    }
229
230    /**
231     * Sets how often session changes should be replicated to other nodes.
232     *
233     * @param snapshotInterval the number of milliseconds between
234     * session replications.
235     */

236    public void setSnapshotInterval(int snapshotInterval)
237    {
238       this.snapshotInterval_ = snapshotInterval;
239    }
240
241    /**
242     * Gets whether the <code>Engine</code> in which we are running
243     * uses <code>mod_jk</code>.
244     */

245    public boolean getUseJK()
246    {
247       return useJK_;
248    }
249
250    /**
251     * Sets whether the <code>Engine</code> in which we are running
252     * uses <code>mod_jk</code>.
253     */

254    public void setUseJK(boolean useJK)
255    {
256       this.useJK_ = useJK;
257    }
258
259    /**
260     * Returns the replication granularity expressed as an int.
261     *
262     * @see WebMetaData#REPLICATION_GRANULARITY_ATTRIBUTE
263     * @see WebMetaData#REPLICATION_GRANULARITY_FIELD
264     * @see WebMetaData#REPLICATION_GRANULARITY_SESSION
265     */

266    public ReplicationGranularity getReplicationGranularity()
267    {
268       return replicationGranularity_;
269    }
270
271    /**
272     * Gets the granularity of session data replicated across the
273     * cluster; i.e. whether the entire session should be replicated when
274     * replication is triggered, only modified attributes, or only
275     * modified fields of attributes.
276     */

277    public String JavaDoc getReplicationGranularityString()
278    {
279       // Only lazy-set this if we are started;
280
// otherwise screws up standalone TC integration!!
281
if (started_ && this.replGranularityString_ == null)
282       {
283          this.replGranularityString_ = replicationGranularity_.toString();
284       }
285       return replGranularityString_;
286    }
287
288    /**
289     * Sets the granularity of session data replicated across the cluster.
290     * Valid values are:
291     * <ul>
292     * <li>SESSION</li>
293     * <li>ATTRIBUTE</li>
294     * <li>FIELD</li>
295     * </ul>
296     */

297    public void setReplicationGranularityString(String JavaDoc granularity)
298    {
299       this.replGranularityString_ = granularity;
300    }
301
302    /**
303     * Gets the type of operations on a <code>HttpSession</code> that
304     * trigger replication.
305     */

306    public String JavaDoc getReplicationTriggerString()
307    {
308       // Only lazy-set this if we are started;
309
// otherwise screws up standalone TC integration!!
310
if (started_ && this.replTriggerString_ == null)
311       {
312          this.replTriggerString_ = invalidateSessionPolicy_.toString();
313       }
314       return this.replTriggerString_;
315    }
316
317    /**
318     * Sets the type of operations on a <code>HttpSession</code> that
319     * trigger replication. Valid values are:
320     * <ul>
321     * <li>SET_AND_GET</li>
322     * <li>SET_AND_NON_PRIMITIVE_GET</li>
323     * <li>SET</li>
324     * </ul>
325     */

326    public void setReplicationTriggerString(String JavaDoc trigger)
327    {
328       this.replTriggerString_ = trigger;
329    }
330
331    /**
332     * Gets whether, if replication granularity is set to <code>FIELD</code>,
333     * replication should be done in batch mode. Ignored if field-level
334     * granularity is not used.
335     */

336    public Boolean JavaDoc isReplicationFieldBatchMode()
337    {
338       return replicationFieldBatchMode_;
339    }
340
341    /**
342     * Sets whether, if replication granularity is set to <code>FIELD</code>,
343     * replication should be done in batch mode. Ignored if field-level
344     * granularity is not used.
345     */

346    public void setReplicationFieldBatchMode(boolean replicationFieldBatchMode)
347    {
348       this.replicationFieldBatchMode_ = Boolean.valueOf(replicationFieldBatchMode);
349    }
350
351    public void setUseLocalCache(boolean useLocalCache)
352    {
353       this.useLocalCache_ = useLocalCache;
354    }
355
356    // JBossCacheManagerMBean-methods -------------------------------------
357

358    public void expireSession(String JavaDoc sessionId)
359    {
360       Session session = findSession(sessionId);
361       if (session != null)
362          session.expire();
363    }
364
365    public String JavaDoc getLastAccessedTime(String JavaDoc sessionId)
366    {
367       Session session = findSession(sessionId);
368       if(session == null) {
369          log_.debug("getLastAccessedTime(): Session " + sessionId +
370                     " not found");
371          return "";
372       }
373      return new Date JavaDoc(session.getLastAccessedTime()).toString();
374    }
375
376    public Object JavaDoc getSessionAttribute(String JavaDoc sessionId, String JavaDoc key)
377    {
378       ClusteredSession session = (ClusteredSession) findSession(sessionId);
379       return (session == null) ? null : session.getAttribute(key);
380    }
381
382    public String JavaDoc getSessionAttributeString(String JavaDoc sessionId, String JavaDoc key)
383    {
384       Object JavaDoc attr = getSessionAttribute(sessionId, key);
385       return (attr == null) ? null : attr.toString();
386    }
387    
388    public String JavaDoc listLocalSessionIds()
389    {
390       return reportSessionIds(sessions_.keySet());
391    }
392    
393    public String JavaDoc listSessionIds()
394    {
395       Set JavaDoc ids = new HashSet JavaDoc(sessions_.keySet());
396       ids.addAll(unloadedSessions_.keySet());
397       return reportSessionIds(ids);
398    }
399    
400    private String JavaDoc reportSessionIds(Set JavaDoc ids)
401    {
402       StringBuffer JavaDoc sb = new StringBuffer JavaDoc();
403       boolean added = false;
404       for (Iterator JavaDoc it = ids.iterator(); it.hasNext(); )
405       {
406          if (added)
407          {
408             sb.append(',');
409          }
410          else
411          {
412             added = true;
413          }
414          
415          sb.append(it.next());
416       }
417       return sb.toString();
418    }
419
420    // Manager-methods -------------------------------------
421

422    /**
423     * Start this Manager
424     *
425     * @throws org.apache.catalina.LifecycleException
426     *
427     */

428    public void start() throws LifecycleException
429    {
430       if (embedded_)
431       {
432          startEmbedded();
433       }
434       else
435       {
436          startUnembedded();
437       }
438    }
439
440    public void stop() throws LifecycleException
441    {
442       if (!started_)
443       {
444          throw new IllegalStateException JavaDoc("Manager not started");
445       }
446       
447       log_.debug("Stopping");
448       
449       resetStats();
450       
451       // Notify our interested LifecycleListeners
452
lifecycle_.fireLifecycleEvent(BEFORE_STOP_EVENT, this);
453       
454       clearSessions();
455       
456       // Don't leak the classloader
457
tcl_ = null;
458       
459       proxy_.stop();
460       tm = null;
461       
462       snapshotManager_.stop();
463       
464       started_ = false;
465       
466       // Notify our interested LifecycleListeners
467
lifecycle_.fireLifecycleEvent(AFTER_STOP_EVENT, this);
468       
469       try
470       {
471          unregisterMBeans();
472       }
473       catch (Exception JavaDoc e)
474       {
475          log_.error("Could not unregister ManagerMBean from MBeanServer", e);
476       }
477    }
478
479    /**
480     * Clear the underlying cache store and also pojo that has the observers.
481     */

482    protected void clearSessions()
483    {
484       // First, the sessions we have actively loaded
485
ClusteredSession[] sessions = findLocalSessions();
486       for(int i=0; i < sessions.length; i++)
487       {
488          ClusteredSession ses = sessions[i];
489          // JBCLUSTER-15
490
// if session passivation is enabled, passivate sessions instead of expiring them which means
491
// they'll be available to the manager for activation after a restart.
492
if (log_.isDebugEnabled())
493          {
494              log_.debug("clearSessions(): clear session by expiring or passivating: " + ses);
495          }
496          boolean notify = true;
497          boolean localCall = true;
498          boolean localOnly = true;
499          try
500          {
501             if(isPassivationEnabled() && ses.isValid())
502             {
503                
504                processSessionPassivation(ses.getRealId(), this.getContainer().getParent().getName());
505             }
506             else
507             {
508                ses.expire(notify, localCall, localOnly);
509             }
510
511          }
512          catch (Throwable JavaDoc t)
513          {
514             log_.warn("clearSessions(): Caught exception expiring or passivating session " +
515                      ses.getIdInternal(), t);
516          }
517          finally
518          {
519             // Guard against leaking memory if anything is holding a
520
// ref to the session by clearing its internal state
521
ses.recycle();
522          }
523       }
524
525       if(!isPassivationEnabled())
526       {
527 // Next, the local copy of the distributed cache
528
Map JavaDoc unloaded = new HashMap JavaDoc(unloadedSessions_);
529          Set JavaDoc keys = unloaded.keySet();
530          for (Iterator JavaDoc it = keys.iterator(); it.hasNext(); )
531          {
532             String JavaDoc realId = (String JavaDoc) it.next();
533             proxy_.removeSessionLocal(realId);
534             unloadedSessions_.remove(realId);
535          }
536       }
537    }
538
539    /**
540     * Create a new session with a generated id.
541     */

542    public Session createSession()
543    {
544       return createSession(null);
545    }
546
547    /**
548     * Create a new session.
549     *
550     * @param sessionId the id to use, or <code>null</code> if we should
551     * generate a new id
552     *
553     * @return the session
554     *
555     * @throws IllegalStateException if the current number of active sessions
556     * exceeds the maximum number allowed
557     */

558    public Session createSession(String JavaDoc sessionId)
559    {
560       if (log_.isTraceEnabled())
561       {
562          log_.trace("createSession: active sessions = " + activeCounter_ +
563                     " , session map size = " + sessions_.size() +
564                     " and max allowed sessions = " + maxActive_);
565       }
566       // We check here for maxActive instead of in add(). add() gets called
567
// when we load an already existing session from the distributed cache
568
// (e.g. in a failover) and we don't want to fail in that situation.
569

570       // JBCLUSTER-15
571
// first check if passivation is enabled and reached the max allowed sessions,
572
/// then try to expire/passivate sessions to free memory
573
if(maxActive_ != -1 && sessions_.size() >= maxActive_ && isPassivationEnabled())
574       {
575          processExpires();
576       }
577       // maxActive_ -1 is unlimited
578
if (maxActive_ != -1 && sessions_.size() >= maxActive_)
579       {
580          // Exceeds limit. We need to reject it.
581
rejectedCounter_++;
582          // Catalina api does not specify what happens
583
// but we will throw a runtime exception for now.
584
String JavaDoc msgEnd = (sessionId == null) ? "" : " id " + sessionId;
585          throw new IllegalStateException JavaDoc("JBossCacheManager.createSession(): number of " +
586                 "active sessions exceeds the maximum limit: " +
587                 maxActive_ + " when trying to create session" + msgEnd);
588       }
589
590       ClusteredSession session = createEmptyClusteredSession();
591
592       session.setNew(true);
593       session.setCreationTime(System.currentTimeMillis());
594       session.setMaxInactiveInterval(this.maxInactiveInterval_);
595       session.setValid(true);
596
597       if (sessionId == null)
598       {
599           sessionId = this.getNextId();
600
601           // We are using mod_jk for load balancing. Append the JvmRoute.
602
if (useJK_)
603           {
604               if (log_.isDebugEnabled())
605               {
606                   log_.debug("createSession(): useJK is true. Will append JvmRoute: " + this.getJvmRoute());
607               }
608               sessionId += "." + this.getJvmRoute();
609           }
610       }
611
612       session.setId(sessionId); // Setting the id leads to a call to add()
613

614       if (log_.isDebugEnabled())
615       {
616          log_.debug("Created a ClusteredSession with id: " + sessionId);
617       }
618
619       createdCounter_++;
620       // JBCLUSTER-15 - if we have created a session it must be handled by this manager
621
// therefore we must increment the active counter
622
activeCounter_++;
623       
624       // Add this session to the set of those potentially needing replication
625
SessionReplicationContext.bindSession(session, snapshotManager_);
626       
627       return session;
628    }
629
630    public boolean storeSession(Session baseSession)
631    {
632       boolean stored = false;
633       if(baseSession != null && started_)
634       {
635          ClusteredSession session = (ClusteredSession) baseSession;
636
637          synchronized (session)
638          {
639             if (log_.isTraceEnabled())
640             {
641                log_.trace("check to see if needs to store and replicate " +
642                           "session with id " + session.getIdInternal());
643             }
644
645             if (session.isValid() &&
646                   (session.isSessionDirty() || session.getExceedsMaxUnreplicatedInterval()))
647             {
648                String JavaDoc realId = session.getRealId();
649
650                // Notify all session attributes that they get serialized (SRV 7.7.2)
651
long begin = System.currentTimeMillis();
652                session.passivate();
653                long elapsed = System.currentTimeMillis() - begin;
654                stats_.updatePassivationStats(realId, elapsed);
655
656                // Do the actual replication
657
begin = System.currentTimeMillis();
658                processSessionRepl(session);
659                elapsed = System.currentTimeMillis() - begin;
660                stored = true;
661                stats_.updateReplicationStats(realId, elapsed);
662             }
663             else if (log_.isTraceEnabled())
664             {
665                log_.trace("Session " + session.getIdInternal() +
666                           " did not require replication.");
667             }
668          }
669       }
670
671       return stored;
672    }
673
674    public void add(Session session)
675    {
676       if (session == null)
677          return;
678
679       if (!(session instanceof ClusteredSession))
680       {
681          throw new IllegalArgumentException JavaDoc("You can only add instances of " +
682                "type ClusteredSession to this Manager. Session class name: " +
683                session.getClass().getName());
684       }
685
686 // add((ClusteredSession) session, true);
687
add((ClusteredSession) session, false);
688    }
689
690    /**
691     * Adds the given session to the collection of those being managed by this
692     * Manager.
693     *
694     * @param session the session. Cannot be <code>null</code>.
695     * @param replicate whether the session should be replicated
696     *
697     * @throws NullPointerException if <code>session</code> is <code>null</code>.
698     */

699    private void add(ClusteredSession session, boolean replicate)
700    {
701       if (!session.isValid())
702       {
703          log_.error("Cannot add session with id=" + session.getIdInternal() +
704                     " because it is invalid");
705          return;
706       }
707
708       String JavaDoc realId = session.getRealId();
709       Object JavaDoc existing = sessions_.put(realId, session);
710       unloadedSessions_.remove(realId);
711
712       if (!session.equals(existing))
713       {
714          if (replicate)
715          {
716             storeSession(session);
717          }
718
719          activeCounter_++;
720          if (activeCounter_ > maxActiveCounter_)
721             maxActiveCounter_++;
722          
723          if (log_.isDebugEnabled())
724          {
725             log_.debug("Session with id=" + session.getIdInternal() + " added. " +
726                        "Current active sessions " + activeCounter_);
727          }
728       }
729    }
730
731    // Satisfy the Manager interface. Internally we use
732
// createEmptyClusteredSession to avoid a cast
733
public Session createEmptySession()
734    {
735       return createEmptyClusteredSession();
736    }
737
738    private ClusteredSession createEmptyClusteredSession()
739    {
740       log_.debug("Creating an empty ClusteredSession");
741
742       ClusteredSession session = null;
743       switch (replicationGranularity_)
744       {
745          case ATTRIBUTE:
746          session = new AttributeBasedClusteredSession(this);
747             break;
748          case FIELD:
749             session = new FieldBasedClusteredSession(this);
750             break;
751          default:
752             session = new SessionBasedClusteredSession(this);
753             break;
754       }
755       return session;
756    }
757
758    /**
759     * Attempts to find the session in the collection of those being managed
760     * locally, and if not found there, in the distributed cache of sessions.
761     * <p>
762     * If a session is found in the distributed cache, it is added to the
763     * collection of those being managed locally.
764     * </p>
765     *
766     * @param id the session id, which may include an appended jvmRoute
767     *
768     * @return the session, or <code>null</code> if no such session could
769     * be found
770     */

771    public Session findSession(String JavaDoc id)
772    {
773       String JavaDoc realId = getRealId(id);
774       // Find it from the local store first
775
ClusteredSession session = findLocalSession(realId);
776       
777       // If we didn't find it locally, only check the distributed cache
778
// if we haven't previously handled this session id on this request.
779
// If we handled it previously but it's no longer local, that means
780
// it's been invalidated. If we request an invalidated session from
781
// the distributed cache, it will be missing from the local cache but
782
// may still exist on other nodes (i.e. if the invalidation hasn't
783
// replicated yet because we are running in a tx). With buddy replication,
784
// asking the local cache for the session will cause the out-of-date
785
// session from the other nodes to be gravitated, thus resuscitating
786
// the session.
787
if (session == null
788             && !SessionReplicationContext.isSessionBoundAndExpired(realId, snapshotManager_))
789       {
790          if (log_.isTraceEnabled())
791             log_.trace("Checking for session " + realId + " in the distributed cache");
792          
793          session = loadSession(realId);
794          if (session != null)
795          {
796             add(session);
797             // TODO should we advise of a new session?
798
//tellNew();
799
}
800       }
801       else if (session != null && session.isOutdated())
802       {
803          if (log_.isTraceEnabled())
804             log_.trace("Updating session " + realId + " from the distributed cache");
805          
806          // Need to update it from the cache
807
loadSession(realId);
808       }
809
810       if (session != null)
811       {
812          // Add this session to the set of those potentially needing replication
813
SessionReplicationContext.bindSession(session, snapshotManager_);
814       }
815
816       return session;
817    }
818
819    /**
820     * Return the sessions. Note that this will return not only the local
821     * in-memory sessions, but also any sessions that are in the distributed
822     * cache but have not previously been accessed on this server. Invoking
823     * this method will bring all such sessions into local memory and can
824     * potentially be quite expensive.
825     *
826     * <p>
827     * Note also that when sessions are loaded from the distributed cache, no
828     * check is made as to whether the number of local sessions will thereafter
829     * exceed the maximum number allowed on this server.
830     * </p>
831     *
832     * @return an array of all the sessions
833     */

834    public Session[] findSessions()
835    {
836       // Need to load all the unloaded sessions
837
if(unloadedSessions_.size() > 0)
838       {
839          // Make a thread-safe copy of the new id list to work with
840
Set JavaDoc ids = new HashSet JavaDoc(unloadedSessions_.keySet());
841
842          if(log_.isDebugEnabled()) {
843             log_.debug("findSessions: loading sessions from distributed cache: " + ids);
844          }
845
846          for(Iterator JavaDoc it = ids.iterator(); it.hasNext();) {
847             loadSession((String JavaDoc) it.next());
848          }
849       }
850
851       // All sessions are now "local" so just return the local sessions
852
return findLocalSessions();
853    }
854
855    /**
856     * Returns all the sessions that are being actively managed by this manager.
857     * This includes those that were created on this server, those that were
858     * brought into local management by a call to
859     * {@link #findLocalSession(String)} as well as all sessions brought into
860     * local management by a call to {@link #findSessions()}.
861     */

862    public ClusteredSession[] findLocalSessions()
863    {
864       Collection JavaDoc coll = sessions_.values();
865       ClusteredSession[] sess = new ClusteredSession[coll.size()];
866       sess = (ClusteredSession[]) coll.toArray(sess);
867       return sess;
868    }
869
870    /**
871     * Returns the given session if it is being actively managed by this manager.
872     * An actively managed session is on that was either created on this server,
873     * brought into local management by a call to
874     * {@link #findLocalSession(String)} or brought into local management by a
875     * call to {@link #findSessions()}.
876     *
877     * @param realId the session id, with any trailing jvmRoute removed.
878     *
879     * @see #getRealId(String)
880     */

881    public ClusteredSession findLocalSession(String JavaDoc realId)
882    {
883       return (ClusteredSession) sessions_.get(realId);
884    }
885
886    /**
887     * Removes the session from this Manager's collection of actively managed
888     * sessions. Also removes the session from the distributed cache, both
889     * on this server and on all other server to which this one replicates.
890     */

891    public void remove(Session session)
892    {
893       ClusteredSession clusterSess = (ClusteredSession) session;
894       synchronized (clusterSess)
895       {
896          String JavaDoc realId = clusterSess.getRealId();
897          if (realId == null)
898             return;
899
900          if (log_.isDebugEnabled())
901          {
902             log_.debug("Removing session from store with id: " + realId);
903          }
904
905          try {
906             // Ignore any cache notifications that our own work generates
907
SessionReplicationContext.startCacheActivity();
908             clusterSess.removeMyself();
909          }
910          finally {
911             SessionReplicationContext.finishCacheActivity();
912             
913             // We don't want to replicate this session at the end
914
// of the request; the removal process took care of that
915
SessionReplicationContext.sessionExpired(clusterSess, realId, snapshotManager_);
916             
917             sessions_.remove(realId);
918             stats_.removeStats(realId);
919             activeCounter_--;
920          }
921       }
922    }
923
924    /**
925     * Removes the session from this Manager's collection of actively managed
926     * sessions. Also removes the session from this server's copy of the
927     * distributed cache (but does not remove it from other servers'
928     * distributed cache).
929     */

930    public void removeLocal(Session session)
931    {
932       ClusteredSession clusterSess = (ClusteredSession) session;
933       synchronized (clusterSess)
934       {
935          String JavaDoc realId = clusterSess.getRealId();
936          if (realId == null) return;
937
938          if (log_.isDebugEnabled())
939          {
940             log_.debug("Removing session from local store with id: " + realId);
941          }
942
943          try {
944             // Ignore any cache notifications that our own work generates
945
SessionReplicationContext.startCacheActivity();
946             clusterSess.removeMyselfLocal();
947          }
948          finally
949          {
950             SessionReplicationContext.finishCacheActivity();
951             
952             // We don't want to replicate this session at the end
953
// of the request; the removal process took care of that
954
SessionReplicationContext.sessionExpired(clusterSess, realId, snapshotManager_);
955             
956             sessions_.remove(realId);
957             stats_.removeStats(realId);
958
959             // Update counters.
960
// It's a bit ad-hoc to do it here. But since we currently call
961
// this when session expires ...
962
expiredCounter_++;
963             activeCounter_--;
964          }
965       }
966    }
967
968    /**
969     * Loads a session from the distributed store. If an existing session with
970     * the id is already under local management, that session's internal state
971     * will be updated from the distributed store. Otherwise a new session
972     * will be created and added to the collection of those sessions under
973     * local management.
974     *
975     * @param realId id of the session-id with any jvmRoute removed
976     *
977     * @return the session or <code>null</code> if the session cannot be found
978     * in the distributed store
979     */

980    protected ClusteredSession loadSession(String JavaDoc realId)
981    {
982       if (realId == null)
983       {
984          return null;
985       }
986
987       long begin = System.currentTimeMillis();
988       boolean mustAdd = false;
989       ClusteredSession session = (ClusteredSession) sessions_.get(realId);
990       
991       if (session == null)
992       {
993          // JBCLUSTER-15
994
// We need to check for maxActive first before attempting to create a new session
995
if (log_.isTraceEnabled())
996          {
997             log_.trace("createSession: active sessions = " + activeCounter_ +
998                        " , session map size = " + sessions_.size() +
999                        " and max allowed sessions = " + maxActive_);
1000         }
1001         // first check if passivation is enabled and reached the max allowed sessions,
1002
/// then try to expire/passivate sessions to free memory
1003
if(maxActive_ != -1 && sessions_.size() >= maxActive_ && isPassivationEnabled())
1004         {
1005            processExpires();
1006         }
1007         // maxActive_ -1 is unlimited
1008
if (maxActive_ != -1 && sessions_.size() >= maxActive_)
1009         {
1010            // Exceeds limit. We need to reject it.
1011
rejectedCounter_++;
1012            // Catalina api does not specify what happens
1013
// but we will throw a runtime exception for now.
1014
String JavaDoc msgEnd = (realId == null) ? "" : " id " + realId;
1015            throw new IllegalStateException JavaDoc("JBossCacheManager.createSession(): number of " +
1016                   "active sessions exceeds the maximum limit: " +
1017                   maxActive_ + " when trying to load session" + msgEnd);
1018         }
1019         
1020         // This is either the first time we've seen this session on this
1021
// server, or we previously expired it and have since gotten
1022
// a replication message from another server
1023
mustAdd = true;
1024         session = createEmptyClusteredSession();
1025      }
1026
1027      synchronized (session)
1028      {
1029         boolean doTx = false;
1030         try
1031         {
1032            // We need transaction so any data gravitation replication
1033
// is sent in batch.
1034
// Don't do anything if there is already transaction context
1035
// associated with this thread.
1036
if(tm.getTransaction() == null)
1037               doTx = true;
1038
1039            if(doTx)
1040               tm.begin();
1041            
1042            // Ignore cache notifications we may generate for this
1043
// session if data gravitation occurs.
1044
SessionReplicationContext.startCacheActivity();
1045            
1046            session = proxy_.loadSession(realId, session);
1047         }
1048         catch (Exception JavaDoc ex)
1049         {
1050            try
1051            {
1052// if(doTx)
1053
// Let's set it no matter what.
1054
tm.setRollbackOnly();
1055            }
1056            catch (Exception JavaDoc exn)
1057            {
1058               log_.error("Caught exception rolling back transaction", exn);
1059            }
1060            // We will need to alert Tomcat of this exception.
1061
if (ex instanceof RuntimeException JavaDoc)
1062               throw (RuntimeException JavaDoc) ex;
1063            
1064            throw new RuntimeException JavaDoc("loadSession(): failed to load session " +
1065                                       realId, ex);
1066         }
1067         finally
1068         {
1069            try {
1070               if(doTx)
1071                  endTransaction(realId);
1072            }
1073            finally {
1074               SessionReplicationContext.finishCacheActivity();
1075            }
1076         }
1077
1078         if (session != null)
1079         {
1080            // Need to initialize.
1081
session.initAfterLoad(this);
1082            if (mustAdd)
1083               add(session, false); // don't replicate
1084
long elapsed = System.currentTimeMillis() - begin;
1085            stats_.updateLoadStats(realId, elapsed);
1086
1087            if (log_.isDebugEnabled())
1088            {
1089               log_.debug("loadSession(): id= " + realId + ", session=" + session);
1090            }
1091         }
1092         else if (log_.isDebugEnabled())
1093         {
1094            log_.debug("loadSession(): session " + realId +
1095                       " not found in distributed cache");
1096         }
1097      }
1098
1099      return session;
1100   }
1101
1102   /**
1103    * Places the current session contents in the distributed cache and
1104    * replicates them to the cluster
1105    *
1106    * @param session the session. Cannot be <code>null</code>.
1107    */

1108   protected void processSessionRepl(ClusteredSession session)
1109   {
1110      // If we are using SESSION granularity, we don't want to initiate a TX
1111
// for a single put
1112
boolean notSession = (replicationGranularity_ != ReplicationGranularity.SESSION);
1113      boolean doTx = false;
1114      try
1115      {
1116         // We need transaction so all the replication are sent in batch.
1117
// Don't do anything if there is already transaction context
1118
// associated with this thread.
1119
if(notSession && tm.getTransaction() == null)
1120            doTx = true;
1121
1122         if(doTx)
1123            tm.begin();
1124
1125         // Tell the proxy to ignore cache notifications we are about
1126
// to generate for this session. We have to do this
1127
// at this level because we don't want to resume handling
1128
// notifications until any compensating changes resulting
1129
// from a tx rollback are done.
1130
SessionReplicationContext.startCacheActivity();
1131
1132         session.processSessionRepl();
1133      }
1134      catch (Exception JavaDoc ex)
1135      {
1136         log_.debug("processSessionRepl(): failed with exception", ex);
1137         
1138         try
1139         {
1140            //if(doTx)
1141
// Let's setRollbackOnly no matter what.
1142
// (except if there's no tx due to SESSION (JBAS-3840))
1143
if (notSession)
1144               tm.setRollbackOnly();
1145         }
1146         catch (Exception JavaDoc exn)
1147         {
1148            log_.error("Caught exception rolling back transaction", exn);
1149         }
1150         
1151         // We will need to alert Tomcat of this exception.
1152
if (ex instanceof RuntimeException JavaDoc)
1153            throw (RuntimeException JavaDoc) ex;
1154         
1155         throw new RuntimeException JavaDoc("JBossCacheManager.processSessionRepl(): " +
1156                                    "failed to replicate session.", ex);
1157      }
1158      finally
1159      {
1160         try {
1161            if(doTx)
1162               endTransaction(session.getId());
1163         }
1164         finally {
1165            SessionReplicationContext.finishCacheActivity();
1166         }
1167      }
1168   }
1169
1170   protected void endTransaction(String JavaDoc id)
1171   {
1172      if (tm == null)
1173      {
1174         log_.warn("JBossCacheManager.endTransaction(): tm is null for id: " +id);
1175         return;
1176      }
1177
1178
1179      try
1180      {
1181         if(tm.getTransaction().getStatus() != Status.STATUS_MARKED_ROLLBACK)
1182         {
1183            tm.commit();
1184         }
1185         else
1186         {
1187            log_.info("JBossCacheManager.endTransaction(): rolling back tx for id: " +id);
1188            tm.rollback();
1189         }
1190      }
1191      catch (RollbackException JavaDoc re)
1192      {
1193         // Do nothing here since cache may rollback automatically.
1194
log_.warn("JBossCacheManager.endTransaction(): rolling back transaction with exception: " +re);
1195      }
1196      catch (Exception JavaDoc e)
1197      {
1198         throw new RuntimeException JavaDoc("JBossCacheManager.endTransaction(): Exception for id: " +id, e);
1199      }
1200   }
1201   
1202   /**
1203    * Gets the classloader of the webapp we are managing.
1204    */

1205   protected ClassLoader JavaDoc getWebappClassLoader()
1206   {
1207      return tcl_;
1208   }
1209
1210   /**
1211    * Goes through all sessions and look if they have expired.
1212    * Note this overrides the method in JBossManager.
1213    */

1214   protected void processExpires()
1215   {
1216      if (maxInactiveInterval_ < 0)
1217      {
1218         return;
1219      }
1220
1221      if (log_.isTraceEnabled())
1222      {
1223         log_.trace("processExpires():max active sessions = " + maxActive_);
1224         log_.trace("processExpires(): passivation mode = " + isPassivationEnabled());
1225         log_.trace("processExpires(): Looking for sessions that have expired ...");
1226      }
1227      try
1228      {
1229         // First, handle the sessions we are actively managing
1230
Session sessions[] = findLocalSessions();
1231         for (int i = 0; i < sessions.length; ++i)
1232         {
1233            try
1234            {
1235               ClusteredSession session = (ClusteredSession) sessions[i];
1236               if(session == null)
1237               {
1238                  log_.warn("processExpires(): processing null session at index " +i);
1239                  continue;
1240               }
1241
1242               // JBAS-2403. Check for outdated sessions where we think
1243
// the local copy has timed out. If found, refresh the
1244
// session from the cache in case that might change the timeout
1245
if (session.isOutdated() && !(session.isValid(false)))
1246               {
1247                  // JBAS-2792 don't assign the result of loadSession to session
1248
// just update the object from the cache or fall through if
1249
// the session has been removed from the cache
1250
loadSession(session.getRealId());
1251               }
1252
1253               // Do a normal invalidation check that will expire any
1254
// sessions that have timed out
1255
// DON'T SYNCHRONIZE on session here -- isValid() and
1256
// expire() are meant to be multi-threaded and synchronize
1257
// properly internally; synchronizing externally can lead
1258
// to deadlocks!!
1259
if (!session.isValid()) continue;
1260               
1261               // JBCLUSTER-15
1262
if (log_.isTraceEnabled())
1263               {
1264                  log_.trace("processExpires(): Checking passivation for session " + session.getId());
1265               }
1266               // now that we have valid session, see if we need to
1267
// passivate it based on the configurable passivation min and max Idle time
1268
if (isPassivationEnabled())
1269               {
1270                  long timeNow = System.currentTimeMillis();
1271                  int timeIdle = (int) ((timeNow - session.getLastAccessedTimeInternal()) / 1000L);
1272                  // if maxIdle time configured, means that we need to passivate sessions that have
1273
// exceeded the max allowed idle time
1274
if (passivationMaxIdleTime_ >= 0 && timeIdle > passivationMaxIdleTime_)
1275                  {
1276                     if(log_.isTraceEnabled())
1277                     {
1278                        log_.trace("JBossCacheManager.processExpires() passivating session " + session.getRealId());
1279                     }
1280                     processSessionPassivation(session.getRealId(), this.getContainer().getParent().getName());
1281                  }
1282                  // If the session didn't exceed the passivationMaxIdleTime_, See
1283
// if the number of sessions managed by this manager greater than the max allowed
1284
// active sessions, passivate the session if it exceed passivationMinIdleTime_
1285
else if (maxActive_ > 0 && passivationMinIdleTime_ > 0 && sessions_.size()> maxActive_)
1286                  {
1287                     if(timeIdle > passivationMinIdleTime_)
1288                     {
1289                        if(log_.isTraceEnabled())
1290                        {
1291                           log_.debug("JBossCacheManager.processExpires() passivating session " + session.getRealId());
1292                        }
1293                        processSessionPassivation(session.getRealId(), this.getContainer().getParent().getName());
1294                     }
1295                  }
1296               }
1297               
1298            }
1299            catch (Exception JavaDoc ex)
1300            {
1301               log_.error("processExpires(): failed expiring " +
1302                          sessions[i].getIdInternal() + " with exception: " +
1303                          ex, ex);
1304            }
1305         }
1306
1307         // Next, handle any unloaded sessions that are stale
1308

1309         long now = System.currentTimeMillis();
1310         Map JavaDoc unloaded = new HashMap JavaDoc(unloadedSessions_);
1311         Set JavaDoc entries = unloaded.entrySet();
1312         for (Iterator JavaDoc it = entries.iterator(); it.hasNext(); )
1313         {
1314            Map.Entry JavaDoc entry = (Map.Entry JavaDoc) it.next();
1315            OwnedSessionUpdate osu = (OwnedSessionUpdate) entry.getValue();
1316            int elapsed = (int) ((now - osu.updateTime) / 1000L);
1317            if (elapsed >= maxInactiveInterval_)
1318            {
1319               String JavaDoc realId = (String JavaDoc) entry.getKey();
1320               try
1321               {
1322                  proxy_.removeSessionLocal(realId, osu.owner);
1323                  unloadedSessions_.remove(realId);
1324               }
1325                              
1326               // JBClUSTER-15
1327
// we don't need to worry about session passivation here, since the
1328
// method processSessionPassivation() takes care of unloadedSessions_ map
1329
// when it receives a notification of passivation event happened in the
1330
// distributed store from the CacheListener
1331
catch (Exception JavaDoc ex)
1332               {
1333                  log_.error("processExpire(): failed removing unloaded session " +
1334                          realId + " with exception: " +
1335                          ex, ex);
1336               }
1337            }
1338         }
1339      }
1340      catch (Exception JavaDoc ex)
1341      {
1342         log_.error("processExpires: failed with exception: " + ex, ex);
1343      }
1344   }
1345   
1346   public void processRemoteAttributeRemoval(String JavaDoc realId, String JavaDoc attrKey)
1347   {
1348      
1349      ClusteredSession session = findLocalSession(realId);
1350      if (session != null)
1351      {
1352         boolean localCall = false; // call is due to remote event
1353
boolean localOnly = true; // don't call back into cache
1354
boolean notify = false; // SRV.10.7 gives us leeway
1355
// not to notify listeners,
1356
// which is safer
1357

1358         // Ensure the correct TCL is in place
1359
ClassLoader JavaDoc prevTcl = Thread.currentThread().getContextClassLoader();
1360         try
1361         {
1362            Thread.currentThread().setContextClassLoader(tcl_);
1363            synchronized (session)
1364            {
1365               session.removeAttributeInternal(attrKey, localCall, localOnly, notify);
1366            }
1367            if (log_.isTraceEnabled())
1368               log_.trace("processRemoteAttributeRemoval: removed attribute " +
1369                          attrKey + " from " + realId);
1370         }
1371         finally
1372         {
1373            Thread.currentThread().setContextClassLoader(prevTcl);
1374         }
1375      }
1376   }
1377
1378   public void processRemoteInvalidation(String JavaDoc realId)
1379   {
1380      // Remove the session from our local map
1381
ClusteredSession session = (ClusteredSession) sessions_.remove(realId);
1382      if (session == null)
1383      {
1384         // We weren't managing the session anyway. But remove it
1385
// from the list of cached sessions we haven't loaded
1386
if (unloadedSessions_.remove(realId) != null)
1387         {
1388            if (log_.isTraceEnabled())
1389               log_.trace("Removed entry for session " + realId + " from unloaded session map");
1390         }
1391      }
1392      else
1393      {
1394         // Expire the session
1395
// DON'T SYNCHRONIZE ON SESSION HERE -- isValid() and
1396
// expire() are meant to be multi-threaded and synchronize
1397
// properly internally; synchronizing externally can lead
1398
// to deadlocks!!
1399
boolean notify = false; // Don't notify listeners. SRV.10.7
1400
// allows this, and sending notifications
1401
// leads to all sorts of issues; e.g.
1402
// circular calls with ClusteredSSO
1403
boolean localCall = false; // this call originated from the cache;
1404
// we have already removed session
1405
boolean localOnly = true; // Don't pass attr removals to cache
1406

1407         // Ensure the correct TCL is in place
1408
ClassLoader JavaDoc prevTcl = Thread.currentThread().getContextClassLoader();
1409         try
1410         {
1411            Thread.currentThread().setContextClassLoader(tcl_);
1412            session.expire(notify, localCall, localOnly);
1413         }
1414         finally
1415         {
1416            Thread.currentThread().setContextClassLoader(prevTcl);
1417         }
1418
1419         // Remove any stats for this session
1420
stats_.removeStats(realId);
1421         
1422         // Update counter.
1423
activeCounter_--;
1424      }
1425   }
1426   
1427   public void processSessionPassivation(String JavaDoc realId, String JavaDoc dataOwner)
1428   {
1429      // get the session from the local map
1430
ClusteredSession session = findLocalSession(realId);
1431      // only remove actively managed session and add to the unloaded sessions
1432
// if it's already unloaded session (session == null) don't do anything,
1433
// the evict notification will tell the server that has the session to remove it.
1434
if (session != null)
1435      {
1436         synchronized (session)
1437         {
1438            if (log_.isTraceEnabled())
1439            {
1440               log_.trace("Passivating session with id: " + realId);
1441            }
1442
1443            try {
1444               // Tell the proxy to ignore cache notifications we are about
1445
// to generate for this session.
1446
SessionReplicationContext.startCacheActivity();
1447               session.passivate();
1448               proxy_.evictSession(realId);
1449            }
1450            finally {
1451               SessionReplicationContext.finishCacheActivity();
1452            }
1453
1454            Object JavaDoc obj = unloadedSessions_.put(realId,
1455                  new OwnedSessionUpdate(dataOwner, session.getLastAccessedTime()));
1456            if (log_.isTraceEnabled())
1457            {
1458               if (obj == null)
1459               {
1460                  log_.trace("New session " + realId + " added to unloaded session map");
1461               }
1462               else
1463               {
1464                  log_.trace("Updated timestamp for unloaded session " + realId);
1465               }
1466            }
1467            sessions_.remove(realId);
1468            stats_.removeStats(realId);
1469         }
1470         activeCounter_--;
1471      }
1472   }
1473
1474   /**
1475    * Gets the session id with any jvmRoute removed.
1476    *
1477    * @param id a session id with or without an appended jvmRoute.
1478    * Cannot be <code>null</code>.
1479    */

1480   protected String JavaDoc getRealId(String JavaDoc id)
1481   {
1482      return (useJK_ ? Util.getRealId(id) : id);
1483   }
1484   
1485   /**
1486    * Callback from the CacheListener to notify us that a session
1487    * we haven't loaded has been changed.
1488    *
1489    * @param realId the session id, without any trailing jvmRoute
1490    * @param dataOwner the owner of the session. Can be <code>null</code> if
1491    * the owner is unknown.
1492    */

1493   protected void unloadedSessionChanged(String JavaDoc realId, String JavaDoc dataOwner)
1494   {
1495      Object JavaDoc obj = unloadedSessions_.put(realId,
1496            new OwnedSessionUpdate(dataOwner, System.currentTimeMillis()));
1497      if (log_.isTraceEnabled())
1498      {
1499         if (obj == null)
1500         {
1501            log_.trace("New session " + realId + " added to unloaded session map");
1502         }
1503         else
1504         {
1505            log_.trace("Updated timestamp for unloaded session " + realId);
1506         }
1507      }
1508   }
1509   
1510   /**
1511    * Returns true if the passivation mode is set to true in JBoss-web.xml and JBoss Cache passivation
1512    * has been enabled with proper configured cache loader. Otherwise, it returns false
1513    *
1514    * @return
1515    */

1516   protected boolean isPassivationEnabled()
1517   {
1518      return (passivationMode_ && proxy_.isCachePassivationEnabled());
1519   }
1520   // ---------------------------------------------------- Lifecyle Unembedded
1521

1522   /**
1523    * Start this Manager when running embedded in JBoss AS.
1524    *
1525    * @throws org.apache.catalina.LifecycleException
1526    */

1527   private void startEmbedded() throws LifecycleException
1528   {
1529      super.start();
1530      
1531      // Start the JBossCacheService
1532
// Will need to pass the classloader that is associated with this
1533
//web app so de-serialization will work correctly.
1534
tcl_ = super.getContainer().getLoader().getClassLoader();
1535
1536      proxy_.start(tcl_, this);
1537
1538      tm = proxy_.getTransactionManager();
1539      if(tm == null)
1540      {
1541         throw new LifecycleException("JBossCacheManager.start(): Obtain null tm");
1542      }
1543      
1544      try
1545      {
1546         initializeUnloadedSessions();
1547         
1548         // Setup our SnapshotManager
1549
initSnapshotManager();
1550         
1551         // Add SnapshotValve and, if needed, JvmRouteValve and batch repl valve
1552
installValves();
1553
1554         log_.debug("start(): JBossCacheService started");
1555      }
1556      catch (Exception JavaDoc e)
1557      {
1558         log_.error("Unable to start manager.", e);
1559         throw new LifecycleException(e);
1560      }
1561   }
1562   
1563   // ----------------------------------------------- Lifecyle When Unembedded
1564

1565   /**
1566    * Start this Manager when running in standalone Tomcat.
1567    */

1568   private void startUnembedded() throws LifecycleException
1569   {
1570      if (started_)
1571      {
1572         return;
1573      }
1574      
1575      log_.info("Manager is about to start");
1576
1577      // Notify our interested LifecycleListeners
1578
lifecycle_.fireLifecycleEvent(BEFORE_START_EVENT, this);
1579      
1580      if (snapshotMode_ == null)
1581      {
1582         // We were not instantiated by a JBossCacheCluster, so we need to
1583
// find one and let it configure our cluster-wide properties
1584
try
1585         {
1586            JBossCacheCluster cluster = (JBossCacheCluster) container_.getCluster();
1587            cluster.configureManager(this);
1588         }
1589         catch (ClassCastException JavaDoc e)
1590         {
1591            String JavaDoc msg = "Cluster is not an instance of JBossCacheCluster";
1592            log_.error(msg, e);
1593            throw new LifecycleException(msg, e);
1594         }
1595      }
1596      
1597      // Validate attributes
1598

1599      try
1600      {
1601         this.invalidateSessionPolicy_ = InvalidateSessionPolicy.fromString(replTriggerString_);
1602      }
1603      catch (IllegalArgumentException JavaDoc iae)
1604      {
1605         throw new LifecycleException("replication-trigger value set to a " +
1606                                       "non-valid value: '" +
1607                                       replTriggerString_ +
1608                                       "' (should be ['SET_AND_GET', " +
1609                                       "'SET_AND_NON_PRIMITIVE_GET', 'SET'])");
1610      }
1611      
1612      try
1613      {
1614      }
1615      catch (IllegalArgumentException JavaDoc iae)
1616      {
1617         throw new LifecycleException("replication-granularity value set to " +
1618                                       "a non-valid value: '" +
1619                                       replGranularityString_ +
1620                                       "' (should be ['SESSION', " +
1621                                       "'ATTRIBUTE' or 'FIELD'])");
1622      }
1623      
1624      // Create the JBossCacheService
1625
try
1626      {
1627         proxy_ = new JBossCacheService(cacheObjectNameString_);
1628         
1629         // Confirm our replication granularity is compatible with the cache
1630
// Throws ISE if not
1631
validateFieldMarshalling();
1632         
1633         // We need to pass the classloader that is associated with this
1634
// web app so de-serialization will work correctly.
1635
tcl_ = container_.getLoader().getClassLoader();
1636         proxy_.start(tcl_, this);
1637      }
1638      catch (Throwable JavaDoc t)
1639      {
1640         String JavaDoc str = "Problem starting JBossCacheService for Tomcat clustering";
1641         log_.error(str, t);
1642         throw new LifecycleException(str, t);
1643      }
1644
1645      tm = proxy_.getTransactionManager();
1646      if(tm == null)
1647      {
1648         throw new LifecycleException("JBossCacheManager.start(): Obtain null tm");
1649      }
1650      
1651      try
1652      {
1653         initializeUnloadedSessions();
1654
1655         // Add SnapshotValve and, if needed, JvmRouteValve and batch repl valve
1656
installValves();
1657
1658         started_ = true;
1659         
1660         // Notify our interested LifecycleListeners
1661
lifecycle_.fireLifecycleEvent(AFTER_START_EVENT, this);
1662         
1663         log_.debug("start(): JBossCacheService started");
1664      }
1665      catch (Exception JavaDoc e)
1666      {
1667         log_.error("Unable to start manager.", e);
1668         throw new LifecycleException(e);
1669      }
1670      
1671      try
1672      {
1673         registerMBeans();
1674      }
1675      catch (Exception JavaDoc e)
1676      {
1677         log_.error("Could not register ManagerMBean with MBeanServer", e);
1678      }
1679   }
1680
1681   /**
1682    * Register this Manager with JMX.
1683    */

1684   private void registerMBeans()
1685   {
1686      try
1687      {
1688         MBeanServer JavaDoc server = getMBeanServer();
1689
1690         String JavaDoc domain;
1691         if (container_ instanceof ContainerBase)
1692         {
1693            domain = ((ContainerBase) container_).getDomain();
1694         }
1695         else
1696         {
1697            domain = server.getDefaultDomain();
1698         }
1699         String JavaDoc hostName = ((Host) container_.getParent()).getName();
1700         hostName = (hostName == null) ? "localhost" : hostName;
1701         ObjectName JavaDoc clusterName = new ObjectName JavaDoc(domain
1702               + ":service=ClusterManager,WebModule=//" + hostName
1703               + ((Context) container_).getPath());
1704
1705         if (server.isRegistered(clusterName))
1706         {
1707            log_.warn("MBean " + clusterName + " already registered");
1708            return;
1709         }
1710
1711         objectName_ = clusterName;
1712         server.registerMBean(this, clusterName);
1713
1714      }
1715      catch (Exception JavaDoc ex)
1716      {
1717         log_.error(ex.getMessage(), ex);
1718      }
1719   }
1720
1721   /**
1722    * Unregister this Manager from the JMX server.
1723    */

1724   private void unregisterMBeans()
1725   {
1726      if (mserver_ != null)
1727      {
1728         try
1729         {
1730            mserver_.unregisterMBean(objectName_);
1731         }
1732         catch (Exception JavaDoc e)
1733         {
1734            log_.error(e);
1735         }
1736      }
1737   }
1738
1739   /**
1740    * Get the current MBean Server.
1741    *
1742    * @return
1743    * @throws Exception
1744    */

1745   private MBeanServer JavaDoc getMBeanServer() throws Exception JavaDoc
1746   {
1747      if (mserver_ == null)
1748      {
1749         mserver_ = MBeanServerLocator.locateJBoss();
1750      }
1751      return (mserver_);
1752   }
1753   
1754   /**
1755    * Gets the ids of all sessions in the distributed cache and adds
1756    * them to the unloaded sessions map, with the current time as the
1757    * last replication time. This means these sessions may not be
1758    * evicted from the cache for a period well beyond when they would
1759    * normally expire, but this is a necessary tradeoff to avoid
1760    * deserializing them all to check their lastAccessedTime.
1761    */

1762   private void initializeUnloadedSessions() throws CacheException
1763   {
1764      Map JavaDoc sessions = proxy_.getSessionIds();
1765      if (sessions != null)
1766      {
1767         long now = System.currentTimeMillis();
1768         for (Iterator JavaDoc it = sessions.entrySet().iterator(); it.hasNext(); )
1769         {
1770            Map.Entry JavaDoc entry = (Entry) it.next();
1771            unloadedSessions_.put(entry.getKey(),
1772                  new OwnedSessionUpdate((String JavaDoc) entry.getValue(), now));
1773         }
1774      }
1775   }
1776
1777   /**
1778    * Instantiate a SnapshotManager and ClusteredSessionValve and add
1779    * the valve to our parent Context's pipeline.
1780    * Add a JvmRouteValve and BatchReplicationClusteredSessionValve if needed.
1781    *
1782    */

1783   private void installValves()
1784   {
1785      if (useJK_)
1786      {
1787         log_.info("We are using mod_jk(2) for load-balancing. " +
1788                   "Will add JvmRouteValve.");
1789         
1790         installContextValve(new JvmRouteValve(this));
1791      }
1792         
1793      // Add batch replication valve if needed.
1794
// TODO -- should we add this even if not FIELD in case a cross-context
1795
// call traverses a field-based webapp?
1796
if (replicationGranularity_ == ReplicationGranularity.FIELD &&
1797          Boolean.TRUE.equals(replicationFieldBatchMode_))
1798      {
1799         Valve batchValve = new BatchReplicationClusteredSessionValve(this);
1800         log_.debug("Adding BatchReplicationClusteredSessionValve for batch replication.");
1801         installContextValve(batchValve);
1802      }
1803
1804      // Add clustered session valve
1805
ClusteredSessionValve valve = new ClusteredSessionValve();
1806      installContextValve(valve);
1807   }
1808
1809   /**
1810    * Create and start a snapshot manager.
1811    */

1812   private void initSnapshotManager()
1813   {
1814      String JavaDoc ctxPath = ((Context) container_).getPath();
1815      if ("instant".equals(snapshotMode_)
1816            || replicationGranularity_== ReplicationGranularity.FIELD)
1817      {
1818         snapshotManager_ = new InstantSnapshotManager(this, ctxPath);
1819      }
1820      else if ("interval".equals(snapshotMode_))
1821      {
1822         snapshotManager_ = new IntervalSnapshotManager(this, ctxPath, snapshotInterval_);
1823      }
1824      else
1825      {
1826         log_.error("Snapshot mode must be 'instant' or 'interval' - " +
1827                    "using 'instant'");
1828         snapshotManager_ = new InstantSnapshotManager(this, ctxPath);
1829      }
1830      
1831      snapshotManager_.start();
1832   }
1833   
1834   private void installContextValve(Valve valve)
1835   {
1836      boolean installed = false;
1837      
1838      // In embedded mode, install the valve via JMX to be consistent
1839
// with the way the overall context is created in TomcatDeployer.
1840
// We can't do this in unembedded mode because we are called
1841
// before our Context is registered with the MBean server
1842
if (embedded_ && getContextObjectName() != null) {
1843         try
1844         {
1845            getMBeanServer().invoke(getContextObjectName(), "addValve",
1846                                    new Object JavaDoc[]{valve},
1847                                    new String JavaDoc[]{"org.apache.catalina.Valve"});
1848            installed = true;
1849         }
1850         catch (Exception JavaDoc e)
1851         {
1852            // JBAS-2422. If the context is restarted via JMX, the above
1853
// JMX call will fail as the context will not be registered
1854
// when it's made. So we catch the exception and fall back
1855
// to adding the valve directly.
1856
// TODO consider skipping adding via JMX and just do it directly
1857
log_.debug("Caught exception installing valve to Context", e);
1858         }
1859      }
1860      
1861      if (!installed)
1862      {
1863         // If possible install via the ContainerBase.addValve() API.
1864
if (container_ instanceof ContainerBase)
1865         {
1866            ((ContainerBase) container_).addValve(valve);
1867         }
1868         else
1869         {
1870            // No choice; have to add it to the context's pipeline
1871
container_.getPipeline().addValve(valve);
1872         }
1873      }
1874   }
1875
1876   /**
1877    * If we are using FIELD granularity, checks that the TreeCache
1878    * supports marshalling.
1879    *
1880    * @throws IllegalStateException if not
1881    */

1882   private void validateFieldMarshalling()
1883   {
1884      if (replicationGranularity_ == ReplicationGranularity.FIELD
1885            && !proxy_.isMarshallingAvailable())
1886      {
1887         // BES 16/8/2006 -- throw ISE, not ClusteringNotSupportedException, as a
1888
// misconfig should be treated differently from the absence of clustering
1889
// services
1890
throw new IllegalStateException JavaDoc("replication-granularity value is set to " +
1891               "'FIELD' but is not supported by the cache service configuration. " +
1892               "Must set 'UseRegionBasedMarshalling' to 'true' in the " +
1893               "tc6-cluster.sar jboss-service.xml");
1894      }
1895   }
1896   
1897   private ObjectName JavaDoc getContextObjectName()
1898   {
1899      String JavaDoc oname = container_.getObjectName();
1900      try
1901      {
1902         return (oname == null) ? null : new ObjectName JavaDoc(oname);
1903      }
1904      catch (MalformedObjectNameException JavaDoc e)
1905      {
1906         log_.warn("Error creating object name from string " + oname, e);
1907         return null;
1908      }
1909   }
1910
1911   
1912   private class OwnedSessionUpdate
1913   {
1914      String JavaDoc owner;
1915      long updateTime;
1916      
1917      OwnedSessionUpdate(String JavaDoc owner, long updateTime)
1918      {
1919         this.owner = owner;
1920         this.updateTime = updateTime;
1921      }
1922   }
1923}
1924
Popular Tags