KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > catalina > cluster > session > DeltaManager


1 /*
2  * Copyright 1999,2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.apache.catalina.cluster.session;
18
19 import java.beans.PropertyChangeEvent JavaDoc;
20 import java.beans.PropertyChangeListener JavaDoc;
21 import java.io.BufferedInputStream JavaDoc;
22 import java.io.BufferedOutputStream JavaDoc;
23 import java.io.ByteArrayInputStream JavaDoc;
24 import java.io.ByteArrayOutputStream JavaDoc;
25 import java.io.IOException JavaDoc;
26 import java.io.ObjectInputStream JavaDoc;
27 import java.io.ObjectOutputStream JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.Iterator JavaDoc;
30 import org.apache.catalina.Container;
31 import org.apache.catalina.Context;
32 import org.apache.catalina.Lifecycle;
33 import org.apache.catalina.LifecycleException;
34 import org.apache.catalina.LifecycleListener;
35 import org.apache.catalina.Loader;
36 import org.apache.catalina.Session;
37 import org.apache.catalina.util.CustomObjectInputStream;
38 import org.apache.catalina.util.LifecycleSupport;
39 import org.apache.catalina.util.StringManager;
40
41 import org.apache.catalina.session.ManagerBase;
42 import org.apache.catalina.cluster.ClusterManager;
43 import org.apache.catalina.cluster.ClusterMessage;
44 import org.apache.catalina.cluster.Member;
45 import org.apache.catalina.cluster.CatalinaCluster;
46
47 /**
48  * The DeltaManager manages replicated sessions by only replicating the deltas
49  * in data. For applications written to handle this, the DeltaManager is the
50  * optimal way of replicating data.
51  *
52  * This code is almost identical to StandardManager with a difference in how it
53  * persists sessions and some modifications to it.
54  *
55  * <b>IMPLEMENTATION NOTE </b>: Correct behavior of session storing and
56  * reloading depends upon external calls to the <code>start()</code> and
57  * <code>stop()</code> methods of this class at the correct times.
58  *
59  * @author Filip Hanik
60  * @author Craig R. McClanahan
61  * @author Jean-Francois Arcand
62  * @version $Revision: 1.41 $ $Date: 2005/03/03 14:06:36 $
63  */

64
65 public class DeltaManager extends ManagerBase implements Lifecycle,
66         PropertyChangeListener JavaDoc, ClusterManager {
67
68     // ---------------------------------------------------- Security Classes
69

70     public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
71             .getLog(DeltaManager.class);
72
73     /**
74      * The string manager for this package.
75      */

76     protected static StringManager sm = StringManager
77             .getManager(Constants.Package);
78
79     // ----------------------------------------------------- Instance Variables
80

81     /**
82      * The descriptive information about this implementation.
83      */

84     private static final String JavaDoc info = "DeltaManager/1.1";
85
86     /**
87      * The lifecycle event support for this component.
88      */

89     protected LifecycleSupport lifecycle = new LifecycleSupport(this);
90
91     /**
92      * The maximum number of active Sessions allowed, or -1 for no limit.
93      */

94     private int maxActiveSessions = -1;
95
96     /**
97      * The descriptive name of this Manager implementation (for logging).
98      */

99     protected static String JavaDoc managerName = "DeltaManager";
100
101     protected String JavaDoc name = null;
102
103     /**
104      * Has this component been started yet?
105      */

106     private boolean started = false;
107
108     int rejectedSessions = 0;
109
110     int expiredSessions = 0;
111
112     long processingTime = 0;
113
114     private CatalinaCluster cluster = null;
115
116     private boolean stateTransferred;
117
118     private boolean useDirtyFlag;
119
120     private boolean expireSessionsOnShutdown;
121
122     private boolean printToScreen;
123
124     private boolean notifyListenersOnReplication = false;
125
126     // ------------------------------------------------------------- Constructor
127
public DeltaManager() {
128         super();
129     }
130
131     // ------------------------------------------------------------- Properties
132

133     /**
134      * Set the Container with which this Manager has been associated. If it is a
135      * Context (the usual case), listen for changes to the session timeout
136      * property.
137      *
138      * @param container
139      * The associated Container
140      */

141     public void setContainer(Container container) {
142
143         // De-register from the old Container (if any)
144
if ((this.container != null) && (this.container instanceof Context))
145             ((Context) this.container).removePropertyChangeListener(this);
146
147         // Default processing provided by our superclass
148
super.setContainer(container);
149
150         // Register with the new Container (if any)
151
if ((this.container != null) && (this.container instanceof Context)) {
152             setMaxInactiveInterval(((Context) this.container)
153                     .getSessionTimeout() * 60);
154             ((Context) this.container).addPropertyChangeListener(this);
155         }
156
157     }
158
159     /**
160      * Return descriptive information about this Manager implementation and the
161      * corresponding version number, in the format
162      * <code>&lt;description&gt;/&lt;version&gt;</code>.
163      */

164     public String JavaDoc getInfo() {
165
166         return (info);
167
168     }
169
170     /**
171      * Return the maximum number of active Sessions allowed, or -1 for no limit.
172      */

173     public int getMaxActiveSessions() {
174
175         return (this.maxActiveSessions);
176
177     }
178
179     /**
180      * Number of session creations that failed due to maxActiveSessions
181      *
182      * @return The count
183      */

184     public int getRejectedSessions() {
185         return rejectedSessions;
186     }
187
188     public void setRejectedSessions(int rejectedSessions) {
189         this.rejectedSessions = rejectedSessions;
190     }
191
192     /**
193      * Set the maximum number of actives Sessions allowed, or -1 for no limit.
194      *
195      * @param max
196      * The new maximum number of sessions
197      */

198     public void setMaxActiveSessions(int max) {
199
200         int oldMaxActiveSessions = this.maxActiveSessions;
201         this.maxActiveSessions = max;
202         support.firePropertyChange("maxActiveSessions", new Integer JavaDoc(
203                 oldMaxActiveSessions), new Integer JavaDoc(this.maxActiveSessions));
204
205     }
206
207     /**
208      * Return the descriptive short name of this Manager implementation.
209      */

210     public String JavaDoc getName() {
211
212         return (name);
213
214     }
215
216     // --------------------------------------------------------- Public Methods
217

218     /**
219      * Construct and return a new session object, based on the default settings
220      * specified by this Manager's properties. The session id will be assigned
221      * by this method, and available via the getId() method of the returned
222      * session. If a new session cannot be created for any reason, return
223      * <code>null</code>.
224      *
225      * @exception IllegalStateException
226      * if a new session cannot be instantiated for any reason
227      *
228      * Construct and return a new session object, based on the default settings
229      * specified by this Manager's properties. The session id will be assigned
230      * by this method, and available via the getId() method of the returned
231      * session. If a new session cannot be created for any reason, return
232      * <code>null</code>.
233      *
234      * @exception IllegalStateException
235      * if a new session cannot be instantiated for any reason
236      */

237     public Session createSession(String JavaDoc sessionId) {
238         return createSession(sessionId, true);
239     }
240
241     /**
242      * create new session with check maxActiveSessions and send session creation
243      * to other cluster nodes.
244      *
245      * @param distribute
246      * @return
247      */

248     public Session createSession(String JavaDoc sessionId, boolean distribute) {
249
250         if ((maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions)) {
251             rejectedSessions++;
252             throw new IllegalStateException JavaDoc(sm
253                     .getString("deltaManager.createSession.ise"));
254         }
255
256         // Recycle or create a Session instance
257
DeltaSession session = getNewDeltaSession();
258         if (sessionId == null) {
259             sessionId = generateSessionId();
260             synchronized (sessions) {
261                 while (sessions.get(sessionId) != null) { // Guarantee
262
// uniqueness
263
duplicates++;
264                     sessionId = generateSessionId();
265                 }
266             }
267         }
268
269         session.setNew(true);
270         session.setValid(true);
271         session.setCreationTime(System.currentTimeMillis());
272         session.setMaxInactiveInterval(this.maxInactiveInterval);
273         session.setId(sessionId);
274         session.resetDeltaRequest();
275         // Initialize the properties of the new session and return it
276

277         sessionCounter++;
278
279         if (distribute) {
280             SessionMessage msg = new SessionMessageImpl(getName(),
281                     SessionMessage.EVT_SESSION_CREATED, null, sessionId,
282                     sessionId + System.currentTimeMillis());
283             if (log.isDebugEnabled())
284                 log.debug(sm.getString("deltaManager.sendMessage.newSession",
285                         name, sessionId));
286             cluster.send(msg);
287             session.resetDeltaRequest();
288         }
289         if (log.isDebugEnabled())
290             log.debug(sm.getString("deltaManager.createSession.newSession",
291                     sessionId, new Integer JavaDoc(sessions.size())));
292
293         return (session);
294
295     }
296
297     /**
298      * Get new session class to be used in the doLoad() method.
299      */

300     protected DeltaSession getNewDeltaSession() {
301         return new DeltaSession(this);
302     }
303
304     private DeltaRequest loadDeltaRequest(DeltaSession session, byte[] data)
305             throws ClassNotFoundException JavaDoc, IOException JavaDoc {
306         ByteArrayInputStream JavaDoc fis = null;
307         ReplicationStream ois = null;
308         Loader loader = null;
309         ClassLoader JavaDoc classLoader = null;
310         //fix to be able to run the DeltaManager
311
//stand alone without a container.
312
//use the Threads context class loader
313
if (container != null)
314             loader = container.getLoader();
315         if (loader != null)
316             classLoader = loader.getClassLoader();
317         else
318             classLoader = Thread.currentThread().getContextClassLoader();
319         //end fix
320
fis = new ByteArrayInputStream JavaDoc(data);
321         ois = new ReplicationStream(fis, classLoader);
322         session.getDeltaRequest().readExternal(ois);
323         ois.close();
324         return session.getDeltaRequest();
325     }
326
327     private byte[] unloadDeltaRequest(DeltaRequest deltaRequest)
328             throws IOException JavaDoc {
329         ByteArrayOutputStream JavaDoc bos = new ByteArrayOutputStream JavaDoc();
330         ObjectOutputStream JavaDoc oos = new ObjectOutputStream JavaDoc(bos);
331         deltaRequest.writeExternal(oos);
332         oos.flush();
333         oos.close();
334         return bos.toByteArray();
335     }
336
337     /**
338      * Load any currently active sessions that were previously unloaded to the
339      * appropriate persistence mechanism, if any. If persistence is not
340      * supported, this method returns without doing anything.
341      *
342      * @exception ClassNotFoundException
343      * if a serialized class cannot be found during the reload
344      * @exception IOException
345      * if an input/output error occurs
346      */

347     private void doLoad(byte[] data) throws ClassNotFoundException JavaDoc, IOException JavaDoc {
348
349         // Initialize our internal data structures
350
//sessions.clear(); //should not do this
351
// Open an input stream to the specified pathname, if any
352
ByteArrayInputStream JavaDoc fis = null;
353         ObjectInputStream JavaDoc ois = null;
354         Loader loader = null;
355         ClassLoader JavaDoc classLoader = null;
356         ClassLoader JavaDoc originalLoader = Thread.currentThread()
357                 .getContextClassLoader();
358         try {
359
360             try {
361                 fis = new ByteArrayInputStream JavaDoc(data);
362                 BufferedInputStream JavaDoc bis = new BufferedInputStream JavaDoc(fis);
363                 if (container != null)
364                     loader = container.getLoader();
365                 if (loader != null)
366                     classLoader = loader.getClassLoader();
367                 if (classLoader != null) {
368                     if (log.isTraceEnabled())
369                         log.trace(sm.getString(
370                                 "deltaManager.loading.withContextClassLoader",
371                                 getName()));
372                     ois = new CustomObjectInputStream(bis, classLoader);
373                     Thread.currentThread().setContextClassLoader(classLoader);
374                 } else {
375                     if (log.isTraceEnabled())
376                         log.trace(sm.getString(
377                                 "deltaManager.loading.withoutClassLoader",
378                                 getName()));
379                     ois = new ObjectInputStream JavaDoc(bis);
380                 }
381             } catch (IOException JavaDoc e) {
382                 log.error(sm.getString("deltaManager.loading.ioe", e), e);
383                 if (ois != null) {
384                     try {
385                         ois.close();
386                     } catch (IOException JavaDoc f) {
387                         ;
388                     }
389                     ois = null;
390                 }
391                 throw e;
392             }
393             // Load the previously unloaded active sessions
394
synchronized (sessions) {
395                 try {
396                     Integer JavaDoc count = (Integer JavaDoc) ois.readObject();
397                     int n = count.intValue();
398                     for (int i = 0; i < n; i++) {
399                         DeltaSession session = getNewDeltaSession();
400                         session.readObjectData(ois);
401                         session.setManager(this);
402                         session.setValid(true);
403                         session.setPrimarySession(false);
404                         //in case the nodes in the cluster are out of
405
//time synch, this will make sure that we have the
406
//correct timestamp, isValid returns true, cause
407
// accessCount=1
408
session.access();
409                         //make sure that the session gets ready to expire if
410
// needed
411
session.setAccessCount(0);
412                         session.resetDeltaRequest();
413                         sessions.put(session.getId(), session);
414                     }
415                 } catch (ClassNotFoundException JavaDoc e) {
416                     log.error(sm.getString("deltaManager.loading.cnfe", e), e);
417                     if (ois != null) {
418                         try {
419                             ois.close();
420                         } catch (IOException JavaDoc f) {
421                             ;
422                         }
423                         ois = null;
424                     }
425                     throw e;
426                 } catch (IOException JavaDoc e) {
427                     log.error(sm.getString("deltaManager.loading.ioe", e), e);
428                     if (ois != null) {
429                         try {
430                             ois.close();
431                         } catch (IOException JavaDoc f) {
432                             ;
433                         }
434                         ois = null;
435                     }
436                     throw e;
437                 } finally {
438                     // Close the input stream
439
try {
440                         if (ois != null)
441                             ois.close();
442                     } catch (IOException JavaDoc f) {
443                         // ignored
444
}
445                 }
446             }
447         } finally {
448             if (originalLoader != null)
449                 Thread.currentThread().setContextClassLoader(originalLoader);
450         }
451
452     }
453
454     /**
455      * Save any currently active sessions in the appropriate persistence
456      * mechanism, if any. If persistence is not supported, this method returns
457      * without doing anything.
458      *
459      * @exception IOException
460      * if an input/output error occurs
461      */

462     private byte[] doUnload() throws IOException JavaDoc {
463
464         // Open an output stream to the specified pathname, if any
465
ByteArrayOutputStream JavaDoc fos = null;
466         ObjectOutputStream JavaDoc oos = null;
467         try {
468             fos = new ByteArrayOutputStream JavaDoc();
469             oos = new ObjectOutputStream JavaDoc(new BufferedOutputStream JavaDoc(fos));
470         } catch (IOException JavaDoc e) {
471             log.error(sm.getString("deltaManager.unloading.ioe", e), e);
472             if (oos != null) {
473                 try {
474                     oos.close();
475                 } catch (IOException JavaDoc f) {
476                     ;
477                 }
478                 oos = null;
479             }
480             throw e;
481         }
482
483         // Write the number of active sessions, followed by the details
484
ArrayList JavaDoc list = new ArrayList JavaDoc();
485         synchronized (sessions) {
486             try {
487                 oos.writeObject(new Integer JavaDoc(sessions.size()));
488                 Iterator JavaDoc elements = sessions.values().iterator();
489                 while (elements.hasNext()) {
490                     DeltaSession session = (DeltaSession) elements.next();
491                     list.add(session);
492                     session.writeObjectData(oos);
493                 }
494                 oos.flush();
495                 oos.close();
496                 oos = null;
497             } catch (IOException JavaDoc e) {
498                 log.error(sm.getString("deltaManager.unloading.ioe", e), e);
499                 if (oos != null) {
500                     try {
501                         oos.close();
502                     } catch (IOException JavaDoc f) {
503                         ;
504                     }
505                     oos = null;
506                 }
507                 throw e;
508             }
509         }
510
511         // Flush and close the output stream
512
return fos.toByteArray();
513     }
514
515     // ------------------------------------------------------ Lifecycle Methods
516

517     /**
518      * Add a lifecycle event listener to this component.
519      *
520      * @param listener
521      * The listener to add
522      */

523     public void addLifecycleListener(LifecycleListener listener) {
524
525         lifecycle.addLifecycleListener(listener);
526
527     }
528
529     /**
530      * Get the lifecycle listeners associated with this lifecycle. If this
531      * Lifecycle has no listeners registered, a zero-length array is returned.
532      */

533     public LifecycleListener[] findLifecycleListeners() {
534
535         return lifecycle.findLifecycleListeners();
536
537     }
538
539     /**
540      * Remove a lifecycle event listener from this component.
541      *
542      * @param listener
543      * The listener to remove
544      */

545     public void removeLifecycleListener(LifecycleListener listener) {
546
547         lifecycle.removeLifecycleListener(listener);
548
549     }
550
551     /**
552      * Prepare for the beginning of active use of the public methods of this
553      * component. This method should be called after <code>configure()</code>,
554      * and before any of the public methods of the component are utilized.
555      *
556      * @exception LifecycleException
557      * if this component detects a fatal error that prevents this
558      * component from being used
559      */

560     public void start() throws LifecycleException {
561         if (!initialized)
562             init();
563
564         // Validate and update our current component state
565
if (started) {
566             return;
567         }
568         started = true;
569         lifecycle.fireLifecycleEvent(START_EVENT, null);
570
571         // Force initialization of the random number generator
572
String JavaDoc dummy = generateSessionId();
573
574         // Load unloaded sessions, if any
575
try {
576             //the channel is already running
577
if (cluster == null) {
578                 log.error(sm.getString("deltaManager.noCluster", getName()));
579                 return;
580             }
581             if (log.isInfoEnabled())
582                 log.info(sm
583                         .getString("deltaManager.startClustering", getName()));
584             //to survice context reloads, as only a stop/start is called, not
585
// createManager
586
getCluster().addManager(getName(), this);
587
588             if (cluster.getMembers().length > 0) {
589                 Member mbr = cluster.getMembers()[0];
590                 SessionMessage msg = new SessionMessageImpl(this.getName(),
591                         SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL",
592                         "GET-ALL-" + getName());
593                 //just to make sure the other server has the context started
594
// long timetowait = 20000-mbr.getMemberAliveTime();
595
// if ( timetowait > 0 ) {
596
// log.info("The other server has not been around more than 20
597
// seconds, will sleep for "+timetowait+" ms. in order to let it
598
// startup");
599
// try { Thread.currentThread().sleep(timetowait); } catch (
600
// Exception x ) {}
601
// }//end if
602

603                 //request session state
604
cluster.send(msg, mbr);
605                 if (log.isWarnEnabled())
606                     log.warn(sm.getString("deltaManager.waitForSessionState",
607                             getName(), mbr));
608                 long reqStart = System.currentTimeMillis();
609                 long reqNow = 0;
610                 boolean isTimeout = false;
611                 do {
612                     try {
613                         Thread.sleep(100);
614                     } catch (Exception JavaDoc sleep) {
615                     }
616                     reqNow = System.currentTimeMillis();
617                     isTimeout = ((reqNow - reqStart) > (1000 * 60));
618                 } while ((!getStateTransferred()) && (!isTimeout));
619                 if (isTimeout || (!getStateTransferred())) {
620                     log.error(sm.getString("deltaManager.noSessionState",
621                             getName()));
622                 } else {
623                     if (log.isInfoEnabled())
624                         log.info(sm.getString("deltaManager.sessionReceived",
625                                 getName(), new Long JavaDoc(reqNow - reqStart)));
626                 }
627             } else {
628                 if (log.isInfoEnabled())
629                     log.info(sm.getString("deltaManager.noMembers", getName()));
630             }//end if
631

632         } catch (Throwable JavaDoc t) {
633             log.error(sm.getString("deltaManager.managerLoad"), t);
634         }
635     }
636
637     /**
638      * Gracefully terminate the active use of the public methods of this
639      * component. This method should be the last one called on a given instance
640      * of this component.
641      *
642      * @exception LifecycleException
643      * if this component detects a fatal error that needs to be
644      * reported
645      */

646     public void stop() throws LifecycleException {
647
648         if (log.isDebugEnabled())
649             log.debug(sm.getString("deltaManager.stopped", getName()));
650
651         getCluster().removeManager(getName());
652
653         // Validate and update our current component state
654
if (!started)
655             throw new LifecycleException(sm
656                     .getString("deltaManager.notStarted"));
657         lifecycle.fireLifecycleEvent(STOP_EVENT, null);
658         started = false;
659
660         // Expire all active sessions
661
if (log.isInfoEnabled())
662             log.info(sm.getString("deltaManager.expireSessions", getName()));
663         Session sessions[] = findSessions();
664         for (int i = 0; i < sessions.length; i++) {
665             DeltaSession session = (DeltaSession) sessions[i];
666             if (!session.isValid())
667                 continue;
668             try {
669                 session.expire(true, this.getExpireSessionsOnShutdown());
670             } catch (Throwable JavaDoc t) {
671                 ;
672             } //catch
673
} //for
674

675         // Require a new random number generator if we are restarted
676
this.random = null;
677
678         if (initialized) {
679             destroy();
680         }
681     }
682
683     // ----------------------------------------- PropertyChangeListener Methods
684

685     /**
686      * Process property change events from our associated Context.
687      *
688      * @param event
689      * The property change event that has occurred
690      */

691     public void propertyChange(PropertyChangeEvent JavaDoc event) {
692
693         // Validate the source of this event
694
if (!(event.getSource() instanceof Context))
695             return;
696         Context context = (Context) event.getSource();
697
698         // Process a relevant property change
699
if (event.getPropertyName().equals("sessionTimeout")) {
700             try {
701                 setMaxInactiveInterval(((Integer JavaDoc) event.getNewValue())
702                         .intValue() * 60);
703             } catch (NumberFormatException JavaDoc e) {
704                 log.error(sm.getString("deltaManager.sessionTimeout", event
705                         .getNewValue()));
706             }
707         }
708
709     }
710
711     // -------------------------------------------------------- Replication
712
// Methods
713

714     /**
715      * A message was received from another node, this is the callback method to
716      * implement if you are interested in receiving replication messages.
717      *
718      * @param msg -
719      * the message received.
720      */

721     public void messageDataReceived(ClusterMessage cmsg) {
722         if (cmsg instanceof SessionMessage) {
723             SessionMessage msg = (SessionMessage) cmsg;
724             messageReceived(msg, msg.getAddress() != null ? (Member) msg
725                     .getAddress() : null);
726         }
727     }
728
729     /**
730      * When the request has been completed, the replication valve will notify
731      * the manager, and the manager will decide whether any replication is
732      * needed or not. If there is a need for replication, the manager will
733      * create a session message and that will be replicated. The cluster
734      * determines where it gets sent.
735      *
736      * @param sessionId -
737      * the sessionId that just completed.
738      * @return a SessionMessage to be sent,
739      */

740     public ClusterMessage requestCompleted(String JavaDoc sessionId) {
741         try {
742             DeltaSession session = (DeltaSession) findSession(sessionId);
743             DeltaRequest deltaRequest = session.getDeltaRequest();
744             SessionMessage msg = null;
745             if (deltaRequest.getSize() > 0) {
746
747                 byte[] data = unloadDeltaRequest(deltaRequest);
748                 msg = new SessionMessageImpl(name,
749                         SessionMessage.EVT_SESSION_DELTA, data, sessionId,
750                         sessionId + System.currentTimeMillis());
751                 session.resetDeltaRequest();
752                 if (log.isDebugEnabled()) {
753                     log.debug(sm.getString(
754                             "deltaManager.createMessage.delta",
755                             getName(), sessionId));
756                 }
757             } else if (!session.isPrimarySession()) {
758                 msg = new SessionMessageImpl(getName(),
759                         SessionMessage.EVT_SESSION_ACCESSED, null, sessionId,
760                         sessionId + System.currentTimeMillis());
761                 if (log.isDebugEnabled()) {
762                     log.debug(sm.getString(
763                             "deltaManager.createMessage.accessChangePrimary",
764                             getName(), sessionId));
765                 }
766             }
767             session.setPrimarySession(true);
768             //check to see if we need to send out an access message
769
if ((msg == null)) {
770                 long replDelta = System.currentTimeMillis()
771                         - session.getLastTimeReplicated();
772                 if (replDelta > (getMaxInactiveInterval() * 1000)) {
773                     msg = new SessionMessageImpl(getName(),
774                             SessionMessage.EVT_SESSION_ACCESSED, null,
775                             sessionId, sessionId + System.currentTimeMillis());
776                     if (log.isDebugEnabled()) {
777                         log.debug(sm.getString(
778                                 "deltaManager.createMessage.access", getName(),
779                                 sessionId));
780                     }
781                 }
782
783             }
784
785             //update last replicated time
786
if (msg != null)
787                 session.setLastTimeReplicated(System.currentTimeMillis());
788             return msg;
789         } catch (IOException JavaDoc x) {
790             log.error(sm.getString(
791                     "deltaManager.createMessage.unableCreateDeltaRequest",
792                     sessionId), x);
793             return null;
794         }
795
796     }
797
798     /**
799      * send session expired to other cluster nodes
800      *
801      * @param id
802      * session id
803      */

804     protected void sessionExpired(String JavaDoc id) {
805         SessionMessage msg = new SessionMessageImpl(getName(),
806                 SessionMessage.EVT_SESSION_EXPIRED, null, id, id
807                         + "-EXPIRED-MSG");
808         if (log.isDebugEnabled())
809             log.debug(sm.getString("deltaManager.createMessage.expire",
810                     getName(), id));
811         cluster.send(msg);
812     }
813
814     /**
815      * When the manager expires session not tied to a request. The cluster will
816      * periodically ask for a list of sessions that should expire and that
817      * should be sent across the wire.
818      *
819      * @return
820      */

821     public String JavaDoc[] getInvalidatedSessions() {
822         return new String JavaDoc[0];
823     }
824
825     /**
826      * This method is called by the received thread when a SessionMessage has
827      * been received from one of the other nodes in the cluster.
828      *
829      * @param msg -
830      * the message received
831      * @param sender -
832      * the sender of the message, this is used if we receive a
833      * EVT_GET_ALL_SESSION message, so that we only reply to the
834      * requesting node
835      */

836     protected void messageReceived(SessionMessage msg, Member sender) {
837         try {
838             if (log.isDebugEnabled())
839                 log.debug(sm.getString("deltaManager.receiveMessage.eventType",
840                         getName(), msg.getEventTypeString(), sender));
841             switch (msg.getEventType()) {
842             case SessionMessage.EVT_GET_ALL_SESSIONS: {
843                 //get a list of all the session from this manager
844
if (log.isDebugEnabled())
845                     log.debug(sm.getString(
846                             "deltaManager.receiveMessage.unloadingBegin",
847                             getName()));
848                 byte[] data = doUnload();
849                 if (log.isDebugEnabled())
850                     log.debug(sm.getString(
851                             "deltaManager.receiveMessage.unloadingAfter",
852                             getName()));
853                 SessionMessage newmsg = new SessionMessageImpl(name,
854                         SessionMessage.EVT_ALL_SESSION_DATA, data,
855                         "SESSION-STATE", "SESSION-STATE-" + getName());
856                 if (log.isDebugEnabled())
857                     log.debug(sm.getString(
858                             "deltaManager.createMessage.allSessionData",
859                             getName()));
860                 cluster.send(newmsg, sender);
861                 break;
862             }
863             case SessionMessage.EVT_ALL_SESSION_DATA: {
864                 if (log.isDebugEnabled())
865                     log.debug(sm.getString(
866                             "deltaManager.receiveMessage.allSessionDataBegin",
867                             getName()));
868                 byte[] data = msg.getSession();
869                 doLoad(data);
870                 if (log.isDebugEnabled())
871                     log.debug(sm.getString(
872                             "deltaManager.receiveMessage.allSessionDataAfter",
873                             getName()));
874                 stateTransferred = true;
875                 break;
876             }
877             case SessionMessage.EVT_SESSION_CREATED: {
878                 if (log.isDebugEnabled())
879                     log.debug(sm.getString(
880                             "deltaManager.receiveMessage.createNewSession",
881                             getName(), msg.getSessionID()));
882                 DeltaSession session = (DeltaSession) createSession(msg
883                         .getSessionID(), false);
884                 // Q: Why inform all session listener at replicate node?
885
session.setId(msg.getSessionID());
886                 session.setNew(false);
887                 session.setPrimarySession(false);
888                 // Q: Why generate a delta data structure?
889
session.resetDeltaRequest();
890                 break;
891             }
892             case SessionMessage.EVT_SESSION_EXPIRED: {
893                 DeltaSession session = (DeltaSession) findSession(msg
894                         .getSessionID());
895                 if (session != null) {
896                     if (log.isDebugEnabled())
897                         log.debug(sm.getString(
898                                 "deltaManager.receiveMessage.expired",
899                                 getName(), msg.getSessionID()));
900                     // Q: Why not only remove from manager?
901
session.expire(true, false);
902                 } //end if
903
break;
904             }
905             case SessionMessage.EVT_SESSION_ACCESSED: {
906                 DeltaSession session = (DeltaSession) findSession(msg
907                         .getSessionID());
908                 if (session != null) {
909                     if (log.isDebugEnabled())
910                         log.debug(sm.getString(
911                                 "deltaManager.receiveMessage.accessed",
912                                 getName(), msg.getSessionID()));
913                     session.access();
914                     session.setPrimarySession(false);
915                     session.endAccess();
916                 }
917                 break;
918             }
919             case SessionMessage.EVT_SESSION_DELTA: {
920                 byte[] delta = msg.getSession();
921                 DeltaSession session = (DeltaSession) findSession(msg
922                         .getSessionID());
923                 if (session != null) {
924                     log.debug(sm.getString("deltaManager.receiveMessage.delta",
925                             getName(), msg.getSessionID()));
926                     DeltaRequest dreq = loadDeltaRequest(session, delta);
927                     dreq.execute(session, notifyListenersOnReplication);
928                     session.setPrimarySession(false);
929                 }
930
931                 break;
932             }
933             default: {
934                 //we didn't recognize the message type, do nothing
935
break;
936             }
937             } //switch
938
} catch (Exception JavaDoc x) {
939             log.error(sm.getString("deltaManager.receiveMessage.error",
940                     getName()), x);
941         }
942     }
943
944     // -------------------------------------------------------- Private Methods
945

946     public boolean getStateTransferred() {
947         return stateTransferred;
948     }
949
950     public void setStateTransferred(boolean stateTransferred) {
951         this.stateTransferred = stateTransferred;
952     }
953
954     public CatalinaCluster getCluster() {
955         return cluster;
956     }
957
958     public void setCluster(CatalinaCluster cluster) {
959         this.cluster = cluster;
960     }
961
962     public void load() {
963
964     }
965
966     public void unload() {
967
968     }
969
970     public boolean getUseDirtyFlag() {
971         return useDirtyFlag;
972     }
973
974     public void setUseDirtyFlag(boolean useDirtyFlag) {
975         this.useDirtyFlag = useDirtyFlag;
976     }
977
978     public boolean getExpireSessionsOnShutdown() {
979         return expireSessionsOnShutdown;
980     }
981
982     public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) {
983         this.expireSessionsOnShutdown = expireSessionsOnShutdown;
984     }
985
986     public boolean getPrintToScreen() {
987         return printToScreen;
988     }
989
990     public void setPrintToScreen(boolean printToScreen) {
991         this.printToScreen = printToScreen;
992     }
993
994     public void setName(String JavaDoc name) {
995         this.name = name;
996     }
997
998     public boolean getNotifyListenersOnReplication() {
999         return notifyListenersOnReplication;
1000    }
1001
1002    public void setNotifyListenersOnReplication(
1003            boolean notifyListenersOnReplication) {
1004        this.notifyListenersOnReplication = notifyListenersOnReplication;
1005    }
1006
1007}
Popular Tags