1 22 package org.jboss.ha.hasessionstate.server; 23 24 import java.io.ByteArrayInputStream ; 25 import java.io.ByteArrayOutputStream ; 26 import java.io.IOException ; 27 import java.io.ObjectInputStream ; 28 import java.io.ObjectOutputStream ; 29 import java.io.Serializable ; 30 import java.util.ArrayList ; 31 import java.util.Enumeration ; 32 import java.util.HashMap ; 33 import java.util.Hashtable ; 34 import java.util.Iterator ; 35 import java.util.Vector ; 36 import java.util.zip.Deflater ; 37 import java.util.zip.DeflaterOutputStream ; 38 import java.util.zip.InflaterInputStream ; 39 40 import javax.naming.Context ; 41 import javax.naming.InitialContext ; 42 import javax.naming.Name ; 43 import javax.naming.NameNotFoundException ; 44 import javax.naming.Reference ; 45 import javax.naming.StringRefAddr ; 46 47 import org.jboss.ha.framework.interfaces.HAPartition; 48 import org.jboss.ha.hasessionstate.interfaces.PackagedSession; 49 import org.jboss.logging.Logger; 50 import org.jboss.naming.NonSerializableFactory; 51 import org.jboss.system.server.ServerConfigUtil; 52 53 import EDU.oswego.cs.dl.util.concurrent.Mutex; 54 55 70 71 public class HASessionStateImpl 72 implements org.jboss.ha.hasessionstate.interfaces.HASessionState, 73 HAPartition.HAPartitionStateTransfer 74 { 75 76 protected String _sessionStateName; 77 protected Logger log; 78 protected HAPartition hapGeneral; 79 protected String sessionStateIdentifier; 80 protected String myNodeName; 81 82 protected long beanCleaningDelay; 83 protected String haPartitionName; 84 protected String haPartitionJndiName; 85 86 protected final String DEFAULT_PARTITION_JNDI_NAME = ServerConfigUtil.getDefaultPartitionName(); 87 protected final String JNDI_FOLDER_NAME_FOR_HASESSIONSTATE = org.jboss.metadata.ClusterConfigMetaData.JNDI_PREFIX_FOR_SESSION_STATE; 88 protected final String JNDI_FOLDER_NAME_FOR_HAPARTITION = "/HAPartition/"; 89 protected final long MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE = 30L * 60L * 1000L; protected static final String HA_SESSION_STATE_STATE_TRANSFER = "HASessionStateTransfer"; 91 92 protected HashMap locks = new HashMap (); 93 94 public HASessionStateImpl () 95 {} 96 97 public HASessionStateImpl (String sessionStateName, 98 HAPartition partition, 99 long beanCleaningDelay) 100 { 101 this(sessionStateName, partition.getPartitionName(), beanCleaningDelay); 102 this.hapGeneral = partition; 103 } 104 105 public HASessionStateImpl (String sessionStateName, 106 String mainHAPartitionName, 107 long beanCleaningDelay) 108 { 109 if (sessionStateName == null) 110 this._sessionStateName = org.jboss.metadata.ClusterConfigMetaData.DEFAULT_SESSION_STATE_NAME; 111 else 112 this._sessionStateName = sessionStateName; 113 114 this.sessionStateIdentifier = "SessionState-'" + this._sessionStateName + "'"; 115 116 if (mainHAPartitionName == null) 117 haPartitionName = DEFAULT_PARTITION_JNDI_NAME; 118 else 119 haPartitionName = mainHAPartitionName; 120 121 haPartitionJndiName = JNDI_FOLDER_NAME_FOR_HAPARTITION + haPartitionName; 122 123 if (beanCleaningDelay > 0) 124 this.beanCleaningDelay = beanCleaningDelay; 125 else 126 this.beanCleaningDelay = MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE; 127 128 } 129 130 public void init () throws Exception 131 { 132 this.log = Logger.getLogger(HASessionStateImpl.class.getName() + "." + this._sessionStateName); 133 134 if (this.hapGeneral == null) 139 { 140 Context ctx = new InitialContext (); 141 this.hapGeneral = (HAPartition)ctx.lookup (haPartitionJndiName); 142 } 143 144 if (hapGeneral == null) 145 log.error ("Unable to get default HAPartition under name '" + haPartitionJndiName + "'."); 146 147 this.hapGeneral.registerRPCHandler (this.sessionStateIdentifier, this); 148 this.hapGeneral.subscribeToStateTransferEvents (HA_SESSION_STATE_STATE_TRANSFER, this); 149 } 150 151 public void start () throws Exception 152 { 153 this.myNodeName = this.hapGeneral.getNodeName (); 154 log.debug ("HASessionState node name : " + this.myNodeName ); 155 156 Context ctx = new InitialContext (); 159 this.bind (this._sessionStateName, this, HASessionStateImpl.class, ctx); 160 } 161 162 protected void bind (String jndiName, Object who, Class classType, Context ctx) throws Exception 163 { 164 NonSerializableFactory.bind (jndiName, who); 167 Name n = ctx.getNameParser ("").parse (jndiName); 168 while (n.size () > 1) 169 { 170 String ctxName = n.get (0); 171 try 172 { 173 ctx = (Context )ctx.lookup (ctxName); 174 } 175 catch (NameNotFoundException e) 176 { 177 log.debug ("creating Subcontext" + ctxName); 178 ctx = ctx.createSubcontext (ctxName); 179 } 180 n = n.getSuffix (1); 181 } 182 183 StringRefAddr addr = new StringRefAddr ("nns", jndiName); 187 Reference ref = new Reference ( classType.getName (), addr, NonSerializableFactory.class.getName (), null); 188 ctx.bind (n.get (0), ref); 189 } 190 191 public void stop () throws Exception 192 { 193 purgeState(); 194 195 try 197 { 198 Context ctx = new InitialContext (); 199 ctx.unbind (this._sessionStateName); 200 NonSerializableFactory.unbind (this._sessionStateName); 201 } 202 catch (Exception ignored) 203 {} 204 } 205 206 public void destroy() throws Exception 207 { 208 this.hapGeneral.unregisterRPCHandler(this.sessionStateIdentifier, this); 210 this.hapGeneral.unsubscribeFromStateTransferEvents(HA_SESSION_STATE_STATE_TRANSFER, this); 211 } 212 213 public String getNodeName () 214 { 215 return this.myNodeName ; 216 } 217 218 public Serializable getCurrentState () 221 { 222 log.debug ("Building and returning state of HASessionState"); 223 224 if (this.appSessions == null) 225 this.appSessions = new Hashtable (); 226 227 Serializable result = null; 228 229 synchronized (this.lockAppSession) 230 { 231 this.purgeState (); 232 233 try 234 { 235 result = deflate (this.appSessions); 236 } 237 catch (Exception e) 238 { 239 log.error("operation failed", e); 240 } 241 } 242 return result; 243 } 244 245 public void setCurrentState (Serializable newState) 246 { 247 log.debug ("Receiving state of HASessionState"); 248 249 if (this.appSessions == null) 250 this.appSessions = new Hashtable (); 251 252 synchronized (this.lockAppSession) 253 { 254 try 255 { 256 this.appSessions.clear (); this.appSessions = (Hashtable )inflate ((byte[])newState); 258 } 259 catch (Exception e) 260 { 261 log.error("operation failed", e); 262 } 263 } 264 } 265 266 public void purgeState () 267 { 268 synchronized (this.lockAppSession) 269 { 270 for (Enumeration keyEnum = this.appSessions.keys (); keyEnum.hasMoreElements ();) 271 { 272 Object key = keyEnum.nextElement (); 275 Hashtable value = (Hashtable )this.appSessions.get (key); 276 long currentTime = System.currentTimeMillis (); 277 278 for (Iterator iterSessions = value.values ().iterator (); iterSessions.hasNext ();) 279 { 280 PackagedSession ps = (PackagedSession)iterSessions.next (); 281 if ( (currentTime - ps.unmodifiedExistenceInVM ()) > beanCleaningDelay ) 282 iterSessions.remove (); 283 } 284 } 285 } 286 287 } 288 289 protected byte[] deflate (Object object) throws IOException 290 { 291 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 292 Deflater def = new Deflater (java.util.zip.Deflater.BEST_COMPRESSION); 293 DeflaterOutputStream dos = new DeflaterOutputStream (baos, def); 294 295 ObjectOutputStream out = new ObjectOutputStream (dos); 296 out.writeObject (object); 297 out.close (); 298 dos.finish (); 299 dos.close (); 300 301 return baos.toByteArray (); 302 } 303 304 protected Object inflate (byte[] compressedContent) throws IOException 305 { 306 if (compressedContent==null) 307 return null; 308 309 try 310 { 311 ObjectInputStream in = new ObjectInputStream (new InflaterInputStream (new ByteArrayInputStream (compressedContent))); 312 313 Object object = in.readObject (); 314 in.close (); 315 return object; 316 } 317 catch (Exception e) 318 { 319 throw new IOException (e.toString ()); 320 } 321 } 322 323 protected Hashtable appSessions = new Hashtable (); 324 protected Object lockAppSession = new Object (); 325 326 protected Hashtable getHashtableForApp (String appName) 327 { 328 if (this.appSessions == null) 329 this.appSessions = new Hashtable (); 331 Hashtable result = null; 332 333 synchronized (this.lockAppSession) 334 { 335 result = (Hashtable )this.appSessions.get (appName); 336 if (result == null) 337 { 338 result = new Hashtable (); 339 this.appSessions.put (appName, result); 340 } 341 } 342 return result; 343 } 344 345 public void createSession (String appName, Object keyId) 346 { 347 this._createSession (appName, keyId); 348 } 349 350 public PackagedSessionImpl _createSession (String appName, Object keyId) 351 { 352 Hashtable app = this.getHashtableForApp (appName); 353 PackagedSessionImpl result = new PackagedSessionImpl ((Serializable )keyId, null, this.myNodeName); 354 app.put (keyId, result); 355 return result; 356 } 357 358 public void setState (String appName, Object keyId, byte[] state) 359 throws java.rmi.RemoteException 360 { 361 Hashtable app = this.getHashtableForApp (appName); 362 PackagedSession ps = (PackagedSession)app.get (keyId); 363 364 if (ps == null) 365 { 366 ps = _createSession (appName, keyId); 367 } 368 369 boolean isStateIdentical = false; 370 371 Mutex mtx = getLock (appName, keyId); 372 try { 373 if (!mtx.attempt (0)) 374 throw new java.rmi.RemoteException ("Concurent calls on session object."); 375 } 376 catch (InterruptedException ie) { log.info (ie); return; } 377 378 try 379 { 380 isStateIdentical = ps.setState(state); 381 if (!isStateIdentical) 382 { 383 Object [] args = 384 {appName, ps}; 385 try 386 { 387 this.hapGeneral.callMethodOnCluster (this.sessionStateIdentifier, 388 "_setState", 389 args, 390 new Class []{String .class, PackagedSession.class}, true); 391 } 392 catch (Exception e) 393 { 394 log.error("operation failed", e); 395 } 396 } 397 } 398 finally 399 { 400 mtx.release (); 401 } 402 } 403 404 419 420 public void _setState (String appName, PackagedSession session) 421 { 422 Hashtable app = this.getHashtableForApp (appName); 423 PackagedSession ps = (PackagedSession)app.get (session.getKey ()); 424 425 if (ps == null) 426 { 427 ps = session; 428 synchronized (app) 429 { 430 app.put (ps.getKey (), ps); 431 } 432 } 433 else 434 { 435 Mutex mtx = getLock (appName, session.getKey ()); 436 try { mtx.acquire (); } catch (InterruptedException ie) { log.info (ie); return; } 437 438 try 439 { 440 if (ps.getOwner ().equals (this.myNodeName)) 441 { 442 ownedObjectExternallyModified (appName, session.getKey (), ps, session); 445 } 446 ps.update (session); 447 } 448 finally 449 { 450 mtx.release (); 451 } 452 } 453 454 } 455 456 public PackagedSession getState (String appName, Object keyId) 457 { 458 Hashtable app = this.getHashtableForApp (appName); 459 return (PackagedSession)app.get (keyId); 460 } 461 462 public PackagedSession getStateWithOwnership (String appName, Object keyId) throws java.rmi.RemoteException 463 { 464 return this.localTakeOwnership (appName, keyId); 465 } 466 467 public PackagedSession localTakeOwnership (String appName, Object keyId) throws java.rmi.RemoteException 468 { 469 Hashtable app = this.getHashtableForApp (appName); 470 PackagedSession ps = (PackagedSession)app.get (keyId); 471 472 if (ps == null) 476 return null; 477 478 Mutex mtx = getLock (appName, keyId); 479 480 try { 481 if (!mtx.attempt (0)) 482 throw new java.rmi.RemoteException ("Concurent calls on session object."); 483 } 484 catch (InterruptedException ie) { log.info (ie); return null; } 485 486 try 487 { 488 if (!ps.getOwner ().equals (this.myNodeName)) 489 { 490 Object [] args = 491 {appName, keyId, this.myNodeName, new Long (ps.getVersion ())}; 492 ArrayList answers = null; 493 try 494 { 495 answers = this.hapGeneral.callMethodOnCluster (this.sessionStateIdentifier, 496 "_setOwnership", 497 args, 498 new Class []{String .class, Object .class, 499 String .class, Long .class}, 500 true); 501 } 502 catch (Exception e) 503 { 504 log.error("operation failed", e); 505 } 506 507 if (answers != null && answers.contains (Boolean.FALSE)) 508 throw new java.rmi.RemoteException ("Concurent calls on session object."); 509 else 510 { 511 ps.setOwner (this.myNodeName); 512 return ps; 513 } 514 } 515 else 516 return ps; 517 } 518 finally 519 { 520 mtx.release (); 521 } 522 } 523 524 public Boolean _setOwnership (String appName, Object keyId, String newOwner, Long remoteVersion) 525 { 526 Hashtable app = this.getHashtableForApp (appName); 527 PackagedSession ps = (PackagedSession)app.get (keyId); 528 Boolean answer = Boolean.TRUE; 529 Mutex mtx = getLock (appName, keyId); 530 531 try { 532 if (!mtx.attempt (0)) 533 return Boolean.FALSE; 534 } 535 catch (InterruptedException ie) { log.info (ie); return Boolean.FALSE; } 536 537 try 538 { 539 if (!ps.getOwner ().equals (this.myNodeName)) 540 { 541 answer = Boolean.TRUE; 547 } 548 else if (ps.getVersion () > remoteVersion.longValue ()) 549 { 550 answer = Boolean.FALSE; 555 } 556 else 557 { 558 ps.setOwner (newOwner); 562 ownedObjectExternallyModified (appName, keyId, ps, ps); 563 answer = Boolean.TRUE; 564 } 565 } 566 finally 567 { 568 mtx.release (); 569 } 570 return answer; 571 } 572 573 public void takeOwnership (String appName, Object keyId) throws java.rmi.RemoteException 574 { 575 this.localTakeOwnership (appName, keyId); 576 } 577 578 public void removeSession (String appName, Object keyId) 579 { 580 Hashtable app = this.getHashtableForApp (appName); 581 if (app != null) 582 { 583 PackagedSession ps = (PackagedSession)app.remove (keyId); 584 if (ps != null) 585 { 586 removeLock (appName, keyId); 587 Object [] args = 588 { appName, keyId }; 589 try 590 { 591 this.hapGeneral.callMethodOnCluster (this.sessionStateIdentifier, 592 "_removeSession", 593 args, 594 new Class []{String .class, Object .class}, 595 true); 596 } 597 catch (Exception e) 598 { log.error("operation failed", e); } 599 } 600 } 601 } 602 603 public void _removeSession (String appName, Object keyId) 604 { 605 Hashtable app = this.getHashtableForApp (appName); 606 PackagedSession ps = null; 607 ps = (PackagedSession)app.remove (keyId); 608 if (ps != null && ps.getOwner ().equals (this.myNodeName)) 609 ownedObjectExternallyModified (appName, keyId, ps, ps); 610 611 removeLock (appName, keyId); 612 } 613 614 protected Hashtable listeners = new Hashtable (); 615 616 public synchronized void subscribe (String appName, HASessionStateListener listener) 617 { 618 Vector members = (Vector )listeners.get (appName); 619 if (members == null) 620 { 621 members = new Vector (); 622 listeners.put (appName, members); 623 } 624 if (!members.contains (listener)) 625 { 626 members.add (listener); 627 } 628 629 } 630 631 public synchronized void unsubscribe (String appName, HASessionStateListener listener) 632 { 633 Vector members = (Vector )listeners.get (appName); 634 if ((members != null) && members.contains (listener)) 635 members.remove (listener); 636 } 637 638 public void ownedObjectExternallyModified (String appName, Object key, PackagedSession oldSession, PackagedSession newSession) 639 { 640 Vector members = (Vector )listeners.get (appName); 641 if (members != null) 642 for (int i=0; i<members.size (); i++) 643 try 644 { 645 ((HASessionStateListener)members.elementAt (i)).sessionExternallyModified (newSession); 646 } 647 catch (Throwable t) 648 { 649 log.debug (t); 650 } 651 } 652 653 public HAPartition getCurrentHAPartition () 654 { 655 return this.hapGeneral; 656 } 657 658 659 protected boolean lockExists (String appName, Object key) 660 { 661 synchronized (this.locks) 662 { 663 HashMap ls = (HashMap )this.locks.get (appName); 664 if (ls == null) 665 return false; 666 667 return (ls.get(key)!=null); 668 } 669 } 670 671 protected Mutex getLock (String appName, Object key) 672 { 673 synchronized (this.locks) 674 { 675 HashMap ls = (HashMap )this.locks.get (appName); 676 if (ls == null) 677 { 678 ls = new HashMap (); 679 this.locks.put (appName, ls); 680 } 681 682 Mutex mutex = (Mutex)ls.get(key); 683 if (mutex == null) 684 { 685 mutex = new Mutex (); 686 ls.put (key, mutex); 687 } 688 689 return mutex; 690 } 691 } 692 693 protected void removeLock (String appName, Object key) 694 { 695 synchronized (this.locks) 696 { 697 HashMap ls = (HashMap )this.locks.get (appName); 698 if (ls == null) 699 return; 700 ls.remove (key); 701 } 702 } 703 704 } 705 | Popular Tags |