1 16 17 package org.apache.catalina.cluster.session; 18 19 import java.beans.PropertyChangeEvent ; 20 import java.beans.PropertyChangeListener ; 21 import java.io.BufferedInputStream ; 22 import java.io.BufferedOutputStream ; 23 import java.io.ByteArrayInputStream ; 24 import java.io.ByteArrayOutputStream ; 25 import java.io.IOException ; 26 import java.io.ObjectInputStream ; 27 import java.io.ObjectOutputStream ; 28 import java.util.ArrayList ; 29 import java.util.Iterator ; 30 import org.apache.catalina.Container; 31 import org.apache.catalina.Context; 32 import org.apache.catalina.Lifecycle; 33 import org.apache.catalina.LifecycleException; 34 import org.apache.catalina.LifecycleListener; 35 import org.apache.catalina.Loader; 36 import org.apache.catalina.Session; 37 import org.apache.catalina.util.CustomObjectInputStream; 38 import org.apache.catalina.util.LifecycleSupport; 39 import org.apache.catalina.util.StringManager; 40 41 import org.apache.catalina.session.ManagerBase; 42 import org.apache.catalina.cluster.ClusterManager; 43 import org.apache.catalina.cluster.ClusterMessage; 44 import org.apache.catalina.cluster.Member; 45 import org.apache.catalina.cluster.CatalinaCluster; 46 47 64 65 public class DeltaManager extends ManagerBase implements Lifecycle, 66 PropertyChangeListener , ClusterManager { 67 68 70 public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 71 .getLog(DeltaManager.class); 72 73 76 protected static StringManager sm = StringManager 77 .getManager(Constants.Package); 78 79 81 84 private static final String info = "DeltaManager/1.1"; 85 86 89 protected LifecycleSupport lifecycle = new LifecycleSupport(this); 90 91 94 private int maxActiveSessions = -1; 95 96 99 protected static String managerName = "DeltaManager"; 100 101 protected String name = null; 102 103 106 private boolean started = false; 107 108 int rejectedSessions = 0; 109 110 int expiredSessions = 0; 111 112 long processingTime = 0; 113 114 private CatalinaCluster cluster = null; 115 116 private boolean stateTransferred; 117 118 private boolean useDirtyFlag; 119 120 private boolean expireSessionsOnShutdown; 121 122 private boolean printToScreen; 123 124 private boolean notifyListenersOnReplication = false; 125 126 public DeltaManager() { 128 super(); 129 } 130 131 133 141 public void setContainer(Container container) { 142 143 if ((this.container != null) && (this.container instanceof Context)) 145 ((Context) this.container).removePropertyChangeListener(this); 146 147 super.setContainer(container); 149 150 if ((this.container != null) && (this.container instanceof Context)) { 152 setMaxInactiveInterval(((Context) this.container) 153 .getSessionTimeout() * 60); 154 ((Context) this.container).addPropertyChangeListener(this); 155 } 156 157 } 158 159 164 public String getInfo() { 165 166 return (info); 167 168 } 169 170 173 public int getMaxActiveSessions() { 174 175 return (this.maxActiveSessions); 176 177 } 178 179 184 public int getRejectedSessions() { 185 return rejectedSessions; 186 } 187 188 public void setRejectedSessions(int rejectedSessions) { 189 this.rejectedSessions = rejectedSessions; 190 } 191 192 198 public void setMaxActiveSessions(int max) { 199 200 int oldMaxActiveSessions = this.maxActiveSessions; 201 this.maxActiveSessions = max; 202 support.firePropertyChange("maxActiveSessions", new Integer ( 203 oldMaxActiveSessions), new Integer (this.maxActiveSessions)); 204 205 } 206 207 210 public String getName() { 211 212 return (name); 213 214 } 215 216 218 237 public Session createSession(String sessionId) { 238 return createSession(sessionId, true); 239 } 240 241 248 public Session createSession(String sessionId, boolean distribute) { 249 250 if ((maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions)) { 251 rejectedSessions++; 252 throw new IllegalStateException (sm 253 .getString("deltaManager.createSession.ise")); 254 } 255 256 DeltaSession session = getNewDeltaSession(); 258 if (sessionId == null) { 259 sessionId = generateSessionId(); 260 synchronized (sessions) { 261 while (sessions.get(sessionId) != null) { duplicates++; 264 sessionId = generateSessionId(); 265 } 266 } 267 } 268 269 session.setNew(true); 270 session.setValid(true); 271 session.setCreationTime(System.currentTimeMillis()); 272 session.setMaxInactiveInterval(this.maxInactiveInterval); 273 session.setId(sessionId); 274 session.resetDeltaRequest(); 275 277 sessionCounter++; 278 279 if (distribute) { 280 SessionMessage msg = new SessionMessageImpl(getName(), 281 SessionMessage.EVT_SESSION_CREATED, null, sessionId, 282 sessionId + System.currentTimeMillis()); 283 if (log.isDebugEnabled()) 284 log.debug(sm.getString("deltaManager.sendMessage.newSession", 285 name, sessionId)); 286 cluster.send(msg); 287 session.resetDeltaRequest(); 288 } 289 if (log.isDebugEnabled()) 290 log.debug(sm.getString("deltaManager.createSession.newSession", 291 sessionId, new Integer (sessions.size()))); 292 293 return (session); 294 295 } 296 297 300 protected DeltaSession getNewDeltaSession() { 301 return new DeltaSession(this); 302 } 303 304 private DeltaRequest loadDeltaRequest(DeltaSession session, byte[] data) 305 throws ClassNotFoundException , IOException { 306 ByteArrayInputStream fis = null; 307 ReplicationStream ois = null; 308 Loader loader = null; 309 ClassLoader classLoader = null; 310 if (container != null) 314 loader = container.getLoader(); 315 if (loader != null) 316 classLoader = loader.getClassLoader(); 317 else 318 classLoader = Thread.currentThread().getContextClassLoader(); 319 fis = new ByteArrayInputStream (data); 321 ois = new ReplicationStream(fis, classLoader); 322 session.getDeltaRequest().readExternal(ois); 323 ois.close(); 324 return session.getDeltaRequest(); 325 } 326 327 private byte[] unloadDeltaRequest(DeltaRequest deltaRequest) 328 throws IOException { 329 ByteArrayOutputStream bos = new ByteArrayOutputStream (); 330 ObjectOutputStream oos = new ObjectOutputStream (bos); 331 deltaRequest.writeExternal(oos); 332 oos.flush(); 333 oos.close(); 334 return bos.toByteArray(); 335 } 336 337 347 private void doLoad(byte[] data) throws ClassNotFoundException , IOException { 348 349 ByteArrayInputStream fis = null; 353 ObjectInputStream ois = null; 354 Loader loader = null; 355 ClassLoader classLoader = null; 356 ClassLoader originalLoader = Thread.currentThread() 357 .getContextClassLoader(); 358 try { 359 360 try { 361 fis = new ByteArrayInputStream (data); 362 BufferedInputStream bis = new BufferedInputStream (fis); 363 if (container != null) 364 loader = container.getLoader(); 365 if (loader != null) 366 classLoader = loader.getClassLoader(); 367 if (classLoader != null) { 368 if (log.isTraceEnabled()) 369 log.trace(sm.getString( 370 "deltaManager.loading.withContextClassLoader", 371 getName())); 372 ois = new CustomObjectInputStream(bis, classLoader); 373 Thread.currentThread().setContextClassLoader(classLoader); 374 } else { 375 if (log.isTraceEnabled()) 376 log.trace(sm.getString( 377 "deltaManager.loading.withoutClassLoader", 378 getName())); 379 ois = new ObjectInputStream (bis); 380 } 381 } catch (IOException e) { 382 log.error(sm.getString("deltaManager.loading.ioe", e), e); 383 if (ois != null) { 384 try { 385 ois.close(); 386 } catch (IOException f) { 387 ; 388 } 389 ois = null; 390 } 391 throw e; 392 } 393 synchronized (sessions) { 395 try { 396 Integer count = (Integer ) ois.readObject(); 397 int n = count.intValue(); 398 for (int i = 0; i < n; i++) { 399 DeltaSession session = getNewDeltaSession(); 400 session.readObjectData(ois); 401 session.setManager(this); 402 session.setValid(true); 403 session.setPrimarySession(false); 404 session.access(); 409 session.setAccessCount(0); 412 session.resetDeltaRequest(); 413 sessions.put(session.getId(), session); 414 } 415 } catch (ClassNotFoundException e) { 416 log.error(sm.getString("deltaManager.loading.cnfe", e), e); 417 if (ois != null) { 418 try { 419 ois.close(); 420 } catch (IOException f) { 421 ; 422 } 423 ois = null; 424 } 425 throw e; 426 } catch (IOException e) { 427 log.error(sm.getString("deltaManager.loading.ioe", e), e); 428 if (ois != null) { 429 try { 430 ois.close(); 431 } catch (IOException f) { 432 ; 433 } 434 ois = null; 435 } 436 throw e; 437 } finally { 438 try { 440 if (ois != null) 441 ois.close(); 442 } catch (IOException f) { 443 } 445 } 446 } 447 } finally { 448 if (originalLoader != null) 449 Thread.currentThread().setContextClassLoader(originalLoader); 450 } 451 452 } 453 454 462 private byte[] doUnload() throws IOException { 463 464 ByteArrayOutputStream fos = null; 466 ObjectOutputStream oos = null; 467 try { 468 fos = new ByteArrayOutputStream (); 469 oos = new ObjectOutputStream (new BufferedOutputStream (fos)); 470 } catch (IOException e) { 471 log.error(sm.getString("deltaManager.unloading.ioe", e), e); 472 if (oos != null) { 473 try { 474 oos.close(); 475 } catch (IOException f) { 476 ; 477 } 478 oos = null; 479 } 480 throw e; 481 } 482 483 ArrayList list = new ArrayList (); 485 synchronized (sessions) { 486 try { 487 oos.writeObject(new Integer (sessions.size())); 488 Iterator elements = sessions.values().iterator(); 489 while (elements.hasNext()) { 490 DeltaSession session = (DeltaSession) elements.next(); 491 list.add(session); 492 session.writeObjectData(oos); 493 } 494 oos.flush(); 495 oos.close(); 496 oos = null; 497 } catch (IOException e) { 498 log.error(sm.getString("deltaManager.unloading.ioe", e), e); 499 if (oos != null) { 500 try { 501 oos.close(); 502 } catch (IOException f) { 503 ; 504 } 505 oos = null; 506 } 507 throw e; 508 } 509 } 510 511 return fos.toByteArray(); 513 } 514 515 517 523 public void addLifecycleListener(LifecycleListener listener) { 524 525 lifecycle.addLifecycleListener(listener); 526 527 } 528 529 533 public LifecycleListener[] findLifecycleListeners() { 534 535 return lifecycle.findLifecycleListeners(); 536 537 } 538 539 545 public void removeLifecycleListener(LifecycleListener listener) { 546 547 lifecycle.removeLifecycleListener(listener); 548 549 } 550 551 560 public void start() throws LifecycleException { 561 if (!initialized) 562 init(); 563 564 if (started) { 566 return; 567 } 568 started = true; 569 lifecycle.fireLifecycleEvent(START_EVENT, null); 570 571 String dummy = generateSessionId(); 573 574 try { 576 if (cluster == null) { 578 log.error(sm.getString("deltaManager.noCluster", getName())); 579 return; 580 } 581 if (log.isInfoEnabled()) 582 log.info(sm 583 .getString("deltaManager.startClustering", getName())); 584 getCluster().addManager(getName(), this); 587 588 if (cluster.getMembers().length > 0) { 589 Member mbr = cluster.getMembers()[0]; 590 SessionMessage msg = new SessionMessageImpl(this.getName(), 591 SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL", 592 "GET-ALL-" + getName()); 593 603 cluster.send(msg, mbr); 605 if (log.isWarnEnabled()) 606 log.warn(sm.getString("deltaManager.waitForSessionState", 607 getName(), mbr)); 608 long reqStart = System.currentTimeMillis(); 609 long reqNow = 0; 610 boolean isTimeout = false; 611 do { 612 try { 613 Thread.sleep(100); 614 } catch (Exception sleep) { 615 } 616 reqNow = System.currentTimeMillis(); 617 isTimeout = ((reqNow - reqStart) > (1000 * 60)); 618 } while ((!getStateTransferred()) && (!isTimeout)); 619 if (isTimeout || (!getStateTransferred())) { 620 log.error(sm.getString("deltaManager.noSessionState", 621 getName())); 622 } else { 623 if (log.isInfoEnabled()) 624 log.info(sm.getString("deltaManager.sessionReceived", 625 getName(), new Long (reqNow - reqStart))); 626 } 627 } else { 628 if (log.isInfoEnabled()) 629 log.info(sm.getString("deltaManager.noMembers", getName())); 630 } 632 } catch (Throwable t) { 633 log.error(sm.getString("deltaManager.managerLoad"), t); 634 } 635 } 636 637 646 public void stop() throws LifecycleException { 647 648 if (log.isDebugEnabled()) 649 log.debug(sm.getString("deltaManager.stopped", getName())); 650 651 getCluster().removeManager(getName()); 652 653 if (!started) 655 throw new LifecycleException(sm 656 .getString("deltaManager.notStarted")); 657 lifecycle.fireLifecycleEvent(STOP_EVENT, null); 658 started = false; 659 660 if (log.isInfoEnabled()) 662 log.info(sm.getString("deltaManager.expireSessions", getName())); 663 Session sessions[] = findSessions(); 664 for (int i = 0; i < sessions.length; i++) { 665 DeltaSession session = (DeltaSession) sessions[i]; 666 if (!session.isValid()) 667 continue; 668 try { 669 session.expire(true, this.getExpireSessionsOnShutdown()); 670 } catch (Throwable t) { 671 ; 672 } } 675 this.random = null; 677 678 if (initialized) { 679 destroy(); 680 } 681 } 682 683 685 691 public void propertyChange(PropertyChangeEvent event) { 692 693 if (!(event.getSource() instanceof Context)) 695 return; 696 Context context = (Context) event.getSource(); 697 698 if (event.getPropertyName().equals("sessionTimeout")) { 700 try { 701 setMaxInactiveInterval(((Integer ) event.getNewValue()) 702 .intValue() * 60); 703 } catch (NumberFormatException e) { 704 log.error(sm.getString("deltaManager.sessionTimeout", event 705 .getNewValue())); 706 } 707 } 708 709 } 710 711 714 721 public void messageDataReceived(ClusterMessage cmsg) { 722 if (cmsg instanceof SessionMessage) { 723 SessionMessage msg = (SessionMessage) cmsg; 724 messageReceived(msg, msg.getAddress() != null ? (Member) msg 725 .getAddress() : null); 726 } 727 } 728 729 740 public ClusterMessage requestCompleted(String sessionId) { 741 try { 742 DeltaSession session = (DeltaSession) findSession(sessionId); 743 DeltaRequest deltaRequest = session.getDeltaRequest(); 744 SessionMessage msg = null; 745 if (deltaRequest.getSize() > 0) { 746 747 byte[] data = unloadDeltaRequest(deltaRequest); 748 msg = new SessionMessageImpl(name, 749 SessionMessage.EVT_SESSION_DELTA, data, sessionId, 750 sessionId + System.currentTimeMillis()); 751 session.resetDeltaRequest(); 752 if (log.isDebugEnabled()) { 753 log.debug(sm.getString( 754 "deltaManager.createMessage.delta", 755 getName(), sessionId)); 756 } 757 } else if (!session.isPrimarySession()) { 758 msg = new SessionMessageImpl(getName(), 759 SessionMessage.EVT_SESSION_ACCESSED, null, sessionId, 760 sessionId + System.currentTimeMillis()); 761 if (log.isDebugEnabled()) { 762 log.debug(sm.getString( 763 "deltaManager.createMessage.accessChangePrimary", 764 getName(), sessionId)); 765 } 766 } 767 session.setPrimarySession(true); 768 if ((msg == null)) { 770 long replDelta = System.currentTimeMillis() 771 - session.getLastTimeReplicated(); 772 if (replDelta > (getMaxInactiveInterval() * 1000)) { 773 msg = new SessionMessageImpl(getName(), 774 SessionMessage.EVT_SESSION_ACCESSED, null, 775 sessionId, sessionId + System.currentTimeMillis()); 776 if (log.isDebugEnabled()) { 777 log.debug(sm.getString( 778 "deltaManager.createMessage.access", getName(), 779 sessionId)); 780 } 781 } 782 783 } 784 785 if (msg != null) 787 session.setLastTimeReplicated(System.currentTimeMillis()); 788 return msg; 789 } catch (IOException x) { 790 log.error(sm.getString( 791 "deltaManager.createMessage.unableCreateDeltaRequest", 792 sessionId), x); 793 return null; 794 } 795 796 } 797 798 804 protected void sessionExpired(String id) { 805 SessionMessage msg = new SessionMessageImpl(getName(), 806 SessionMessage.EVT_SESSION_EXPIRED, null, id, id 807 + "-EXPIRED-MSG"); 808 if (log.isDebugEnabled()) 809 log.debug(sm.getString("deltaManager.createMessage.expire", 810 getName(), id)); 811 cluster.send(msg); 812 } 813 814 821 public String [] getInvalidatedSessions() { 822 return new String [0]; 823 } 824 825 836 protected void messageReceived(SessionMessage msg, Member sender) { 837 try { 838 if (log.isDebugEnabled()) 839 log.debug(sm.getString("deltaManager.receiveMessage.eventType", 840 getName(), msg.getEventTypeString(), sender)); 841 switch (msg.getEventType()) { 842 case SessionMessage.EVT_GET_ALL_SESSIONS: { 843 if (log.isDebugEnabled()) 845 log.debug(sm.getString( 846 "deltaManager.receiveMessage.unloadingBegin", 847 getName())); 848 byte[] data = doUnload(); 849 if (log.isDebugEnabled()) 850 log.debug(sm.getString( 851 "deltaManager.receiveMessage.unloadingAfter", 852 getName())); 853 SessionMessage newmsg = new SessionMessageImpl(name, 854 SessionMessage.EVT_ALL_SESSION_DATA, data, 855 "SESSION-STATE", "SESSION-STATE-" + getName()); 856 if (log.isDebugEnabled()) 857 log.debug(sm.getString( 858 "deltaManager.createMessage.allSessionData", 859 getName())); 860 cluster.send(newmsg, sender); 861 break; 862 } 863 case SessionMessage.EVT_ALL_SESSION_DATA: { 864 if (log.isDebugEnabled()) 865 log.debug(sm.getString( 866 "deltaManager.receiveMessage.allSessionDataBegin", 867 getName())); 868 byte[] data = msg.getSession(); 869 doLoad(data); 870 if (log.isDebugEnabled()) 871 log.debug(sm.getString( 872 "deltaManager.receiveMessage.allSessionDataAfter", 873 getName())); 874 stateTransferred = true; 875 break; 876 } 877 case SessionMessage.EVT_SESSION_CREATED: { 878 if (log.isDebugEnabled()) 879 log.debug(sm.getString( 880 "deltaManager.receiveMessage.createNewSession", 881 getName(), msg.getSessionID())); 882 DeltaSession session = (DeltaSession) createSession(msg 883 .getSessionID(), false); 884 session.setId(msg.getSessionID()); 886 session.setNew(false); 887 session.setPrimarySession(false); 888 session.resetDeltaRequest(); 890 break; 891 } 892 case SessionMessage.EVT_SESSION_EXPIRED: { 893 DeltaSession session = (DeltaSession) findSession(msg 894 .getSessionID()); 895 if (session != null) { 896 if (log.isDebugEnabled()) 897 log.debug(sm.getString( 898 "deltaManager.receiveMessage.expired", 899 getName(), msg.getSessionID())); 900 session.expire(true, false); 902 } break; 904 } 905 case SessionMessage.EVT_SESSION_ACCESSED: { 906 DeltaSession session = (DeltaSession) findSession(msg 907 .getSessionID()); 908 if (session != null) { 909 if (log.isDebugEnabled()) 910 log.debug(sm.getString( 911 "deltaManager.receiveMessage.accessed", 912 getName(), msg.getSessionID())); 913 session.access(); 914 session.setPrimarySession(false); 915 session.endAccess(); 916 } 917 break; 918 } 919 case SessionMessage.EVT_SESSION_DELTA: { 920 byte[] delta = msg.getSession(); 921 DeltaSession session = (DeltaSession) findSession(msg 922 .getSessionID()); 923 if (session != null) { 924 log.debug(sm.getString("deltaManager.receiveMessage.delta", 925 getName(), msg.getSessionID())); 926 DeltaRequest dreq = loadDeltaRequest(session, delta); 927 dreq.execute(session, notifyListenersOnReplication); 928 session.setPrimarySession(false); 929 } 930 931 break; 932 } 933 default: { 934 break; 936 } 937 } } catch (Exception x) { 939 log.error(sm.getString("deltaManager.receiveMessage.error", 940 getName()), x); 941 } 942 } 943 944 946 public boolean getStateTransferred() { 947 return stateTransferred; 948 } 949 950 public void setStateTransferred(boolean stateTransferred) { 951 this.stateTransferred = stateTransferred; 952 } 953 954 public CatalinaCluster getCluster() { 955 return cluster; 956 } 957 958 public void setCluster(CatalinaCluster cluster) { 959 this.cluster = cluster; 960 } 961 962 public void load() { 963 964 } 965 966 public void unload() { 967 968 } 969 970 public boolean getUseDirtyFlag() { 971 return useDirtyFlag; 972 } 973 974 public void setUseDirtyFlag(boolean useDirtyFlag) { 975 this.useDirtyFlag = useDirtyFlag; 976 } 977 978 public boolean getExpireSessionsOnShutdown() { 979 return expireSessionsOnShutdown; 980 } 981 982 public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) { 983 this.expireSessionsOnShutdown = expireSessionsOnShutdown; 984 } 985 986 public boolean getPrintToScreen() { 987 return printToScreen; 988 } 989 990 public void setPrintToScreen(boolean printToScreen) { 991 this.printToScreen = printToScreen; 992 } 993 994 public void setName(String name) { 995 this.name = name; 996 } 997 998 public boolean getNotifyListenersOnReplication() { 999 return notifyListenersOnReplication; 1000 } 1001 1002 public void setNotifyListenersOnReplication( 1003 boolean notifyListenersOnReplication) { 1004 this.notifyListenersOnReplication = notifyListenersOnReplication; 1005 } 1006 1007} | Popular Tags |