KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > catalina > cluster > session > SimpleTcpReplicationManager


1 /*
2  * Copyright 1999,2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16 package org.apache.catalina.cluster.session;
17
18 import java.io.IOException JavaDoc;
19
20 import org.apache.catalina.LifecycleException;
21 import org.apache.catalina.Session;
22 import org.apache.catalina.cluster.CatalinaCluster;
23 import org.apache.catalina.cluster.ClusterMessage;
24 import org.apache.catalina.cluster.Member;
25 import org.apache.catalina.realm.GenericPrincipal;
26
/**
 * Title: Tomcat Session Replication for Tomcat 4.0 <BR>
 * Description: A very simple straight forward implementation of
 * session replication of servers in a cluster.<BR>
 * This session replication is implemented "live". By live
 * I mean, when a session attribute is added into a session on Node A
 * a message is broadcast to the other nodes and setAttribute is called on the
 * replicated sessions.<BR>
 * A full description of this implementation can be found under
 * <a href="http://www.filip.net/tomcat/">Filip's Tomcat Page</a><BR>
 *
 * Copyright: See apache license
 * Company: www.filip.net
 * @author <a href="mailto:mail@filip.net">Filip Hanik</a>
 * @author Bela Ban (modifications for synchronous replication)
 * @version 1.0 for TC 4.0
 * Description: The InMemoryReplicationManager is a session manager that replicates
 * session information in memory. It uses <a href="http://www.javagroups.com">JavaGroups</a> as
 * a communication protocol to ensure guaranteed and ordered message delivery.
 * JavaGroups also provides a very flexible protocol stack to ensure that the replication
 * can be used in any environment.
 * <BR><BR>
 * The InMemoryReplicationManager extends the StandardManager, hence it allows us
 * to inherit all the basic session management features like expiration, session listeners etc.
 * <BR><BR>
 * To communicate with other nodes in the cluster, the InMemoryReplicationManager sends out 7 different types of multicast messages,
 * all defined in the SessionMessage class.<BR>
 * When a session is replicated (not an attribute added/removed) the session is serialized into
 * a byte array using the StandardSession.readObjectData, StandardSession.writeObjectData methods.
 */

57 public class SimpleTcpReplicationManager extends org.apache.catalina.session.StandardManager
58 implements org.apache.catalina.cluster.ClusterManager
59 {
60     public static org.apache.commons.logging.Log log =
61         org.apache.commons.logging.LogFactory.getLog( SimpleTcpReplicationManager.class );
62
63     //the channel configuration
64
protected String JavaDoc mChannelConfig = null;
65
66     //the group name
67
protected String JavaDoc mGroupName = "TomcatReplication";
68
69     //somehow start() gets called more than once
70
protected boolean mChannelStarted = false;
71
72     //log to screen
73
protected boolean mPrintToScreen = true;
74
75
76
77     protected boolean mManagerRunning = false;
78
79     /** Use synchronous rather than asynchronous replication. Every session modification (creation, change, removal etc)
80      * will be sent to all members. The call will then wait for max milliseconds, or forever (if timeout is 0) for
81      * all responses.
82      */

83     protected boolean synchronousReplication=true;
84
85     /** Set to true if we don't want the sessions to expire on shutdown */
86     protected boolean mExpireSessionsOnShutdown = true;
87
88     protected boolean useDirtyFlag = false;
89
90     protected String JavaDoc name;
91
92     protected boolean distributable = true;
93
94     protected CatalinaCluster cluster;
95
96     protected java.util.HashMap JavaDoc invalidatedSessions = new java.util.HashMap JavaDoc();
97
98     /**
99      * Flag to keep track if the state has been transferred or not
100      * Assumes false.
101      */

102     protected boolean stateTransferred = false;
103     private boolean notifyListenersOnReplication;
104
105     /**
106      * Constructor, just calls super()
107      *
108      */

109     public SimpleTcpReplicationManager()
110     {
111         super();
112     }
113
114
115     public boolean isManagerRunning()
116     {
117         return mManagerRunning;
118     }
119
120     public void setUseDirtyFlag(boolean usedirtyflag)
121     {
122         this.useDirtyFlag = usedirtyflag;
123     }
124
125     public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown)
126     {
127         mExpireSessionsOnShutdown = expireSessionsOnShutdown;
128     }
129
130     public void setCluster(CatalinaCluster cluster) {
131         if(log.isDebugEnabled())
132             log.debug("Cluster associated with SimpleTcpReplicationManager");
133         this.cluster = cluster;
134     }
135
136     public boolean getExpireSessionsOnShutdown()
137     {
138         return mExpireSessionsOnShutdown;
139     }
140
141     public void setPrintToScreen(boolean printtoscreen)
142     {
143         if(log.isDebugEnabled())
144             log.debug("Setting screen debug to:"+printtoscreen);
145         mPrintToScreen = printtoscreen;
146     }
147
148     public void setSynchronousReplication(boolean flag)
149     {
150         synchronousReplication=flag;
151     }
152
153     /**
154      * Override persistence since they don't go hand in hand with replication for now.
155      */

156     public void unload() throws IOException JavaDoc {
157         if ( !getDistributable() ) {
158             super.unload();
159         }
160     }
161
162     /**
163      * Creates a HTTP session.
164      * Most of the code in here is copied from the StandardManager.
165      * This is not pretty, yeah I know, but it was necessary since the
166      * StandardManager had hard coded the session instantiation to the a
167      * StandardSession, when we actually want to instantiate a ReplicatedSession<BR>
168      * If the call comes from the Tomcat servlet engine, a SessionMessage goes out to the other
169      * nodes in the cluster that this session has been created.
170      * @param notify - if set to true the other nodes in the cluster will be notified.
171      * This flag is needed so that we can create a session before we deserialize
172      * a replicated one
173      *
174      * @see ReplicatedSession
175      */

176     protected Session createSession(boolean notify, boolean setId)
177     {
178
179         //inherited from the basic manager
180
if ((getMaxActiveSessions() >= 0) &&
181            (sessions.size() >= getMaxActiveSessions()))
182             throw new IllegalStateException JavaDoc(sm.getString("standardManager.createSession.ise"));
183
184
185         Session session = new ReplicatedSession(this);
186
187         // Initialize the properties of the new session and return it
188
session.setNew(true);
189         session.setValid(true);
190         session.setCreationTime(System.currentTimeMillis());
191         session.setMaxInactiveInterval(this.maxInactiveInterval);
192         String JavaDoc sessionId = generateSessionId();
193         if ( setId ) session.setId(sessionId);
194         if ( notify && (cluster!=null) ) {
195             ((ReplicatedSession)session).setIsDirty(true);
196         }
197         return (session);
198     }//createSession
199

200     //=========================================================================
201
// OVERRIDE THESE METHODS TO IMPLEMENT THE REPLICATION
202
//=========================================================================
203

204     /**
205      * Construct and return a new session object, based on the default
206      * settings specified by this Manager's properties. The session
207      * id will be assigned by this method, and available via the getId()
208      * method of the returned session. If a new session cannot be created
209      * for any reason, return <code>null</code>.
210      *
211      * @exception IllegalStateException if a new session cannot be
212      * instantiated for any reason
213      */

214     public Session createSession()
215     {
216         //create a session and notify the other nodes in the cluster
217
Session session = createSession(getDistributable(),true);
218         add(session);
219         return session;
220     }
221
222     public void sessionInvalidated(String JavaDoc sessionId) {
223         synchronized ( invalidatedSessions ) {
224             invalidatedSessions.put(sessionId, sessionId);
225         }
226     }
227
228     public String JavaDoc[] getInvalidatedSessions() {
229         synchronized ( invalidatedSessions ) {
230             String JavaDoc[] result = new String JavaDoc[invalidatedSessions.size()];
231             invalidatedSessions.values().toArray(result);
232             return result;
233         }
234
235     }
236
237     public ClusterMessage requestCompleted(String JavaDoc sessionId)
238     {
239         if ( !getDistributable() ) {
240             log.warn("Received requestCompleted message, although this context["+
241                      getName()+"] is not distributable. Ignoring message");
242             return null;
243         }
244         //notify javagroups
245
try
246         {
247             if ( invalidatedSessions.get(sessionId) != null ) {
248                 synchronized ( invalidatedSessions ) {
249                     invalidatedSessions.remove(sessionId);
250                     SessionMessage msg = new SessionMessageImpl(name,
251                     SessionMessage.EVT_SESSION_EXPIRED,
252                     null,
253                     sessionId,
254                     sessionId);
255                 return msg;
256                 }
257             } else {
258                 ReplicatedSession session = (ReplicatedSession) findSession(
259                     sessionId);
260                 if (session != null) {
261                     //return immediately if the session is not dirty
262
if (useDirtyFlag && (!session.isDirty())) {
263                         //but before we return doing nothing,
264
//see if we should send
265
//an updated last access message so that
266
//sessions across cluster dont expire
267
long interval = session.getMaxInactiveInterval();
268                         long lastaccdist = System.currentTimeMillis() -
269                             session.getLastAccessWasDistributed();
270                         if ( ((interval*1000) / lastaccdist)< 3 ) {
271                             SessionMessage accmsg = new SessionMessageImpl(name,
272                                 SessionMessage.EVT_SESSION_ACCESSED,
273                                 null,
274                                 sessionId,
275                                 sessionId);
276                             session.setLastAccessWasDistributed(System.currentTimeMillis());
277                             return accmsg;
278                         }
279                         return null;
280                     }
281
282                     session.setIsDirty(false);
283                     if (log.isDebugEnabled()) {
284                         try {
285                             log.debug("Sending session to cluster=" + session);
286                         }
287                         catch (Exception JavaDoc ignore) {}
288                     }
289                     SessionMessage msg = new SessionMessageImpl(name,
290                         SessionMessage.EVT_SESSION_CREATED,
291                         writeSession(session),
292                         session.getId(),
293                         session.getId());
294                     return msg;
295                 } //end if
296
}//end if
297
}
298         catch (Exception JavaDoc x )
299         {
300             log.error("Unable to replicate session",x);
301         }
302         return null;
303     }
304
305     /**
306      * Serialize a session into a byte array<BR>
307      * This method simple calls the writeObjectData method on the session
308      * and returns the byte data from that call
309      * @param session - the session to be serialized
310      * @return a byte array containing the session data, null if the serialization failed
311      */

312     protected byte[] writeSession( Session session )
313     {
314         try
315         {
316             java.io.ByteArrayOutputStream JavaDoc session_data = new java.io.ByteArrayOutputStream JavaDoc();
317             java.io.ObjectOutputStream JavaDoc session_out = new java.io.ObjectOutputStream JavaDoc(session_data);
318             session_out.flush();
319             boolean hasPrincipal = session.getPrincipal() != null;
320             session_out.writeBoolean(hasPrincipal);
321             if ( hasPrincipal )
322             {
323                 session_out.writeObject(SerializablePrincipal.createPrincipal((GenericPrincipal)session.getPrincipal()));
324             }//end if
325
((ReplicatedSession)session).writeObjectData(session_out);
326             return session_data.toByteArray();
327
328         }
329         catch ( Exception JavaDoc x )
330         {
331             log.error("Failed to serialize the session!",x);
332         }
333         return null;
334     }
335
336     /**
337      * Reinstantiates a serialized session from the data passed in.
338      * This will first call createSession() so that we get a fresh instance with all
339      * the managers set and all the transient fields validated.
340      * Then it calls Session.readObjectData(byte[]) to deserialize the object
341      * @param data - a byte array containing session data
342      * @return a valid Session object, null if an error occurs
343      *
344      */

345     protected Session readSession( byte[] data, String JavaDoc sessionId )
346     {
347         try
348         {
349             java.io.ByteArrayInputStream JavaDoc session_data = new java.io.ByteArrayInputStream JavaDoc(data);
350             ReplicationStream session_in = new ReplicationStream(session_data,container.getLoader().getClassLoader());
351
352             Session session = sessionId!=null?this.findSession(sessionId):null;
353             boolean isNew = (session==null);
354             //clear the old values from the existing session
355
if ( session!=null ) {
356                 ReplicatedSession rs = (ReplicatedSession)session;
357                 rs.expire(false); //cleans up the previous values, since we are not doing removes
358
session = null;
359             }//end if
360

361             if (session==null) {
362                 session = createSession(false, false);
363                 sessions.remove(session.getId());
364             }
365             
366             
367             boolean hasPrincipal = session_in.readBoolean();
368             SerializablePrincipal p = null;
369             if ( hasPrincipal )
370                 p = (SerializablePrincipal)session_in.readObject();
371             ((ReplicatedSession)session).readObjectData(session_in);
372             if ( hasPrincipal )
373                 session.setPrincipal(p.getPrincipal(getContainer().getRealm()));
374             ((ReplicatedSession)session).setId(sessionId,isNew);
375             ReplicatedSession rsession = (ReplicatedSession)session;
376             rsession.setAccessCount(1);
377             session.setManager(this);
378             session.setValid(true);
379             rsession.setLastAccessedTime(System.currentTimeMillis());
380             rsession.setThisAccessedTime(System.currentTimeMillis());
381             ((ReplicatedSession)session).setAccessCount(0);
382             session.setNew(false);
383 // System.out.println("Session loaded id="+sessionId +
384
// " actualId="+session.getId()+
385
// " exists="+this.sessions.containsKey(sessionId)+
386
// " valid="+rsession.isValid());
387
return session;
388
389         }
390         catch ( Exception JavaDoc x )
391         {
392             log.error("Failed to deserialize the session!",x);
393         }
394         return null;
395     }
396
397     public String JavaDoc getName() {
398         return this.name;
399     }
400     /**
401      * Prepare for the beginning of active use of the public methods of this
402      * component. This method should be called after <code>configure()</code>,
403      * and before any of the public methods of the component are utilized.<BR>
404      * Starts the cluster communication channel, this will connect with the other nodes
405      * in the cluster, and request the current session state to be transferred to this node.
406      * @exception IllegalStateException if this component has already been
407      * started
408      * @exception LifecycleException if this component detects a fatal error
409      * that prevents this component from being used
410      */

411     public void start() throws LifecycleException {
412         mManagerRunning = true;
413         super.start();
414         //start the javagroups channel
415
try {
416             //the channel is already running
417
if ( mChannelStarted ) return;
418             if(log.isInfoEnabled())
419                 log.info("Starting clustering manager...:"+getName());
420             if ( cluster == null ) {
421                 log.error("Starting... no cluster associated with this context:"+getName());
422                 return;
423             }
424             cluster.addManager(getName(),this);
425
426             if (cluster.getMembers().length > 0) {
427                 Member mbr = cluster.getMembers()[0];
428                 SessionMessage msg =
429                     new SessionMessageImpl(this.getName(),
430                                        SessionMessage.EVT_GET_ALL_SESSIONS,
431                                        null,
432                                        "GET-ALL",
433                                        "GET-ALL-"+this.getName());
434                 cluster.send(msg, mbr);
435                 if(log.isWarnEnabled())
436                      log.warn("Manager["+getName()+"], requesting session state from "+mbr+
437                          ". This operation will timeout if no session state has been received within "+
438                          "60 seconds");
439                 long reqStart = System.currentTimeMillis();
440                 long reqNow = 0;
441                 boolean isTimeout=false;
442                 do {
443                     try {
444                         Thread.sleep(100);
445                     }catch ( Exception JavaDoc sleep) {}
446                     reqNow = System.currentTimeMillis();
447                     isTimeout=((reqNow-reqStart)>(1000*60));
448                 } while ( (!isStateTransferred()) && (!isTimeout));
449                 if ( isTimeout || (!isStateTransferred()) ) {
450                     log.error("Manager["+getName()+"], No session state received, timing out.");
451                 }else {
452                     if(log.isInfoEnabled())
453                         log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms.");
454                 }
455             } else {
456                 if(log.isInfoEnabled())
457                     log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group.");
458             }//end if
459
mChannelStarted = true;
460         } catch ( Exception JavaDoc x ) {
461             log.error("Unable to start SimpleTcpReplicationManager",x);
462         }
463     }
464
465     /**
466      * Gracefully terminate the active use of the public methods of this
467      * component. This method should be the last one called on a given
468      * instance of this component.<BR>
469      * This will disconnect the cluster communication channel and stop the listener thread.
470      * @exception IllegalStateException if this component has not been started
471      * @exception LifecycleException if this component detects a fatal error
472      * that needs to be reported
473      */

474     public void stop() throws LifecycleException
475     {
476         mManagerRunning = false;
477         mChannelStarted = false;
478         super.stop();
479         //stop the javagroup channel
480
try
481         {
482             this.sessions.clear();
483             cluster.removeManager(getName());
484 // mReplicationListener.stopListening();
485
// mReplicationTransmitter.stop();
486
// service.stop();
487
// service = null;
488
}
489         catch ( Exception JavaDoc x )
490         {
491             log.error("Unable to stop SimpleTcpReplicationManager",x);
492         }
493     }
494
495     public void setDistributable(boolean dist) {
496         this.distributable = dist;
497     }
498
499     public boolean getDistributable() {
500         return distributable;
501     }
502
503     /**
504      * This method is called by the received thread when a SessionMessage has
505      * been received from one of the other nodes in the cluster.
506      * @param msg - the message received
507      * @param sender - the sender of the message, this is used if we receive a
508      * EVT_GET_ALL_SESSION message, so that we only reply to
509      * the requesting node
510      */

511     protected void messageReceived( SessionMessage msg, Member sender ) {
512         try {
513             if(log.isInfoEnabled()) {
514                 log.debug("Received SessionMessage of type="+msg.getEventTypeString());
515                 log.debug("Received SessionMessage sender="+sender);
516             }
517             switch ( msg.getEventType() ) {
518                 case SessionMessage.EVT_GET_ALL_SESSIONS: {
519                     //get a list of all the session from this manager
520
Object JavaDoc[] sessions = findSessions();
521                     java.io.ByteArrayOutputStream JavaDoc bout = new java.io.ByteArrayOutputStream JavaDoc();
522                     java.io.ObjectOutputStream JavaDoc oout = new java.io.ObjectOutputStream JavaDoc(bout);
523                     oout.writeInt(sessions.length);
524                     for (int i=0; i<sessions.length; i++){
525                         ReplicatedSession ses = (ReplicatedSession)sessions[i];
526                         oout.writeUTF(ses.getId());
527                         byte[] data = writeSession(ses);
528                         oout.writeObject(data);
529                     }//for
530
//don't send a message if we don't have to
531
oout.flush();
532                     oout.close();
533                     byte[] data = bout.toByteArray();
534                     SessionMessage newmsg = new SessionMessageImpl(name,
535                         SessionMessage.EVT_ALL_SESSION_DATA,
536                         data, "SESSION-STATE","SESSION-STATE-"+getName());
537                     cluster.send(newmsg, sender);
538                     break;
539                 }
540                 case SessionMessage.EVT_ALL_SESSION_DATA: {
541                     java.io.ByteArrayInputStream JavaDoc bin =
542                         new java.io.ByteArrayInputStream JavaDoc(msg.getSession());
543                     java.io.ObjectInputStream JavaDoc oin = new java.io.ObjectInputStream JavaDoc(bin);
544                     int size = oin.readInt();
545                     for ( int i=0; i<size; i++) {
546                         String JavaDoc id = oin.readUTF();
547                         byte[] data = (byte[])oin.readObject();
548                         Session session = readSession(data,id);
549                     }//for
550
stateTransferred=true;
551                     break;
552                 }
553                 case SessionMessage.EVT_SESSION_CREATED: {
554                     Session session = this.readSession(msg.getSession(),msg.getSessionID());
555                     if ( log.isDebugEnabled() ) {
556                         log.debug("Received replicated session=" + session +
557                             " isValid=" + session.isValid());
558                     }
559                     break;
560                 }
561                 case SessionMessage.EVT_SESSION_EXPIRED: {
562                     Session session = findSession(msg.getSessionID());
563                     if ( session != null ) {
564                         session.expire();
565                         this.remove(session);
566                     }//end if
567
break;
568                 }
569                 case SessionMessage.EVT_SESSION_ACCESSED :{
570                     Session session = findSession(msg.getSessionID());
571                     if ( session != null ) {
572                         session.access();
573                         session.endAccess();
574                     }
575                     break;
576                 }
577                 default: {
578                     //we didn't recognize the message type, do nothing
579
break;
580                 }
581             }//switch
582
}
583         catch ( Exception JavaDoc x )
584         {
585             log.error("Unable to receive message through TCP channel",x);
586         }
587     }
588
589     public void messageDataReceived(ClusterMessage cmsg) {
590         try {
591             if ( cmsg instanceof SessionMessage ) {
592                 SessionMessage msg = (SessionMessage)cmsg;
593                 messageReceived(msg,
594                                 msg.getAddress() != null ? (Member) msg.getAddress() : null);
595             }
596         } catch(Throwable JavaDoc ex){
597             log.error("InMemoryReplicationManager.messageDataReceived()", ex);
598         }//catch
599
}
600
601     public boolean isStateTransferred() {
602         return stateTransferred;
603     }
604
605     public void setName(String JavaDoc name) {
606         this.name = name;
607     }
608     public boolean getNotifyListenersOnReplication() {
609         return notifyListenersOnReplication;
610     }
611     public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
612         this.notifyListenersOnReplication = notifyListenersOnReplication;
613     }
614 }
615
Popular Tags