KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mortbay > j2ee > session > JGStore


1 // ========================================================================
2
// $Id: JGStore.java,v 1.7 2004/06/22 16:23:44 jules_gosnell Exp $
3
// Copyright 2002-2004 Mort Bay Consulting Pty. Ltd.
4
// ------------------------------------------------------------------------
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
// ========================================================================
15

16 package org.mortbay.j2ee.session;
17
18 //----------------------------------------
19
import java.lang.reflect.Method JavaDoc;
20 import java.util.HashSet JavaDoc;
21 import java.util.Iterator JavaDoc;
22 import java.util.Set JavaDoc;
23 import java.util.Timer JavaDoc;
24 import java.util.TimerTask JavaDoc;
25 import java.util.Vector JavaDoc;
26
27 import org.jfox.ioc.logger.Logger;
28 import org.jgroups.Address;
29 import org.jgroups.Channel;
30 import org.jgroups.JChannel;
31 import org.jgroups.MembershipListener;
32 import org.jgroups.MergeView;
33 import org.jgroups.Message;
34 import org.jgroups.MessageListener;
35 import org.jgroups.View;
36 import org.jgroups.blocks.GroupRequest;
37 import org.jgroups.blocks.MethodCall;
38 import org.jgroups.blocks.RpcDispatcher;
39
40 //----------------------------------------
41
// what happens if a member drops away for a while then comes back -
42
// can we deal with it ?
43

44 // quite a lot left to do:
45

46 // how do we bring ourselves or others up to date on startup whilst
47
// not missing any updates ? - talk to Bela
48

// how do we avoid the deserialisation cost like Sacha? - store updates
// serialised and deserialise lazily (we would need a custom class so
// we don't get confused by a user storing their own Serialised objects)
53

54 // Talk to Sacha...
55

56 // It will be VERY important that nodes using this Store have their clocks synched...
57

58 /**
59  * publish changes to our state, receive and dispatch notification of
60  * changes in other states, initialise our state from other members,
61  * allow other members to initialise their state from us - all via
62  * JGroups...
63  *
64  * @author <a HREF="mailto:jules@mortbay.com">Jules Gosnell</a>
65  * @version 1.0
66  */

67 public class
68   JGStore
69   extends AbstractReplicatedStore
70   implements MessageListener, MembershipListener
71 {
72   protected Logger _log=Logger.getLogger(JGStore.class);
73
74   // this should be XML in the dd...
75
protected String JavaDoc _protocolStack=""+
76     "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;ucast_recv_buf_size=16000;ucast_send_buf_size=16000;mcast_send_buf_size=32000;mcast_recv_buf_size=64000;loopback=true):"+
77     "PING(timeout=2000;num_initial_members=3):"+
78     "MERGE2(min_interval=5000;max_interval=10000):"+
79     "FD_SOCK:VERIFY_SUSPECT(timeout=1500):"+
80     "pbcast.STABLE(desired_avg_gossip=20000):"+
81     "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):"+
82     "UNICAST(timeout=2000):"+
83     "FRAG(frag_size=8192;down_thread=false;up_thread=false):"+
84     "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true):"+
85     "pbcast.STATE_TRANSFER";
86
87   public String JavaDoc getProtocolStack() {return _protocolStack;}
88   public void setProtocolStack(String JavaDoc protocolStack) {_protocolStack=protocolStack;}
89
90   protected String JavaDoc _subClusterName="DefaultSubCluster";
91   public String JavaDoc getSubClusterName() {return _subClusterName;}
92   public void setSubClusterName(String JavaDoc subClusterName) {_subClusterName=subClusterName;}
93
94   protected int _retrievalTimeOut=20000; // 20 seconds
95
public int getRetrievalTimeOut() {return _retrievalTimeOut;}
96   public void setRetrievalTimeOut(int retrievalTimeOut) {_retrievalTimeOut=retrievalTimeOut;}
97
98   protected int _distributionModeInternal=GroupRequest.GET_ALL; // synchronous/non-sticky
99
protected int getDistributionModeInternal() {return _distributionModeInternal;}
100   protected void
101     setDistributionModeInternal(String JavaDoc distributionMode)
102     {
103       try
104       {
105     _distributionModeInternal=GroupRequest.class.getDeclaredField(distributionMode).getInt(GroupRequest.class);
106       }
107       catch (Exception JavaDoc e)
108       {
109     _log.error("could not convert "+distributionMode+" to GroupRequest field", e);
110       }
111     }
112
113   protected String JavaDoc _distributionMode="GET_ALL"; // synchronous/non-sticky
114
public String JavaDoc getDistributionMode() {return _distributionMode;}
115   public void
116     setDistributionMode(String JavaDoc distributionMode)
117     {
118       _distributionMode=distributionMode;
119       setDistributionModeInternal(_distributionMode);
120     }
121
122   protected int _distributionTimeOut=5000; // 5 seconds
123
public int getDistributionTimeOut() {return _distributionTimeOut;}
124   public void setDistributionTimeOut(int distributionTimeOut) {_distributionTimeOut=distributionTimeOut;}
125
126   public Object JavaDoc
127     clone()
128     {
129       _log.trace("cloning...");
130       JGStore jgs=(JGStore)super.clone();
131       jgs.setProtocolStack(getProtocolStack());
132       jgs.setSubClusterName(getSubClusterName());
133       jgs.setRetrievalTimeOut(getRetrievalTimeOut());
134       jgs.setDistributionMode(getDistributionMode());
135       jgs.setDistributionTimeOut(getDistributionTimeOut());
136       _log.trace("...cloned");
137
138       return jgs;
139     }
140
141   //----------------------------------------
142

143   protected Channel _channel;
144   protected RpcDispatcher _dispatcher;
145   protected Vector JavaDoc _members;
146
147   public String JavaDoc[]
148     getMembers()
149     {
150       Address[] addresses;
151       synchronized (_members)
152       {
153         addresses = (Address[]) _members.toArray(new Address[_members.size()]);
154       }
155
156       String JavaDoc[] members = new String JavaDoc[1+addresses.length];
157       members[0] = _channel.getLocalAddress().toString();
158       for (int i = 0; i < addresses.length; ++i)
159         members[1+i] = addresses[i].toString();
160       return members;
161     }
162
163   //----------------------------------------
164
// Store API - Store LifeCycle
165

166   protected void
167     init()
168     {
169       _log=Logger.getLogger(JGStore.class.getName()+"#"+getContextPath());
170       _log.trace("initialising...");
171
172       try
173       {
174     // start up our channel...
175
_channel=new JChannel(getProtocolStack()); // channel should be JBoss or new Jetty channel
176

177     MessageListener messageListener=this;
178     MembershipListener membershipListener=this;
179     Object JavaDoc serverObject=this;
180     _dispatcher=new RpcDispatcher(_channel, messageListener, membershipListener, serverObject);
181     _dispatcher.setMarshaller(new RpcDispatcher.Marshaller() {
182         public Object JavaDoc
183           objectFromByteBuffer(byte[] buf)
184         {
185           ClassLoader JavaDoc oldLoader=Thread.currentThread().getContextClassLoader();
186           try
187           {
188         Thread.currentThread().setContextClassLoader(getLoader());
189         return MarshallingInterceptor.demarshal(buf);
190           }
191           catch (Exception JavaDoc e)
192           {
193         _log.error("could not demarshal incoming update", e);
194           }
195           finally
196           {
197         Thread.currentThread().setContextClassLoader(oldLoader);
198           }
199           return null;
200         }
201
202         public byte[]
203           objectToByteBuffer(Object JavaDoc obj)
204         {
205           try
206           {
207         return MarshallingInterceptor.marshal(obj);
208           }
209           catch (Exception JavaDoc e)
210           {
211         _log.error("could not marshal outgoing update", e);
212           }
213           return null;
214         }
215       });
216     _log.debug("JGroups RpcDispatcher initialised");
217
218     _channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
219     _log.debug("JGroups Channel initialised");
220
221     View view=_channel.getView();
222     if (view!=null)
223       _members=(Vector JavaDoc)view.getMembers().clone();
224
225     _members=(_members==null)?new Vector JavaDoc():(Vector JavaDoc)_members.clone(); // we don't own it
226
if (_log.isDebugEnabled()) _log.debug("JGroups View: "+_members);
227     _members.remove(_channel.getLocalAddress());
228       }
229       catch (Exception JavaDoc e)
230       {
231     _log.error("could not initialise JGroups Channel and Dispatcher", e);
232       }
233
234       _log.trace("...initialised");
235     }
236
237   public String JavaDoc
238     getChannelName()
239     {
240       return "JETTY_HTTPSESSION_DISTRIBUTION:"+getContextPath()+"-"+getSubClusterName();
241     }
242
243   public void
244     start()
245     throws Exception JavaDoc
246     {
247       _log.trace("starting...");
248       _log.info("starting JGroups "+org.jgroups.Version.version);
249
250       super.start();
251
252       init();
253
254       String JavaDoc channelName=getChannelName();
255       if (_log.isDebugEnabled()) _log.debug("starting JGroups...: ("+channelName+")");
256
257       _channel.connect(channelName); // group should be on a per-context basis
258
_log.trace("JGroups Channel connected");
259       _dispatcher.start();
260       _log.trace("JGroups Dispatcher started");
261
262       if (!_channel.getState(null, getRetrievalTimeOut()))
263     _log.info("cluster state is null - this must be the first node");
264
265       _log.debug("...JGroups started");
266       _log.trace("...started");
267     }
268
269   public void
270     stop()
271     {
272       _log.trace("stopping...");
273       _timer.cancel();
274       _log.trace("Touch Timer stopped");
275
276       if (_log.isDebugEnabled()) _log.debug("stopping JGroups...: ("+getChannelName()+")");
277       _dispatcher.stop();
278       _log.trace("JGroups RpcDispatcher stopped");
279       _channel.disconnect();
280       _log.trace("JGroups Channel disconnected");
281       _log.debug("...JGroups stopped");
282
283       super.stop();
284       _log.trace("...stopped");
285     }
286
287   public void
288     destroy()
289     {
290       _log.trace("destroying...");
291       _timer=null;
292       _dispatcher=null;
293       _channel=null;
294
295       super.destroy();
296       _log.trace("...destroyed");
297     }
298
299   //----------------------------------------
300
// AbstractReplicatedStore API
301

302   protected Object JavaDoc _idsLock =new Object JavaDoc();
303   protected Set JavaDoc _ids =new HashSet JavaDoc();
304   protected Timer JavaDoc _timer =new Timer JavaDoc();
305   protected long _period =0;
306
307   protected class TouchTimerTask extends TimerTask JavaDoc
308   {
309     protected Set JavaDoc _oldIds=null;
310     protected Set JavaDoc _newIds=new HashSet JavaDoc();
311
312     public void
313       run()
314     {
315       synchronized (_idsLock)
316       {
317     _oldIds=_ids;
318     _ids=_newIds; // empty
319
_newIds=null;
320       }
321
322       // _log.info("TOUCHING SESSIONS: "+_oldIds);
323
publish(null, TOUCH_SESSIONS, new Object JavaDoc[] {_oldIds.toArray(new String JavaDoc[_oldIds.size()]), new Long JavaDoc(System.currentTimeMillis()+_period)});
324       _oldIds.clear();
325       _newIds=_oldIds; // recycle Set for next distribution
326
_oldIds=null;
327     }
328   }
329
330
331   public long getBatchPeriod(){return _period;}
332   public void setBatchPeriod(long period){_period=period;}
333
334   protected void
335     publish(String JavaDoc id, Method JavaDoc method, Object JavaDoc[] argInstances)
336     {
337       if (_log.isTraceEnabled())
338       {
339     String JavaDoc args="";
340     for (int i=0; i<argInstances.length; i++)
341       args+=(i>0?",":"")+argInstances[i];
342     if (_log.isTraceEnabled()) _log.trace("publishing method...: "+id+"."+method.getName()+"("+args+")");
343       }
344
345       if (_period>0)
346       {
347     if (method.equals(SET_LAST_ACCESSED_TIME))
348     {
349       // push into set to be touched when a timer expires...
350
synchronized (_idsLock)
351       {
352         // kick off timer as soon as something that needs publishing
353
// appears...
354
if (_ids.size()==0)
355         {
356           _timer.schedule(new TouchTimerTask(), _period); // TODO - reuse old task
357
_log.debug("Touch Timer scheduled: _period");
358         }
359
360         _ids.add(id);
361       };
362       return;
363     }
364     else if (method.equals(DESTROY_SESSION))
365     {
366       String JavaDoc tmp=(String JavaDoc)argInstances[0]; // id in factory methods
367
// System.out.println("LOCAL DESTRUCTION : "+tmp); // arg[0] is the id
368
// this session has been destroyed locally...
369
synchronized (_idsLock)
370       {
371         _ids.remove(tmp);
372       }
373     }
374       }
375
376       try
377       {
378     Class JavaDoc[] tmp={String JavaDoc.class, Integer JavaDoc.class, Object JavaDoc[].class};
379     MethodCall mc = new MethodCall(getClass().getMethod("dispatch",tmp));
380         Object JavaDoc[] args = new Object JavaDoc[3];
381         args[0] = id;
382         args[1] = _methodToInteger.get(method.getName());
383         args[2] = argInstances;
384         mc.setArgs(args);
385
386     // we need some way of synchronising _member read/write-ing...
387
_dispatcher.callRemoteMethods(_members,
388                       mc,
389                       getDistributionModeInternal(),
390                       getDistributionTimeOut());
391     _log.trace("...method published");
392       }
393       catch(Exception JavaDoc e)
394       {
395     _log.error("problem publishing change in state over JGroups", e);
396       }
397     }
398
399   // JG doesn't find this method in our superclass ...
400
public void
401     dispatch(String JavaDoc id, Integer JavaDoc method, Object JavaDoc[] argInstances)
402     {
403       Method JavaDoc m=_integerToMethod[method.intValue()];
404       if (_log.isTraceEnabled())
405       {
406     String JavaDoc args="";
407     for (int i=0; i<argInstances.length; i++)
408       args+=(i>0?",":"")+argInstances[i];
409     if (_log.isTraceEnabled()) _log.trace("dispatching method... : "+id+"."+_integerToMethod[method.intValue()].getName()+"("+args+")");
410       }
411
412       if (m.equals(DESTROY_SESSION))
413       {
414     String JavaDoc tmp=(String JavaDoc)argInstances[0]; // id in factory methods
415
// System.out.println("REMOTE DESTRUCTION : "+tmp); // arg[0] is the id
416
// this session has been destroyed remotely...
417
synchronized (_idsLock)
418     {
419       _ids.remove(tmp);
420     }
421       }
422
423       // _log.info("dispatching: "+id+" - "+_integerToMethod[method.intValue()]);
424

425       // we should check the context name here, or not bother sending it...
426

427       ClassLoader JavaDoc oldLoader=Thread.currentThread().getContextClassLoader();
428       try
429       {
430     Thread.currentThread().setContextClassLoader(getLoader());
431     super.dispatch(id, method, argInstances);
432       }
433       finally
434       {
435     Thread.currentThread().setContextClassLoader(oldLoader);
436       }
437       _log.trace("...method dispatched");
438     }
439
440   //----------------------------------------
441
// 'MessageListener' API
442

443   /**
444    * receive notification of someone else's change in state
445    *
446    * @param msg a <code>Message</code> value
447    */

448   public void
449     receive(Message msg)
450     {
451       // _log.info("**************** RECEIVE CALLED *********************");
452
byte[] buf=msg.getBuffer();
453     }
454
455   /**
456    * copy our state to be used to initialise another store...
457    *
458    * @return an <code>Object</code> value
459    */

460
461   // should we cache the state, in case two new nodes come up together ?
462

463   public synchronized byte[]
464                 getState()
465     {
466       ClassLoader JavaDoc oldLoader=Thread.currentThread().getContextClassLoader();
467       try
468       {
469     Thread.currentThread().setContextClassLoader(getLoader());
470     _log.info("initialising another store from our current state");
471
472     // this is a bit problematic - since we really need to freeze
473
// every session before we can dump them... - TODO
474
LocalState[] state;
475     synchronized (_sessions)
476     {
477       _log.info("sending "+_sessions.size()+" sessions");
478
479       state=new LocalState[_sessions.size()];
480       int j=0;
481       for (Iterator JavaDoc i=_sessions.values().iterator(); i.hasNext();)
482         state[j++]=(LocalState)i.next();
483     }
484
485     Object JavaDoc[] data={new Long JavaDoc(System.currentTimeMillis()), state};
486     try
487     {
488       return MarshallingInterceptor.marshal(data);
489     }
490     catch (Exception JavaDoc e)
491     {
492       _log.error ("Unable to getState from JGroups: ", e);
493       return null;
494     }
495       }
496       finally
497       {
498     Thread.currentThread().setContextClassLoader(oldLoader);
499       }
500     }
501
502   /**
503    * initialise ourself from the current state of another store...
504    *
505    * @param new_state an <code>Object</code> value
506    */

507   public synchronized void
508     setState (byte[] tmp)
509     {
510       if (tmp!=null)
511       {
512     _log.info("initialising our state from another Store");
513
514     Object JavaDoc[] data = null;
515     try
516     {
517       // TODO - this needs to be loaded into webapps ClassLoader,
518
// then we can lose the MarshallingInterceptor...
519
data=(Object JavaDoc[])MarshallingInterceptor.demarshal(tmp);
520     }
521     catch (Exception JavaDoc e)
522     {
523       _log.error ("Unable to setState from JGroups: ", e);
524       return;
525     }
526
527     try
528     {
529     AbstractReplicatedStore.setReplicating(true);
530
531     long remoteTime=((Long JavaDoc)data[0]).longValue();
532     long localTime=System.currentTimeMillis();
533     long disparity=(localTime-remoteTime)/1000;
534     _log.info("time disparity: "+disparity+" secs");
535
536     LocalState[] state=(LocalState[])data[1];
537     _log.info("receiving "+state.length+" sessions...");
538
539     for (int i=0; i<state.length; i++)
540     {
541       LocalState ls=state[i];
542       _sessions.put(ls.getId(), ls);
543       getManager().getHttpSession(ls.getId()); // should cause creation of corresponding InterceptorStack
544
}
545       }
546      finally
547      {
548         AbstractReplicatedStore.setReplicating(false);
549      }
550      }
551     }
552
553   //----------------------------------------
554
// 'MembershipListener' API
555

556   // Block sending and receiving of messages until viewAccepted() is called
557
public void
558     block()
559     {
560       _log.trace("handling JGroups block()...");
561       _log.trace("...JGroups block() handled");
562     }
563
564   // Called when a member is suspected
565
public synchronized void
566     suspect(Address suspected_mbr)
567     {
568       if (_log.isTraceEnabled()) _log.trace("handling JGroups suspect("+suspected_mbr+")...");
569       _log.warn("cluster suspects member may have been lost: "+suspected_mbr);
570       _log.trace("...JGroups suspect() handled");
571     }
572
573   // Called when channel membership changes
574
public synchronized void
575     viewAccepted(View newView)
576     {
577       if (_log.isTraceEnabled()) _log.trace("handling JGroups viewAccepted("+newView+")...");
578
579       // this is meant to happen if a network split is healed and two
580
// clusters try to reconcile their separate states into one -
581
// an unlikely event.
582
if(newView instanceof MergeView)
583     _log.warn("NYI - merging: view is " + newView);
584
585       Vector JavaDoc newMembers=newView.getMembers();
586
587       if (newMembers != null)
588       {
589     _members.clear();
590     _members.addAll(newMembers);
591     _log.info("JGroups View: "+_members);
592     _members.remove(_channel.getLocalAddress());
593       }
594
595       _log.trace("...JGroups viewAccepted() handled");
596     }
597 }
598
Popular Tags