1 16 package org.apache.catalina.cluster.session; 17 18 import java.io.IOException ; 19 20 import org.apache.catalina.LifecycleException; 21 import org.apache.catalina.Session; 22 import org.apache.catalina.cluster.CatalinaCluster; 23 import org.apache.catalina.cluster.ClusterMessage; 24 import org.apache.catalina.cluster.Member; 25 import org.apache.catalina.realm.GenericPrincipal; 26 27 57 public class SimpleTcpReplicationManager extends org.apache.catalina.session.StandardManager 58 implements org.apache.catalina.cluster.ClusterManager 59 { 60 public static org.apache.commons.logging.Log log = 61 org.apache.commons.logging.LogFactory.getLog( SimpleTcpReplicationManager.class ); 62 63 protected String mChannelConfig = null; 65 66 protected String mGroupName = "TomcatReplication"; 68 69 protected boolean mChannelStarted = false; 71 72 protected boolean mPrintToScreen = true; 74 75 76 77 protected boolean mManagerRunning = false; 78 79 83 protected boolean synchronousReplication=true; 84 85 86 protected boolean mExpireSessionsOnShutdown = true; 87 88 protected boolean useDirtyFlag = false; 89 90 protected String name; 91 92 protected boolean distributable = true; 93 94 protected CatalinaCluster cluster; 95 96 protected java.util.HashMap invalidatedSessions = new java.util.HashMap (); 97 98 102 protected boolean stateTransferred = false; 103 private boolean notifyListenersOnReplication; 104 105 109 public SimpleTcpReplicationManager() 110 { 111 super(); 112 } 113 114 115 public boolean isManagerRunning() 116 { 117 return mManagerRunning; 118 } 119 120 public void setUseDirtyFlag(boolean usedirtyflag) 121 { 122 this.useDirtyFlag = usedirtyflag; 123 } 124 125 public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) 126 { 127 mExpireSessionsOnShutdown = expireSessionsOnShutdown; 128 } 129 130 public void setCluster(CatalinaCluster cluster) { 131 if(log.isDebugEnabled()) 132 log.debug("Cluster associated with SimpleTcpReplicationManager"); 133 this.cluster = cluster; 134 } 135 136 public boolean getExpireSessionsOnShutdown() 137 { 138 return mExpireSessionsOnShutdown; 139 } 140 141 public void setPrintToScreen(boolean printtoscreen) 142 { 143 if(log.isDebugEnabled()) 144 log.debug("Setting screen debug to:"+printtoscreen); 145 mPrintToScreen = printtoscreen; 146 } 147 148 public void setSynchronousReplication(boolean flag) 149 { 150 synchronousReplication=flag; 151 } 152 153 156 public void unload() throws IOException { 157 if ( !getDistributable() ) { 158 super.unload(); 159 } 160 } 161 162 176 protected Session createSession(boolean notify, boolean setId) 177 { 178 179 if ((getMaxActiveSessions() >= 0) && 181 (sessions.size() >= getMaxActiveSessions())) 182 throw new IllegalStateException (sm.getString("standardManager.createSession.ise")); 183 184 185 Session session = new ReplicatedSession(this); 186 187 session.setNew(true); 189 session.setValid(true); 190 session.setCreationTime(System.currentTimeMillis()); 191 session.setMaxInactiveInterval(this.maxInactiveInterval); 192 String sessionId = generateSessionId(); 193 if ( setId ) session.setId(sessionId); 194 if ( notify && (cluster!=null) ) { 195 ((ReplicatedSession)session).setIsDirty(true); 196 } 197 return (session); 198 } 200 204 214 public Session createSession() 215 { 216 Session session = createSession(getDistributable(),true); 218 add(session); 219 return session; 220 } 221 222 public void sessionInvalidated(String sessionId) { 223 synchronized ( invalidatedSessions ) { 224 invalidatedSessions.put(sessionId, sessionId); 225 } 226 } 227 228 public String [] getInvalidatedSessions() { 229 synchronized ( invalidatedSessions ) { 230 String [] result = new String [invalidatedSessions.size()]; 231 invalidatedSessions.values().toArray(result); 232 return result; 233 } 234 235 } 236 237 public ClusterMessage requestCompleted(String sessionId) 238 { 239 if ( !getDistributable() ) { 240 log.warn("Received requestCompleted message, although this context["+ 241 getName()+"] is not distributable. Ignoring message"); 242 return null; 243 } 244 try 246 { 247 if ( invalidatedSessions.get(sessionId) != null ) { 248 synchronized ( invalidatedSessions ) { 249 invalidatedSessions.remove(sessionId); 250 SessionMessage msg = new SessionMessageImpl(name, 251 SessionMessage.EVT_SESSION_EXPIRED, 252 null, 253 sessionId, 254 sessionId); 255 return msg; 256 } 257 } else { 258 ReplicatedSession session = (ReplicatedSession) findSession( 259 sessionId); 260 if (session != null) { 261 if (useDirtyFlag && (!session.isDirty())) { 263 long interval = session.getMaxInactiveInterval(); 268 long lastaccdist = System.currentTimeMillis() - 269 session.getLastAccessWasDistributed(); 270 if ( ((interval*1000) / lastaccdist)< 3 ) { 271 SessionMessage accmsg = new SessionMessageImpl(name, 272 SessionMessage.EVT_SESSION_ACCESSED, 273 null, 274 sessionId, 275 sessionId); 276 session.setLastAccessWasDistributed(System.currentTimeMillis()); 277 return accmsg; 278 } 279 return null; 280 } 281 282 session.setIsDirty(false); 283 if (log.isDebugEnabled()) { 284 try { 285 log.debug("Sending session to cluster=" + session); 286 } 287 catch (Exception ignore) {} 288 } 289 SessionMessage msg = new SessionMessageImpl(name, 290 SessionMessage.EVT_SESSION_CREATED, 291 writeSession(session), 292 session.getId(), 293 session.getId()); 294 return msg; 295 } } } 298 catch (Exception x ) 299 { 300 log.error("Unable to replicate session",x); 301 } 302 return null; 303 } 304 305 312 protected byte[] writeSession( Session session ) 313 { 314 try 315 { 316 java.io.ByteArrayOutputStream session_data = new java.io.ByteArrayOutputStream (); 317 java.io.ObjectOutputStream session_out = new java.io.ObjectOutputStream (session_data); 318 session_out.flush(); 319 boolean hasPrincipal = session.getPrincipal() != null; 320 session_out.writeBoolean(hasPrincipal); 321 if ( hasPrincipal ) 322 { 323 session_out.writeObject(SerializablePrincipal.createPrincipal((GenericPrincipal)session.getPrincipal())); 324 } ((ReplicatedSession)session).writeObjectData(session_out); 326 return session_data.toByteArray(); 327 328 } 329 catch ( Exception x ) 330 { 331 log.error("Failed to serialize the session!",x); 332 } 333 return null; 334 } 335 336 345 protected Session readSession( byte[] data, String sessionId ) 346 { 347 try 348 { 349 java.io.ByteArrayInputStream session_data = new java.io.ByteArrayInputStream (data); 350 ReplicationStream session_in = new ReplicationStream(session_data,container.getLoader().getClassLoader()); 351 352 Session session = sessionId!=null?this.findSession(sessionId):null; 353 boolean isNew = (session==null); 354 if ( session!=null ) { 356 ReplicatedSession rs = (ReplicatedSession)session; 357 rs.expire(false); session = null; 359 } 361 if (session==null) { 362 session = createSession(false, false); 363 sessions.remove(session.getId()); 364 } 365 366 367 boolean hasPrincipal = session_in.readBoolean(); 368 SerializablePrincipal p = null; 369 if ( hasPrincipal ) 370 p = (SerializablePrincipal)session_in.readObject(); 371 ((ReplicatedSession)session).readObjectData(session_in); 372 if ( hasPrincipal ) 373 session.setPrincipal(p.getPrincipal(getContainer().getRealm())); 374 ((ReplicatedSession)session).setId(sessionId,isNew); 375 ReplicatedSession rsession = (ReplicatedSession)session; 376 rsession.setAccessCount(1); 377 session.setManager(this); 378 session.setValid(true); 379 rsession.setLastAccessedTime(System.currentTimeMillis()); 380 rsession.setThisAccessedTime(System.currentTimeMillis()); 381 ((ReplicatedSession)session).setAccessCount(0); 382 session.setNew(false); 383 return session; 388 389 } 390 catch ( Exception x ) 391 { 392 log.error("Failed to deserialize the session!",x); 393 } 394 return null; 395 } 396 397 public String getName() { 398 return this.name; 399 } 400 411 public void start() throws LifecycleException { 412 mManagerRunning = true; 413 super.start(); 414 try { 416 if ( mChannelStarted ) return; 418 if(log.isInfoEnabled()) 419 log.info("Starting clustering manager...:"+getName()); 420 if ( cluster == null ) { 421 log.error("Starting... no cluster associated with this context:"+getName()); 422 return; 423 } 424 cluster.addManager(getName(),this); 425 426 if (cluster.getMembers().length > 0) { 427 Member mbr = cluster.getMembers()[0]; 428 SessionMessage msg = 429 new SessionMessageImpl(this.getName(), 430 SessionMessage.EVT_GET_ALL_SESSIONS, 431 null, 432 "GET-ALL", 433 "GET-ALL-"+this.getName()); 434 cluster.send(msg, mbr); 435 if(log.isWarnEnabled()) 436 log.warn("Manager["+getName()+"], requesting session state from "+mbr+ 437 ". This operation will timeout if no session state has been received within "+ 438 "60 seconds"); 439 long reqStart = System.currentTimeMillis(); 440 long reqNow = 0; 441 boolean isTimeout=false; 442 do { 443 try { 444 Thread.sleep(100); 445 }catch ( Exception sleep) {} 446 reqNow = System.currentTimeMillis(); 447 isTimeout=((reqNow-reqStart)>(1000*60)); 448 } while ( (!isStateTransferred()) && (!isTimeout)); 449 if ( isTimeout || (!isStateTransferred()) ) { 450 log.error("Manager["+getName()+"], No session state received, timing out."); 451 }else { 452 if(log.isInfoEnabled()) 453 log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms."); 454 } 455 } else { 456 if(log.isInfoEnabled()) 457 log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group."); 458 } mChannelStarted = true; 460 } catch ( Exception x ) { 461 log.error("Unable to start SimpleTcpReplicationManager",x); 462 } 463 } 464 465 474 public void stop() throws LifecycleException 475 { 476 mManagerRunning = false; 477 mChannelStarted = false; 478 super.stop(); 479 try 481 { 482 this.sessions.clear(); 483 cluster.removeManager(getName()); 484 } 489 catch ( Exception x ) 490 { 491 log.error("Unable to stop SimpleTcpReplicationManager",x); 492 } 493 } 494 495 public void setDistributable(boolean dist) { 496 this.distributable = dist; 497 } 498 499 public boolean getDistributable() { 500 return distributable; 501 } 502 503 511 protected void messageReceived( SessionMessage msg, Member sender ) { 512 try { 513 if(log.isInfoEnabled()) { 514 log.debug("Received SessionMessage of type="+msg.getEventTypeString()); 515 log.debug("Received SessionMessage sender="+sender); 516 } 517 switch ( msg.getEventType() ) { 518 case SessionMessage.EVT_GET_ALL_SESSIONS: { 519 Object [] sessions = findSessions(); 521 java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream (); 522 java.io.ObjectOutputStream oout = new java.io.ObjectOutputStream (bout); 523 oout.writeInt(sessions.length); 524 for (int i=0; i<sessions.length; i++){ 525 ReplicatedSession ses = (ReplicatedSession)sessions[i]; 526 oout.writeUTF(ses.getId()); 527 byte[] data = writeSession(ses); 528 oout.writeObject(data); 529 } oout.flush(); 532 oout.close(); 533 byte[] data = bout.toByteArray(); 534 SessionMessage newmsg = new SessionMessageImpl(name, 535 SessionMessage.EVT_ALL_SESSION_DATA, 536 data, "SESSION-STATE","SESSION-STATE-"+getName()); 537 cluster.send(newmsg, sender); 538 break; 539 } 540 case SessionMessage.EVT_ALL_SESSION_DATA: { 541 java.io.ByteArrayInputStream bin = 542 new java.io.ByteArrayInputStream (msg.getSession()); 543 java.io.ObjectInputStream oin = new java.io.ObjectInputStream (bin); 544 int size = oin.readInt(); 545 for ( int i=0; i<size; i++) { 546 String id = oin.readUTF(); 547 byte[] data = (byte[])oin.readObject(); 548 Session session = readSession(data,id); 549 } stateTransferred=true; 551 break; 552 } 553 case SessionMessage.EVT_SESSION_CREATED: { 554 Session session = this.readSession(msg.getSession(),msg.getSessionID()); 555 if ( log.isDebugEnabled() ) { 556 log.debug("Received replicated session=" + session + 557 " isValid=" + session.isValid()); 558 } 559 break; 560 } 561 case SessionMessage.EVT_SESSION_EXPIRED: { 562 Session session = findSession(msg.getSessionID()); 563 if ( session != null ) { 564 session.expire(); 565 this.remove(session); 566 } break; 568 } 569 case SessionMessage.EVT_SESSION_ACCESSED :{ 570 Session session = findSession(msg.getSessionID()); 571 if ( session != null ) { 572 session.access(); 573 session.endAccess(); 574 } 575 break; 576 } 577 default: { 578 break; 580 } 581 } } 583 catch ( Exception x ) 584 { 585 log.error("Unable to receive message through TCP channel",x); 586 } 587 } 588 589 public void messageDataReceived(ClusterMessage cmsg) { 590 try { 591 if ( cmsg instanceof SessionMessage ) { 592 SessionMessage msg = (SessionMessage)cmsg; 593 messageReceived(msg, 594 msg.getAddress() != null ? (Member) msg.getAddress() : null); 595 } 596 } catch(Throwable ex){ 597 log.error("InMemoryReplicationManager.messageDataReceived()", ex); 598 } } 600 601 public boolean isStateTransferred() { 602 return stateTransferred; 603 } 604 605 public void setName(String name) { 606 this.name = name; 607 } 608 public boolean getNotifyListenersOnReplication() { 609 return notifyListenersOnReplication; 610 } 611 public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) { 612 this.notifyListenersOnReplication = notifyListenersOnReplication; 613 } 614 } 615 | Popular Tags |