1 16 package org.mortbay.j2ee.session; 17 18 import java.lang.reflect.Method ; 20 import java.util.HashSet ; 21 import java.util.Iterator ; 22 import java.util.Set ; 23 import java.util.Timer ; 24 import java.util.TimerTask ; 25 import java.util.Vector ; 26 27 import org.jfox.ioc.logger.Logger; 28 import org.jgroups.Address; 29 import org.jgroups.Channel; 30 import org.jgroups.JChannel; 31 import org.jgroups.MembershipListener; 32 import org.jgroups.MergeView; 33 import org.jgroups.Message; 34 import org.jgroups.MessageListener; 35 import org.jgroups.View; 36 import org.jgroups.blocks.GroupRequest; 37 import org.jgroups.blocks.MethodCall; 38 import org.jgroups.blocks.RpcDispatcher; 39 40 44 46 49 54 56 58 67 public class 68 JGStore 69 extends AbstractReplicatedStore 70 implements MessageListener, MembershipListener 71 { 72 protected Logger _log=Logger.getLogger(JGStore.class); 73 74 protected String _protocolStack=""+ 76 "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;ucast_recv_buf_size=16000;ucast_send_buf_size=16000;mcast_send_buf_size=32000;mcast_recv_buf_size=64000;loopback=true):"+ 77 "PING(timeout=2000;num_initial_members=3):"+ 78 "MERGE2(min_interval=5000;max_interval=10000):"+ 79 "FD_SOCK:VERIFY_SUSPECT(timeout=1500):"+ 80 "pbcast.STABLE(desired_avg_gossip=20000):"+ 81 "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):"+ 82 "UNICAST(timeout=2000):"+ 83 "FRAG(frag_size=8192;down_thread=false;up_thread=false):"+ 84 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true):"+ 85 "pbcast.STATE_TRANSFER"; 86 87 public String getProtocolStack() {return _protocolStack;} 88 public void setProtocolStack(String protocolStack) {_protocolStack=protocolStack;} 89 90 protected String _subClusterName="DefaultSubCluster"; 91 public String getSubClusterName() {return _subClusterName;} 92 public void setSubClusterName(String subClusterName) {_subClusterName=subClusterName;} 93 94 protected int _retrievalTimeOut=20000; public int getRetrievalTimeOut() {return _retrievalTimeOut;} 96 public void setRetrievalTimeOut(int retrievalTimeOut) {_retrievalTimeOut=retrievalTimeOut;} 97 98 protected int _distributionModeInternal=GroupRequest.GET_ALL; protected int getDistributionModeInternal() {return _distributionModeInternal;} 100 protected void 101 setDistributionModeInternal(String distributionMode) 102 { 103 try 104 { 105 _distributionModeInternal=GroupRequest.class.getDeclaredField(distributionMode).getInt(GroupRequest.class); 106 } 107 catch (Exception e) 108 { 109 _log.error("could not convert "+distributionMode+" to GroupRequest field", e); 110 } 111 } 112 113 protected String _distributionMode="GET_ALL"; public String getDistributionMode() {return _distributionMode;} 115 public void 116 setDistributionMode(String distributionMode) 117 { 118 _distributionMode=distributionMode; 119 setDistributionModeInternal(_distributionMode); 120 } 121 122 protected int _distributionTimeOut=5000; public int getDistributionTimeOut() {return _distributionTimeOut;} 124 public void setDistributionTimeOut(int distributionTimeOut) {_distributionTimeOut=distributionTimeOut;} 125 126 public Object 127 clone() 128 { 129 _log.trace("cloning..."); 130 JGStore jgs=(JGStore)super.clone(); 131 jgs.setProtocolStack(getProtocolStack()); 132 jgs.setSubClusterName(getSubClusterName()); 133 jgs.setRetrievalTimeOut(getRetrievalTimeOut()); 134 jgs.setDistributionMode(getDistributionMode()); 135 jgs.setDistributionTimeOut(getDistributionTimeOut()); 136 _log.trace("...cloned"); 137 138 return jgs; 139 } 140 141 143 protected Channel _channel; 144 protected RpcDispatcher _dispatcher; 145 protected Vector _members; 146 147 public String [] 148 getMembers() 149 { 150 Address[] addresses; 151 synchronized (_members) 152 { 153 addresses = (Address[]) _members.toArray(new Address[_members.size()]); 154 } 155 156 String [] members = new String [1+addresses.length]; 157 members[0] = _channel.getLocalAddress().toString(); 158 for (int i = 0; i < addresses.length; ++i) 159 members[1+i] = addresses[i].toString(); 160 return members; 161 } 162 163 166 protected void 167 init() 168 { 169 _log=Logger.getLogger(JGStore.class.getName()+"#"+getContextPath()); 170 _log.trace("initialising..."); 171 172 try 173 { 174 _channel=new JChannel(getProtocolStack()); 177 MessageListener messageListener=this; 178 MembershipListener membershipListener=this; 179 Object serverObject=this; 180 _dispatcher=new RpcDispatcher(_channel, messageListener, membershipListener, serverObject); 181 _dispatcher.setMarshaller(new RpcDispatcher.Marshaller() { 182 public Object 183 objectFromByteBuffer(byte[] buf) 184 { 185 ClassLoader oldLoader=Thread.currentThread().getContextClassLoader(); 186 try 187 { 188 Thread.currentThread().setContextClassLoader(getLoader()); 189 return MarshallingInterceptor.demarshal(buf); 190 } 191 catch (Exception e) 192 { 193 _log.error("could not demarshal incoming update", e); 194 } 195 finally 196 { 197 Thread.currentThread().setContextClassLoader(oldLoader); 198 } 199 return null; 200 } 201 202 public byte[] 203 objectToByteBuffer(Object obj) 204 { 205 try 206 { 207 return MarshallingInterceptor.marshal(obj); 208 } 209 catch (Exception e) 210 { 211 _log.error("could not marshal outgoing update", e); 212 } 213 return null; 214 } 215 }); 216 _log.debug("JGroups RpcDispatcher initialised"); 217 218 _channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 219 _log.debug("JGroups Channel initialised"); 220 221 View view=_channel.getView(); 222 if (view!=null) 223 _members=(Vector )view.getMembers().clone(); 224 225 _members=(_members==null)?new Vector ():(Vector )_members.clone(); if (_log.isDebugEnabled()) _log.debug("JGroups View: "+_members); 227 _members.remove(_channel.getLocalAddress()); 228 } 229 catch (Exception e) 230 { 231 _log.error("could not initialise JGroups Channel and Dispatcher", e); 232 } 233 234 _log.trace("...initialised"); 235 } 236 237 public String 238 getChannelName() 239 { 240 return "JETTY_HTTPSESSION_DISTRIBUTION:"+getContextPath()+"-"+getSubClusterName(); 241 } 242 243 public void 244 start() 245 throws Exception 246 { 247 _log.trace("starting..."); 248 _log.info("starting JGroups "+org.jgroups.Version.version); 249 250 super.start(); 251 252 init(); 253 254 String channelName=getChannelName(); 255 if (_log.isDebugEnabled()) _log.debug("starting JGroups...: ("+channelName+")"); 256 257 _channel.connect(channelName); _log.trace("JGroups Channel connected"); 259 _dispatcher.start(); 260 _log.trace("JGroups Dispatcher started"); 261 262 if (!_channel.getState(null, getRetrievalTimeOut())) 263 _log.info("cluster state is null - this must be the first node"); 264 265 _log.debug("...JGroups started"); 266 _log.trace("...started"); 267 } 268 269 public void 270 stop() 271 { 272 _log.trace("stopping..."); 273 _timer.cancel(); 274 _log.trace("Touch Timer stopped"); 275 276 if (_log.isDebugEnabled()) _log.debug("stopping JGroups...: ("+getChannelName()+")"); 277 _dispatcher.stop(); 278 _log.trace("JGroups RpcDispatcher stopped"); 279 _channel.disconnect(); 280 _log.trace("JGroups Channel disconnected"); 281 _log.debug("...JGroups stopped"); 282 283 super.stop(); 284 _log.trace("...stopped"); 285 } 286 287 public void 288 destroy() 289 { 290 _log.trace("destroying..."); 291 _timer=null; 292 _dispatcher=null; 293 _channel=null; 294 295 super.destroy(); 296 _log.trace("...destroyed"); 297 } 298 299 302 protected Object _idsLock =new Object (); 303 protected Set _ids =new HashSet (); 304 protected Timer _timer =new Timer (); 305 protected long _period =0; 306 307 protected class TouchTimerTask extends TimerTask 308 { 309 protected Set _oldIds=null; 310 protected Set _newIds=new HashSet (); 311 312 public void 313 run() 314 { 315 synchronized (_idsLock) 316 { 317 _oldIds=_ids; 318 _ids=_newIds; _newIds=null; 320 } 321 322 publish(null, TOUCH_SESSIONS, new Object [] {_oldIds.toArray(new String [_oldIds.size()]), new Long (System.currentTimeMillis()+_period)}); 324 _oldIds.clear(); 325 _newIds=_oldIds; _oldIds=null; 327 } 328 } 329 330 331 public long getBatchPeriod(){return _period;} 332 public void setBatchPeriod(long period){_period=period;} 333 334 protected void 335 publish(String id, Method method, Object [] argInstances) 336 { 337 if (_log.isTraceEnabled()) 338 { 339 String args=""; 340 for (int i=0; i<argInstances.length; i++) 341 args+=(i>0?",":"")+argInstances[i]; 342 if (_log.isTraceEnabled()) _log.trace("publishing method...: "+id+"."+method.getName()+"("+args+")"); 343 } 344 345 if (_period>0) 346 { 347 if (method.equals(SET_LAST_ACCESSED_TIME)) 348 { 349 synchronized (_idsLock) 351 { 352 if (_ids.size()==0) 355 { 356 _timer.schedule(new TouchTimerTask(), _period); _log.debug("Touch Timer scheduled: _period"); 358 } 359 360 _ids.add(id); 361 }; 362 return; 363 } 364 else if (method.equals(DESTROY_SESSION)) 365 { 366 String tmp=(String )argInstances[0]; synchronized (_idsLock) 370 { 371 _ids.remove(tmp); 372 } 373 } 374 } 375 376 try 377 { 378 Class [] tmp={String .class, Integer .class, Object [].class}; 379 MethodCall mc = new MethodCall(getClass().getMethod("dispatch",tmp)); 380 Object [] args = new Object [3]; 381 args[0] = id; 382 args[1] = _methodToInteger.get(method.getName()); 383 args[2] = argInstances; 384 mc.setArgs(args); 385 386 _dispatcher.callRemoteMethods(_members, 388 mc, 389 getDistributionModeInternal(), 390 getDistributionTimeOut()); 391 _log.trace("...method published"); 392 } 393 catch(Exception e) 394 { 395 _log.error("problem publishing change in state over JGroups", e); 396 } 397 } 398 399 public void 401 dispatch(String id, Integer method, Object [] argInstances) 402 { 403 Method m=_integerToMethod[method.intValue()]; 404 if (_log.isTraceEnabled()) 405 { 406 String args=""; 407 for (int i=0; i<argInstances.length; i++) 408 args+=(i>0?",":"")+argInstances[i]; 409 if (_log.isTraceEnabled()) _log.trace("dispatching method... : "+id+"."+_integerToMethod[method.intValue()].getName()+"("+args+")"); 410 } 411 412 if (m.equals(DESTROY_SESSION)) 413 { 414 String tmp=(String )argInstances[0]; synchronized (_idsLock) 418 { 419 _ids.remove(tmp); 420 } 421 } 422 423 425 427 ClassLoader oldLoader=Thread.currentThread().getContextClassLoader(); 428 try 429 { 430 Thread.currentThread().setContextClassLoader(getLoader()); 431 super.dispatch(id, method, argInstances); 432 } 433 finally 434 { 435 Thread.currentThread().setContextClassLoader(oldLoader); 436 } 437 _log.trace("...method dispatched"); 438 } 439 440 443 448 public void 449 receive(Message msg) 450 { 451 byte[] buf=msg.getBuffer(); 453 } 454 455 460 461 463 public synchronized byte[] 464 getState() 465 { 466 ClassLoader oldLoader=Thread.currentThread().getContextClassLoader(); 467 try 468 { 469 Thread.currentThread().setContextClassLoader(getLoader()); 470 _log.info("initialising another store from our current state"); 471 472 LocalState[] state; 475 synchronized (_sessions) 476 { 477 _log.info("sending "+_sessions.size()+" sessions"); 478 479 state=new LocalState[_sessions.size()]; 480 int j=0; 481 for (Iterator i=_sessions.values().iterator(); i.hasNext();) 482 state[j++]=(LocalState)i.next(); 483 } 484 485 Object [] data={new Long (System.currentTimeMillis()), state}; 486 try 487 { 488 return MarshallingInterceptor.marshal(data); 489 } 490 catch (Exception e) 491 { 492 _log.error ("Unable to getState from JGroups: ", e); 493 return null; 494 } 495 } 496 finally 497 { 498 Thread.currentThread().setContextClassLoader(oldLoader); 499 } 500 } 501 502 507 public synchronized void 508 setState (byte[] tmp) 509 { 510 if (tmp!=null) 511 { 512 _log.info("initialising our state from another Store"); 513 514 Object [] data = null; 515 try 516 { 517 data=(Object [])MarshallingInterceptor.demarshal(tmp); 520 } 521 catch (Exception e) 522 { 523 _log.error ("Unable to setState from JGroups: ", e); 524 return; 525 } 526 527 try 528 { 529 AbstractReplicatedStore.setReplicating(true); 530 531 long remoteTime=((Long )data[0]).longValue(); 532 long localTime=System.currentTimeMillis(); 533 long disparity=(localTime-remoteTime)/1000; 534 _log.info("time disparity: "+disparity+" secs"); 535 536 LocalState[] state=(LocalState[])data[1]; 537 _log.info("receiving "+state.length+" sessions..."); 538 539 for (int i=0; i<state.length; i++) 540 { 541 LocalState ls=state[i]; 542 _sessions.put(ls.getId(), ls); 543 getManager().getHttpSession(ls.getId()); } 545 } 546 finally 547 { 548 AbstractReplicatedStore.setReplicating(false); 549 } 550 } 551 } 552 553 556 public void 558 block() 559 { 560 _log.trace("handling JGroups block()..."); 561 _log.trace("...JGroups block() handled"); 562 } 563 564 public synchronized void 566 suspect(Address suspected_mbr) 567 { 568 if (_log.isTraceEnabled()) _log.trace("handling JGroups suspect("+suspected_mbr+")..."); 569 _log.warn("cluster suspects member may have been lost: "+suspected_mbr); 570 _log.trace("...JGroups suspect() handled"); 571 } 572 573 public synchronized void 575 viewAccepted(View newView) 576 { 577 if (_log.isTraceEnabled()) _log.trace("handling JGroups viewAccepted("+newView+")..."); 578 579 if(newView instanceof MergeView) 583 _log.warn("NYI - merging: view is " + newView); 584 585 Vector newMembers=newView.getMembers(); 586 587 if (newMembers != null) 588 { 589 _members.clear(); 590 _members.addAll(newMembers); 591 _log.info("JGroups View: "+_members); 592 _members.remove(_channel.getLocalAddress()); 593 } 594 595 _log.trace("...JGroups viewAccepted() handled"); 596 } 597 } 598 | Popular Tags |