1 22 package org.jboss.web.tomcat.tc6.session; 23 24 import java.util.Collection ; 25 import java.util.Date ; 26 import java.util.HashMap ; 27 import java.util.HashSet ; 28 import java.util.Iterator ; 29 import java.util.Map ; 30 import java.util.Set ; 31 import java.util.Map.Entry; 32 33 import javax.management.MBeanServer ; 34 import javax.management.MalformedObjectNameException ; 35 import javax.management.ObjectName ; 36 import javax.transaction.Status ; 37 import javax.transaction.TransactionManager ; 38 import javax.transaction.RollbackException ; 39 40 import org.apache.catalina.Context; 41 import org.apache.catalina.Host; 42 import org.apache.catalina.LifecycleException; 43 import org.apache.catalina.Session; 44 import org.apache.catalina.Valve; 45 import org.apache.catalina.core.ContainerBase; 46 import org.jboss.cache.CacheException; 47 import org.jboss.metadata.WebMetaData; 48 import org.jboss.mx.util.MBeanServerLocator; 49 50 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 51 52 61 public class JBossCacheManager 62 extends JBossManager 63 implements JBossCacheManagerMBean 64 { 65 66 69 static final String info_ = "JBossCacheManager/1.0"; 70 71 73 76 private TransactionManager tm; 77 78 81 private JBossCacheService proxy_; 82 83 86 private Map unloadedSessions_ = new ConcurrentHashMap(); 87 88 89 private String cacheObjectNameString_ = "jboss.cache:service=TomcatClusteringCache"; 90 91 94 private boolean useJK_ = false; 95 96 97 private boolean embedded_ = false; 98 99 100 private MBeanServer mserver_ = null; 101 102 103 private String snapshotMode_ = null; 104 105 106 private int snapshotInterval_ = 0; 107 108 109 private String replTriggerString_ = null; 110 111 112 private String replGranularityString_ = null; 113 114 119 private Boolean replicationFieldBatchMode_; 120 121 122 private ClassLoader tcl_; 123 124 127 private SnapshotManager snapshotManager_; 128 129 131 public JBossCacheManager() 132 { 133 super(); 134 } 135 136 143 public void init(String name, WebMetaData webMetaData, 144 boolean useJK, boolean useLocalCache) 145 throws ClusteringNotSupportedException 146 { 147 super.init(name, webMetaData, useJK, useLocalCache); 148 this.useJK_ = useJK; 149 this.replicationFieldBatchMode_ = 150 webMetaData.getReplicationFieldBatchMode() ? Boolean.TRUE : Boolean.FALSE; 151 152 proxy_ = new JBossCacheService(cacheObjectNameString_); 153 154 validateFieldMarshalling(); 157 158 embedded_ = true; 159 } 160 161 163 167 public JBossCacheService getCacheService() 168 { 169 return proxy_; 170 } 171 172 176 public String getCacheObjectNameString() 177 { 178 return cacheObjectNameString_; 179 } 180 181 185 public void setCacheObjectNameString(String treeCacheObjectName) 186 { 187 this.cacheObjectNameString_ = treeCacheObjectName; 188 } 189 190 201 public String getSnapshotMode() 202 { 203 return snapshotMode_; 204 } 205 206 213 public void setSnapshotMode(String snapshotMode) 214 { 215 this.snapshotMode_ = snapshotMode; 216 } 217 218 225 public int getSnapshotInterval() 226 { 227 return snapshotInterval_; 228 } 229 230 236 public void setSnapshotInterval(int snapshotInterval) 237 { 238 this.snapshotInterval_ = snapshotInterval; 239 } 240 241 245 public boolean getUseJK() 246 { 247 return useJK_; 248 } 249 250 254 public void setUseJK(boolean useJK) 255 { 256 this.useJK_ = useJK; 257 } 258 259 266 public ReplicationGranularity getReplicationGranularity() 267 { 268 return replicationGranularity_; 269 } 270 271 277 public String getReplicationGranularityString() 278 { 279 if (started_ && this.replGranularityString_ == null) 282 { 283 this.replGranularityString_ = replicationGranularity_.toString(); 284 } 285 return replGranularityString_; 286 } 287 288 297 public void setReplicationGranularityString(String granularity) 298 { 299 this.replGranularityString_ = granularity; 300 } 301 302 306 public String getReplicationTriggerString() 307 { 308 if (started_ && this.replTriggerString_ == null) 311 { 312 this.replTriggerString_ = invalidateSessionPolicy_.toString(); 313 } 314 return this.replTriggerString_; 315 } 316 317 326 public void setReplicationTriggerString(String trigger) 327 { 328 this.replTriggerString_ = trigger; 329 } 330 331 336 public Boolean isReplicationFieldBatchMode() 337 { 338 return replicationFieldBatchMode_; 339 } 340 341 346 public void setReplicationFieldBatchMode(boolean replicationFieldBatchMode) 347 { 348 this.replicationFieldBatchMode_ = Boolean.valueOf(replicationFieldBatchMode); 349 } 350 351 public void setUseLocalCache(boolean useLocalCache) 352 { 353 this.useLocalCache_ = useLocalCache; 354 } 355 356 358 public void expireSession(String sessionId) 359 { 360 Session session = findSession(sessionId); 361 if (session != null) 362 session.expire(); 363 } 364 365 public String getLastAccessedTime(String sessionId) 366 { 367 Session session = findSession(sessionId); 368 if(session == null) { 369 log_.debug("getLastAccessedTime(): Session " + sessionId + 370 " not found"); 371 return ""; 372 } 373 return new Date (session.getLastAccessedTime()).toString(); 374 } 375 376 public Object getSessionAttribute(String sessionId, String key) 377 { 378 ClusteredSession session = (ClusteredSession) findSession(sessionId); 379 return (session == null) ? null : session.getAttribute(key); 380 } 381 382 public String getSessionAttributeString(String sessionId, String key) 383 { 384 Object attr = getSessionAttribute(sessionId, key); 385 return (attr == null) ? null : attr.toString(); 386 } 387 388 public String listLocalSessionIds() 389 { 390 return reportSessionIds(sessions_.keySet()); 391 } 392 393 public String listSessionIds() 394 { 395 Set ids = new HashSet (sessions_.keySet()); 396 ids.addAll(unloadedSessions_.keySet()); 397 return reportSessionIds(ids); 398 } 399 400 private String reportSessionIds(Set ids) 401 { 402 StringBuffer sb = new StringBuffer (); 403 boolean added = false; 404 for (Iterator it = ids.iterator(); it.hasNext(); ) 405 { 406 if (added) 407 { 408 sb.append(','); 409 } 410 else 411 { 412 added = true; 413 } 414 415 sb.append(it.next()); 416 } 417 return sb.toString(); 418 } 419 420 422 428 public void start() throws LifecycleException 429 { 430 if (embedded_) 431 { 432 startEmbedded(); 433 } 434 else 435 { 436 startUnembedded(); 437 } 438 } 439 440 public void stop() throws LifecycleException 441 { 442 if (!started_) 443 { 444 throw new IllegalStateException ("Manager not started"); 445 } 446 447 log_.debug("Stopping"); 448 449 resetStats(); 450 451 lifecycle_.fireLifecycleEvent(BEFORE_STOP_EVENT, this); 453 454 clearSessions(); 455 456 tcl_ = null; 458 459 proxy_.stop(); 460 tm = null; 461 462 snapshotManager_.stop(); 463 464 started_ = false; 465 466 lifecycle_.fireLifecycleEvent(AFTER_STOP_EVENT, this); 468 469 try 470 { 471 unregisterMBeans(); 472 } 473 catch (Exception e) 474 { 475 log_.error("Could not unregister ManagerMBean from MBeanServer", e); 476 } 477 } 478 479 482 protected void clearSessions() 483 { 484 ClusteredSession[] sessions = findLocalSessions(); 486 for(int i=0; i < sessions.length; i++) 487 { 488 ClusteredSession ses = sessions[i]; 489 if (log_.isDebugEnabled()) 493 { 494 log_.debug("clearSessions(): clear session by expiring or passivating: " + ses); 495 } 496 boolean notify = true; 497 boolean localCall = true; 498 boolean localOnly = true; 499 try 500 { 501 if(isPassivationEnabled() && ses.isValid()) 502 { 503 504 processSessionPassivation(ses.getRealId(), this.getContainer().getParent().getName()); 505 } 506 else 507 { 508 ses.expire(notify, localCall, localOnly); 509 } 510 511 } 512 catch (Throwable t) 513 { 514 log_.warn("clearSessions(): Caught exception expiring or passivating session " + 515 ses.getIdInternal(), t); 516 } 517 finally 518 { 519 ses.recycle(); 522 } 523 } 524 525 if(!isPassivationEnabled()) 526 { 527 Map unloaded = new HashMap (unloadedSessions_); 529 Set keys = unloaded.keySet(); 530 for (Iterator it = keys.iterator(); it.hasNext(); ) 531 { 532 String realId = (String ) it.next(); 533 proxy_.removeSessionLocal(realId); 534 unloadedSessions_.remove(realId); 535 } 536 } 537 } 538 539 542 public Session createSession() 543 { 544 return createSession(null); 545 } 546 547 558 public Session createSession(String sessionId) 559 { 560 if (log_.isTraceEnabled()) 561 { 562 log_.trace("createSession: active sessions = " + activeCounter_ + 563 " , session map size = " + sessions_.size() + 564 " and max allowed sessions = " + maxActive_); 565 } 566 570 if(maxActive_ != -1 && sessions_.size() >= maxActive_ && isPassivationEnabled()) 574 { 575 processExpires(); 576 } 577 if (maxActive_ != -1 && sessions_.size() >= maxActive_) 579 { 580 rejectedCounter_++; 582 String msgEnd = (sessionId == null) ? "" : " id " + sessionId; 585 throw new IllegalStateException ("JBossCacheManager.createSession(): number of " + 586 "active sessions exceeds the maximum limit: " + 587 maxActive_ + " when trying to create session" + msgEnd); 588 } 589 590 ClusteredSession session = createEmptyClusteredSession(); 591 592 session.setNew(true); 593 session.setCreationTime(System.currentTimeMillis()); 594 session.setMaxInactiveInterval(this.maxInactiveInterval_); 595 session.setValid(true); 596 597 if (sessionId == null) 598 { 599 sessionId = this.getNextId(); 600 601 if (useJK_) 603 { 604 if (log_.isDebugEnabled()) 605 { 606 log_.debug("createSession(): useJK is true. Will append JvmRoute: " + this.getJvmRoute()); 607 } 608 sessionId += "." + this.getJvmRoute(); 609 } 610 } 611 612 session.setId(sessionId); 614 if (log_.isDebugEnabled()) 615 { 616 log_.debug("Created a ClusteredSession with id: " + sessionId); 617 } 618 619 createdCounter_++; 620 activeCounter_++; 623 624 SessionReplicationContext.bindSession(session, snapshotManager_); 626 627 return session; 628 } 629 630 public boolean storeSession(Session baseSession) 631 { 632 boolean stored = false; 633 if(baseSession != null && started_) 634 { 635 ClusteredSession session = (ClusteredSession) baseSession; 636 637 synchronized (session) 638 { 639 if (log_.isTraceEnabled()) 640 { 641 log_.trace("check to see if needs to store and replicate " + 642 "session with id " + session.getIdInternal()); 643 } 644 645 if (session.isValid() && 646 (session.isSessionDirty() || session.getExceedsMaxUnreplicatedInterval())) 647 { 648 String realId = session.getRealId(); 649 650 long begin = System.currentTimeMillis(); 652 session.passivate(); 653 long elapsed = System.currentTimeMillis() - begin; 654 stats_.updatePassivationStats(realId, elapsed); 655 656 begin = System.currentTimeMillis(); 658 processSessionRepl(session); 659 elapsed = System.currentTimeMillis() - begin; 660 stored = true; 661 stats_.updateReplicationStats(realId, elapsed); 662 } 663 else if (log_.isTraceEnabled()) 664 { 665 log_.trace("Session " + session.getIdInternal() + 666 " did not require replication."); 667 } 668 } 669 } 670 671 return stored; 672 } 673 674 public void add(Session session) 675 { 676 if (session == null) 677 return; 678 679 if (!(session instanceof ClusteredSession)) 680 { 681 throw new IllegalArgumentException ("You can only add instances of " + 682 "type ClusteredSession to this Manager. Session class name: " + 683 session.getClass().getName()); 684 } 685 686 add((ClusteredSession) session, false); 688 } 689 690 699 private void add(ClusteredSession session, boolean replicate) 700 { 701 if (!session.isValid()) 702 { 703 log_.error("Cannot add session with id=" + session.getIdInternal() + 704 " because it is invalid"); 705 return; 706 } 707 708 String realId = session.getRealId(); 709 Object existing = sessions_.put(realId, session); 710 unloadedSessions_.remove(realId); 711 712 if (!session.equals(existing)) 713 { 714 if (replicate) 715 { 716 storeSession(session); 717 } 718 719 activeCounter_++; 720 if (activeCounter_ > maxActiveCounter_) 721 maxActiveCounter_++; 722 723 if (log_.isDebugEnabled()) 724 { 725 log_.debug("Session with id=" + session.getIdInternal() + " added. " + 726 "Current active sessions " + activeCounter_); 727 } 728 } 729 } 730 731 public Session createEmptySession() 734 { 735 return createEmptyClusteredSession(); 736 } 737 738 private ClusteredSession createEmptyClusteredSession() 739 { 740 log_.debug("Creating an empty ClusteredSession"); 741 742 ClusteredSession session = null; 743 switch (replicationGranularity_) 744 { 745 case ATTRIBUTE: 746 session = new AttributeBasedClusteredSession(this); 747 break; 748 case FIELD: 749 session = new FieldBasedClusteredSession(this); 750 break; 751 default: 752 session = new SessionBasedClusteredSession(this); 753 break; 754 } 755 return session; 756 } 757 758 771 public Session findSession(String id) 772 { 773 String realId = getRealId(id); 774 ClusteredSession session = findLocalSession(realId); 776 777 if (session == null 788 && !SessionReplicationContext.isSessionBoundAndExpired(realId, snapshotManager_)) 789 { 790 if (log_.isTraceEnabled()) 791 log_.trace("Checking for session " + realId + " in the distributed cache"); 792 793 session = loadSession(realId); 794 if (session != null) 795 { 796 add(session); 797 } 800 } 801 else if (session != null && session.isOutdated()) 802 { 803 if (log_.isTraceEnabled()) 804 log_.trace("Updating session " + realId + " from the distributed cache"); 805 806 loadSession(realId); 808 } 809 810 if (session != null) 811 { 812 SessionReplicationContext.bindSession(session, snapshotManager_); 814 } 815 816 return session; 817 } 818 819 834 public Session[] findSessions() 835 { 836 if(unloadedSessions_.size() > 0) 838 { 839 Set ids = new HashSet (unloadedSessions_.keySet()); 841 842 if(log_.isDebugEnabled()) { 843 log_.debug("findSessions: loading sessions from distributed cache: " + ids); 844 } 845 846 for(Iterator it = ids.iterator(); it.hasNext();) { 847 loadSession((String ) it.next()); 848 } 849 } 850 851 return findLocalSessions(); 853 } 854 855 862 public ClusteredSession[] findLocalSessions() 863 { 864 Collection coll = sessions_.values(); 865 ClusteredSession[] sess = new ClusteredSession[coll.size()]; 866 sess = (ClusteredSession[]) coll.toArray(sess); 867 return sess; 868 } 869 870 881 public ClusteredSession findLocalSession(String realId) 882 { 883 return (ClusteredSession) sessions_.get(realId); 884 } 885 886 891 public void remove(Session session) 892 { 893 ClusteredSession clusterSess = (ClusteredSession) session; 894 synchronized (clusterSess) 895 { 896 String realId = clusterSess.getRealId(); 897 if (realId == null) 898 return; 899 900 if (log_.isDebugEnabled()) 901 { 902 log_.debug("Removing session from store with id: " + realId); 903 } 904 905 try { 906 SessionReplicationContext.startCacheActivity(); 908 clusterSess.removeMyself(); 909 } 910 finally { 911 SessionReplicationContext.finishCacheActivity(); 912 913 SessionReplicationContext.sessionExpired(clusterSess, realId, snapshotManager_); 916 917 sessions_.remove(realId); 918 stats_.removeStats(realId); 919 activeCounter_--; 920 } 921 } 922 } 923 924 930 public void removeLocal(Session session) 931 { 932 ClusteredSession clusterSess = (ClusteredSession) session; 933 synchronized (clusterSess) 934 { 935 String realId = clusterSess.getRealId(); 936 if (realId == null) return; 937 938 if (log_.isDebugEnabled()) 939 { 940 log_.debug("Removing session from local store with id: " + realId); 941 } 942 943 try { 944 SessionReplicationContext.startCacheActivity(); 946 clusterSess.removeMyselfLocal(); 947 } 948 finally 949 { 950 SessionReplicationContext.finishCacheActivity(); 951 952 SessionReplicationContext.sessionExpired(clusterSess, realId, snapshotManager_); 955 956 sessions_.remove(realId); 957 stats_.removeStats(realId); 958 959 expiredCounter_++; 963 activeCounter_--; 964 } 965 } 966 } 967 968 980 protected ClusteredSession loadSession(String realId) 981 { 982 if (realId == null) 983 { 984 return null; 985 } 986 987 long begin = System.currentTimeMillis(); 988 boolean mustAdd = false; 989 ClusteredSession session = (ClusteredSession) sessions_.get(realId); 990 991 if (session == null) 992 { 993 if (log_.isTraceEnabled()) 996 { 997 log_.trace("createSession: active sessions = " + activeCounter_ + 998 " , session map size = " + sessions_.size() + 999 " and max allowed sessions = " + maxActive_); 1000 } 1001 if(maxActive_ != -1 && sessions_.size() >= maxActive_ && isPassivationEnabled()) 1004 { 1005 processExpires(); 1006 } 1007 if (maxActive_ != -1 && sessions_.size() >= maxActive_) 1009 { 1010 rejectedCounter_++; 1012 String msgEnd = (realId == null) ? "" : " id " + realId; 1015 throw new IllegalStateException ("JBossCacheManager.createSession(): number of " + 1016 "active sessions exceeds the maximum limit: " + 1017 maxActive_ + " when trying to load session" + msgEnd); 1018 } 1019 1020 mustAdd = true; 1024 session = createEmptyClusteredSession(); 1025 } 1026 1027 synchronized (session) 1028 { 1029 boolean doTx = false; 1030 try 1031 { 1032 if(tm.getTransaction() == null) 1037 doTx = true; 1038 1039 if(doTx) 1040 tm.begin(); 1041 1042 SessionReplicationContext.startCacheActivity(); 1045 1046 session = proxy_.loadSession(realId, session); 1047 } 1048 catch (Exception ex) 1049 { 1050 try 1051 { 1052 tm.setRollbackOnly(); 1055 } 1056 catch (Exception exn) 1057 { 1058 log_.error("Caught exception rolling back transaction", exn); 1059 } 1060 if (ex instanceof RuntimeException ) 1062 throw (RuntimeException ) ex; 1063 1064 throw new RuntimeException ("loadSession(): failed to load session " + 1065 realId, ex); 1066 } 1067 finally 1068 { 1069 try { 1070 if(doTx) 1071 endTransaction(realId); 1072 } 1073 finally { 1074 SessionReplicationContext.finishCacheActivity(); 1075 } 1076 } 1077 1078 if (session != null) 1079 { 1080 session.initAfterLoad(this); 1082 if (mustAdd) 1083 add(session, false); long elapsed = System.currentTimeMillis() - begin; 1085 stats_.updateLoadStats(realId, elapsed); 1086 1087 if (log_.isDebugEnabled()) 1088 { 1089 log_.debug("loadSession(): id= " + realId + ", session=" + session); 1090 } 1091 } 1092 else if (log_.isDebugEnabled()) 1093 { 1094 log_.debug("loadSession(): session " + realId + 1095 " not found in distributed cache"); 1096 } 1097 } 1098 1099 return session; 1100 } 1101 1102 1108 protected void processSessionRepl(ClusteredSession session) 1109 { 1110 boolean notSession = (replicationGranularity_ != ReplicationGranularity.SESSION); 1113 boolean doTx = false; 1114 try 1115 { 1116 if(notSession && tm.getTransaction() == null) 1120 doTx = true; 1121 1122 if(doTx) 1123 tm.begin(); 1124 1125 SessionReplicationContext.startCacheActivity(); 1131 1132 session.processSessionRepl(); 1133 } 1134 catch (Exception ex) 1135 { 1136 log_.debug("processSessionRepl(): failed with exception", ex); 1137 1138 try 1139 { 1140 if (notSession) 1144 tm.setRollbackOnly(); 1145 } 1146 catch (Exception exn) 1147 { 1148 log_.error("Caught exception rolling back transaction", exn); 1149 } 1150 1151 if (ex instanceof RuntimeException ) 1153 throw (RuntimeException ) ex; 1154 1155 throw new RuntimeException ("JBossCacheManager.processSessionRepl(): " + 1156 "failed to replicate session.", ex); 1157 } 1158 finally 1159 { 1160 try { 1161 if(doTx) 1162 endTransaction(session.getId()); 1163 } 1164 finally { 1165 SessionReplicationContext.finishCacheActivity(); 1166 } 1167 } 1168 } 1169 1170 protected void endTransaction(String id) 1171 { 1172 if (tm == null) 1173 { 1174 log_.warn("JBossCacheManager.endTransaction(): tm is null for id: " +id); 1175 return; 1176 } 1177 1178 1179 try 1180 { 1181 if(tm.getTransaction().getStatus() != Status.STATUS_MARKED_ROLLBACK) 1182 { 1183 tm.commit(); 1184 } 1185 else 1186 { 1187 log_.info("JBossCacheManager.endTransaction(): rolling back tx for id: " +id); 1188 tm.rollback(); 1189 } 1190 } 1191 catch (RollbackException re) 1192 { 1193 log_.warn("JBossCacheManager.endTransaction(): rolling back transaction with exception: " +re); 1195 } 1196 catch (Exception e) 1197 { 1198 throw new RuntimeException ("JBossCacheManager.endTransaction(): Exception for id: " +id, e); 1199 } 1200 } 1201 1202 1205 protected ClassLoader getWebappClassLoader() 1206 { 1207 return tcl_; 1208 } 1209 1210 1214 protected void processExpires() 1215 { 1216 if (maxInactiveInterval_ < 0) 1217 { 1218 return; 1219 } 1220 1221 if (log_.isTraceEnabled()) 1222 { 1223 log_.trace("processExpires():max active sessions = " + maxActive_); 1224 log_.trace("processExpires(): passivation mode = " + isPassivationEnabled()); 1225 log_.trace("processExpires(): Looking for sessions that have expired ..."); 1226 } 1227 try 1228 { 1229 Session sessions[] = findLocalSessions(); 1231 for (int i = 0; i < sessions.length; ++i) 1232 { 1233 try 1234 { 1235 ClusteredSession session = (ClusteredSession) sessions[i]; 1236 if(session == null) 1237 { 1238 log_.warn("processExpires(): processing null session at index " +i); 1239 continue; 1240 } 1241 1242 if (session.isOutdated() && !(session.isValid(false))) 1246 { 1247 loadSession(session.getRealId()); 1251 } 1252 1253 if (!session.isValid()) continue; 1260 1261 if (log_.isTraceEnabled()) 1263 { 1264 log_.trace("processExpires(): Checking passivation for session " + session.getId()); 1265 } 1266 if (isPassivationEnabled()) 1269 { 1270 long timeNow = System.currentTimeMillis(); 1271 int timeIdle = (int) ((timeNow - session.getLastAccessedTimeInternal()) / 1000L); 1272 if (passivationMaxIdleTime_ >= 0 && timeIdle > passivationMaxIdleTime_) 1275 { 1276 if(log_.isTraceEnabled()) 1277 { 1278 log_.trace("JBossCacheManager.processExpires() passivating session " + session.getRealId()); 1279 } 1280 processSessionPassivation(session.getRealId(), this.getContainer().getParent().getName()); 1281 } 1282 else if (maxActive_ > 0 && passivationMinIdleTime_ > 0 && sessions_.size()> maxActive_) 1286 { 1287 if(timeIdle > passivationMinIdleTime_) 1288 { 1289 if(log_.isTraceEnabled()) 1290 { 1291 log_.debug("JBossCacheManager.processExpires() passivating session " + session.getRealId()); 1292 } 1293 processSessionPassivation(session.getRealId(), this.getContainer().getParent().getName()); 1294 } 1295 } 1296 } 1297 1298 } 1299 catch (Exception ex) 1300 { 1301 log_.error("processExpires(): failed expiring " + 1302 sessions[i].getIdInternal() + " with exception: " + 1303 ex, ex); 1304 } 1305 } 1306 1307 1309 long now = System.currentTimeMillis(); 1310 Map unloaded = new HashMap (unloadedSessions_); 1311 Set entries = unloaded.entrySet(); 1312 for (Iterator it = entries.iterator(); it.hasNext(); ) 1313 { 1314 Map.Entry entry = (Map.Entry ) it.next(); 1315 OwnedSessionUpdate osu = (OwnedSessionUpdate) entry.getValue(); 1316 int elapsed = (int) ((now - osu.updateTime) / 1000L); 1317 if (elapsed >= maxInactiveInterval_) 1318 { 1319 String realId = (String ) entry.getKey(); 1320 try 1321 { 1322 proxy_.removeSessionLocal(realId, osu.owner); 1323 unloadedSessions_.remove(realId); 1324 } 1325 1326 catch (Exception ex) 1332 { 1333 log_.error("processExpire(): failed removing unloaded session " + 1334 realId + " with exception: " + 1335 ex, ex); 1336 } 1337 } 1338 } 1339 } 1340 catch (Exception ex) 1341 { 1342 log_.error("processExpires: failed with exception: " + ex, ex); 1343 } 1344 } 1345 1346 public void processRemoteAttributeRemoval(String realId, String attrKey) 1347 { 1348 1349 ClusteredSession session = findLocalSession(realId); 1350 if (session != null) 1351 { 1352 boolean localCall = false; boolean localOnly = true; boolean notify = false; 1358 ClassLoader prevTcl = Thread.currentThread().getContextClassLoader(); 1360 try 1361 { 1362 Thread.currentThread().setContextClassLoader(tcl_); 1363 synchronized (session) 1364 { 1365 session.removeAttributeInternal(attrKey, localCall, localOnly, notify); 1366 } 1367 if (log_.isTraceEnabled()) 1368 log_.trace("processRemoteAttributeRemoval: removed attribute " + 1369 attrKey + " from " + realId); 1370 } 1371 finally 1372 { 1373 Thread.currentThread().setContextClassLoader(prevTcl); 1374 } 1375 } 1376 } 1377 1378 public void processRemoteInvalidation(String realId) 1379 { 1380 ClusteredSession session = (ClusteredSession) sessions_.remove(realId); 1382 if (session == null) 1383 { 1384 if (unloadedSessions_.remove(realId) != null) 1387 { 1388 if (log_.isTraceEnabled()) 1389 log_.trace("Removed entry for session " + realId + " from unloaded session map"); 1390 } 1391 } 1392 else 1393 { 1394 boolean notify = false; boolean localCall = false; boolean localOnly = true; 1407 ClassLoader prevTcl = Thread.currentThread().getContextClassLoader(); 1409 try 1410 { 1411 Thread.currentThread().setContextClassLoader(tcl_); 1412 session.expire(notify, localCall, localOnly); 1413 } 1414 finally 1415 { 1416 Thread.currentThread().setContextClassLoader(prevTcl); 1417 } 1418 1419 stats_.removeStats(realId); 1421 1422 activeCounter_--; 1424 } 1425 } 1426 1427 public void processSessionPassivation(String realId, String dataOwner) 1428 { 1429 ClusteredSession session = findLocalSession(realId); 1431 if (session != null) 1435 { 1436 synchronized (session) 1437 { 1438 if (log_.isTraceEnabled()) 1439 { 1440 log_.trace("Passivating session with id: " + realId); 1441 } 1442 1443 try { 1444 SessionReplicationContext.startCacheActivity(); 1447 session.passivate(); 1448 proxy_.evictSession(realId); 1449 } 1450 finally { 1451 SessionReplicationContext.finishCacheActivity(); 1452 } 1453 1454 Object obj = unloadedSessions_.put(realId, 1455 new OwnedSessionUpdate(dataOwner, session.getLastAccessedTime())); 1456 if (log_.isTraceEnabled()) 1457 { 1458 if (obj == null) 1459 { 1460 log_.trace("New session " + realId + " added to unloaded session map"); 1461 } 1462 else 1463 { 1464 log_.trace("Updated timestamp for unloaded session " + realId); 1465 } 1466 } 1467 sessions_.remove(realId); 1468 stats_.removeStats(realId); 1469 } 1470 activeCounter_--; 1471 } 1472 } 1473 1474 1480 protected String getRealId(String id) 1481 { 1482 return (useJK_ ? Util.getRealId(id) : id); 1483 } 1484 1485 1493 protected void unloadedSessionChanged(String realId, String dataOwner) 1494 { 1495 Object obj = unloadedSessions_.put(realId, 1496 new OwnedSessionUpdate(dataOwner, System.currentTimeMillis())); 1497 if (log_.isTraceEnabled()) 1498 { 1499 if (obj == null) 1500 { 1501 log_.trace("New session " + realId + " added to unloaded session map"); 1502 } 1503 else 1504 { 1505 log_.trace("Updated timestamp for unloaded session " + realId); 1506 } 1507 } 1508 } 1509 1510 1516 protected boolean isPassivationEnabled() 1517 { 1518 return (passivationMode_ && proxy_.isCachePassivationEnabled()); 1519 } 1520 1522 1527 private void startEmbedded() throws LifecycleException 1528 { 1529 super.start(); 1530 1531 tcl_ = super.getContainer().getLoader().getClassLoader(); 1535 1536 proxy_.start(tcl_, this); 1537 1538 tm = proxy_.getTransactionManager(); 1539 if(tm == null) 1540 { 1541 throw new LifecycleException("JBossCacheManager.start(): Obtain null tm"); 1542 } 1543 1544 try 1545 { 1546 initializeUnloadedSessions(); 1547 1548 initSnapshotManager(); 1550 1551 installValves(); 1553 1554 log_.debug("start(): JBossCacheService started"); 1555 } 1556 catch (Exception e) 1557 { 1558 log_.error("Unable to start manager.", e); 1559 throw new LifecycleException(e); 1560 } 1561 } 1562 1563 1565 1568 private void startUnembedded() throws LifecycleException 1569 { 1570 if (started_) 1571 { 1572 return; 1573 } 1574 1575 log_.info("Manager is about to start"); 1576 1577 lifecycle_.fireLifecycleEvent(BEFORE_START_EVENT, this); 1579 1580 if (snapshotMode_ == null) 1581 { 1582 try 1585 { 1586 JBossCacheCluster cluster = (JBossCacheCluster) container_.getCluster(); 1587 cluster.configureManager(this); 1588 } 1589 catch (ClassCastException e) 1590 { 1591 String msg = "Cluster is not an instance of JBossCacheCluster"; 1592 log_.error(msg, e); 1593 throw new LifecycleException(msg, e); 1594 } 1595 } 1596 1597 1599 try 1600 { 1601 this.invalidateSessionPolicy_ = InvalidateSessionPolicy.fromString(replTriggerString_); 1602 } 1603 catch (IllegalArgumentException iae) 1604 { 1605 throw new LifecycleException("replication-trigger value set to a " + 1606 "non-valid value: '" + 1607 replTriggerString_ + 1608 "' (should be ['SET_AND_GET', " + 1609 "'SET_AND_NON_PRIMITIVE_GET', 'SET'])"); 1610 } 1611 1612 try 1613 { 1614 } 1615 catch (IllegalArgumentException iae) 1616 { 1617 throw new LifecycleException("replication-granularity value set to " + 1618 "a non-valid value: '" + 1619 replGranularityString_ + 1620 "' (should be ['SESSION', " + 1621 "'ATTRIBUTE' or 'FIELD'])"); 1622 } 1623 1624 try 1626 { 1627 proxy_ = new JBossCacheService(cacheObjectNameString_); 1628 1629 validateFieldMarshalling(); 1632 1633 tcl_ = container_.getLoader().getClassLoader(); 1636 proxy_.start(tcl_, this); 1637 } 1638 catch (Throwable t) 1639 { 1640 String str = "Problem starting JBossCacheService for Tomcat clustering"; 1641 log_.error(str, t); 1642 throw new LifecycleException(str, t); 1643 } 1644 1645 tm = proxy_.getTransactionManager(); 1646 if(tm == null) 1647 { 1648 throw new LifecycleException("JBossCacheManager.start(): Obtain null tm"); 1649 } 1650 1651 try 1652 { 1653 initializeUnloadedSessions(); 1654 1655 installValves(); 1657 1658 started_ = true; 1659 1660 lifecycle_.fireLifecycleEvent(AFTER_START_EVENT, this); 1662 1663 log_.debug("start(): JBossCacheService started"); 1664 } 1665 catch (Exception e) 1666 { 1667 log_.error("Unable to start manager.", e); 1668 throw new LifecycleException(e); 1669 } 1670 1671 try 1672 { 1673 registerMBeans(); 1674 } 1675 catch (Exception e) 1676 { 1677 log_.error("Could not register ManagerMBean with MBeanServer", e); 1678 } 1679 } 1680 1681 1684 private void registerMBeans() 1685 { 1686 try 1687 { 1688 MBeanServer server = getMBeanServer(); 1689 1690 String domain; 1691 if (container_ instanceof ContainerBase) 1692 { 1693 domain = ((ContainerBase) container_).getDomain(); 1694 } 1695 else 1696 { 1697 domain = server.getDefaultDomain(); 1698 } 1699 String hostName = ((Host) container_.getParent()).getName(); 1700 hostName = (hostName == null) ? "localhost" : hostName; 1701 ObjectName clusterName = new ObjectName (domain 1702 + ":service=ClusterManager,WebModule=//" + hostName 1703 + ((Context) container_).getPath()); 1704 1705 if (server.isRegistered(clusterName)) 1706 { 1707 log_.warn("MBean " + clusterName + " already registered"); 1708 return; 1709 } 1710 1711 objectName_ = clusterName; 1712 server.registerMBean(this, clusterName); 1713 1714 } 1715 catch (Exception ex) 1716 { 1717 log_.error(ex.getMessage(), ex); 1718 } 1719 } 1720 1721 1724 private void unregisterMBeans() 1725 { 1726 if (mserver_ != null) 1727 { 1728 try 1729 { 1730 mserver_.unregisterMBean(objectName_); 1731 } 1732 catch (Exception e) 1733 { 1734 log_.error(e); 1735 } 1736 } 1737 } 1738 1739 1745 private MBeanServer getMBeanServer() throws Exception 1746 { 1747 if (mserver_ == null) 1748 { 1749 mserver_ = MBeanServerLocator.locateJBoss(); 1750 } 1751 return (mserver_); 1752 } 1753 1754 1762 private void initializeUnloadedSessions() throws CacheException 1763 { 1764 Map sessions = proxy_.getSessionIds(); 1765 if (sessions != null) 1766 { 1767 long now = System.currentTimeMillis(); 1768 for (Iterator it = sessions.entrySet().iterator(); it.hasNext(); ) 1769 { 1770 Map.Entry entry = (Entry) it.next(); 1771 unloadedSessions_.put(entry.getKey(), 1772 new OwnedSessionUpdate((String ) entry.getValue(), now)); 1773 } 1774 } 1775 } 1776 1777 1783 private void installValves() 1784 { 1785 if (useJK_) 1786 { 1787 log_.info("We are using mod_jk(2) for load-balancing. " + 1788 "Will add JvmRouteValve."); 1789 1790 installContextValve(new JvmRouteValve(this)); 1791 } 1792 1793 if (replicationGranularity_ == ReplicationGranularity.FIELD && 1797 Boolean.TRUE.equals(replicationFieldBatchMode_)) 1798 { 1799 Valve batchValve = new BatchReplicationClusteredSessionValve(this); 1800 log_.debug("Adding BatchReplicationClusteredSessionValve for batch replication."); 1801 installContextValve(batchValve); 1802 } 1803 1804 ClusteredSessionValve valve = new ClusteredSessionValve(); 1806 installContextValve(valve); 1807 } 1808 1809 1812 private void initSnapshotManager() 1813 { 1814 String ctxPath = ((Context) container_).getPath(); 1815 if ("instant".equals(snapshotMode_) 1816 || replicationGranularity_== ReplicationGranularity.FIELD) 1817 { 1818 snapshotManager_ = new InstantSnapshotManager(this, ctxPath); 1819 } 1820 else if ("interval".equals(snapshotMode_)) 1821 { 1822 snapshotManager_ = new IntervalSnapshotManager(this, ctxPath, snapshotInterval_); 1823 } 1824 else 1825 { 1826 log_.error("Snapshot mode must be 'instant' or 'interval' - " + 1827 "using 'instant'"); 1828 snapshotManager_ = new InstantSnapshotManager(this, ctxPath); 1829 } 1830 1831 snapshotManager_.start(); 1832 } 1833 1834 private void installContextValve(Valve valve) 1835 { 1836 boolean installed = false; 1837 1838 if (embedded_ && getContextObjectName() != null) { 1843 try 1844 { 1845 getMBeanServer().invoke(getContextObjectName(), "addValve", 1846 new Object []{valve}, 1847 new String []{"org.apache.catalina.Valve"}); 1848 installed = true; 1849 } 1850 catch (Exception e) 1851 { 1852 log_.debug("Caught exception installing valve to Context", e); 1858 } 1859 } 1860 1861 if (!installed) 1862 { 1863 if (container_ instanceof ContainerBase) 1865 { 1866 ((ContainerBase) container_).addValve(valve); 1867 } 1868 else 1869 { 1870 container_.getPipeline().addValve(valve); 1872 } 1873 } 1874 } 1875 1876 1882 private void validateFieldMarshalling() 1883 { 1884 if (replicationGranularity_ == ReplicationGranularity.FIELD 1885 && !proxy_.isMarshallingAvailable()) 1886 { 1887 throw new IllegalStateException ("replication-granularity value is set to " + 1891 "'FIELD' but is not supported by the cache service configuration. " + 1892 "Must set 'UseRegionBasedMarshalling' to 'true' in the " + 1893 "tc6-cluster.sar jboss-service.xml"); 1894 } 1895 } 1896 1897 private ObjectName getContextObjectName() 1898 { 1899 String oname = container_.getObjectName(); 1900 try 1901 { 1902 return (oname == null) ? null : new ObjectName (oname); 1903 } 1904 catch (MalformedObjectNameException e) 1905 { 1906 log_.warn("Error creating object name from string " + oname, e); 1907 return null; 1908 } 1909 } 1910 1911 1912 private class OwnedSessionUpdate 1913 { 1914 String owner; 1915 long updateTime; 1916 1917 OwnedSessionUpdate(String owner, long updateTime) 1918 { 1919 this.owner = owner; 1920 this.updateTime = updateTime; 1921 } 1922 } 1923} 1924 | Popular Tags |