KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > mux > Multiplexer


1 package org.jgroups.mux;
2
3 import org.apache.commons.logging.Log;
4 import org.apache.commons.logging.LogFactory;
5 import org.jgroups.*;
6 import org.jgroups.TimeoutException;
7 import org.jgroups.protocols.pbcast.FLUSH;
8 import org.jgroups.stack.StateTransferInfo;
9 import org.jgroups.util.FIFOMessageQueue;
10 import org.jgroups.util.Promise;
11 import org.jgroups.util.Util;
12
13 import java.util.*;
14 import java.util.concurrent.*;
15 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
16
17 /**
18  * Used for dispatching incoming messages. The Multiplexer implements UpHandler and registers with the associated
19  * JChannel (there can only be 1 Multiplexer per JChannel). When up() is called with a message, the header of the
20  * message is removed and the MuxChannel corresponding to the header's service ID is retrieved from the map,
21  * and MuxChannel.up() is called with the message.
22  * @author Bela Ban
23  * @version $Id: Multiplexer.java,v 1.63 2007/07/13 06:31:59 belaban Exp $
24  */

25 public class Multiplexer implements UpHandler {
26     /** Map<String,MuxChannel>. Maintains the mapping between service IDs and their associated MuxChannels */
27     private final ConcurrentMap<String JavaDoc,MuxChannel> services=new ConcurrentHashMap<String JavaDoc,MuxChannel>();
28     private final JChannel channel;
29     static final Log log=LogFactory.getLog(Multiplexer.class);
30     static final String JavaDoc SEPARATOR="::";
31     static final short SEPARATOR_LEN=(short)SEPARATOR.length();
32     static final String JavaDoc NAME="MUX";
33
34     private boolean flush_present=true;
35     private boolean blocked=false;
36
37     /** Thread pool to concurrently process messages sent to different services */
38     private ExecutorService thread_pool;
39
40     /** To make sure messages sent to different services are processed concurrently (using the thread pool above), but
41      * messages to the same service are processed FIFO */

42     private FIFOMessageQueue<String JavaDoc,Runnable JavaDoc> fifo_queue=new FIFOMessageQueue<String JavaDoc,Runnable JavaDoc>();
43
44
45     /** Cluster view */
46     View view=null;
47
48     Address local_addr=null;
49
50     /** Map<String,Boolean>. Map of service IDs and booleans that determine whether getState() has already been called */
51     private final Map<String JavaDoc,Boolean JavaDoc> state_transfer_listeners=new HashMap<String JavaDoc,Boolean JavaDoc>();
52
53     /** Map<String,List<Address>>. A map of services as keys and lists of hosts as values */
54     private final Map<String JavaDoc,List<Address>> service_state=new HashMap<String JavaDoc,List<Address>>();
55
56     /** Used to wait on service state information */
57     private final Promise service_state_promise=new Promise();
58
59     /** Map<Address, Set<String>>. Keys are senders, values are a set of services hosted by that sender.
60      * Used to collect responses to LIST_SERVICES_REQ */

61     private final Map<Address, Set<String JavaDoc>> service_responses=new HashMap<Address, Set<String JavaDoc>>();
62
63     private long SERVICES_RSP_TIMEOUT=10000;
64
65     public Multiplexer() {
66         this.channel=null;
67         flush_present=isFlushPresent();
68         
69         // threadpool is enabled by default
70
if(Global.getPropertyAsBoolean(Global.MUX_ENABLED, true)){
71             thread_pool=createThreadPool();
72         }
73     }
74
75     public Multiplexer(JChannel channel) {
76         this.channel=channel;
77         this.channel.setUpHandler(this);
78         this.channel.setOpt(Channel.BLOCK, Boolean.TRUE); // we want to handle BLOCK events ourselves
79
flush_present=isFlushPresent();
80         
81         //threadpool is enabled by default
82
if(Global.getPropertyAsBoolean(Global.MUX_ENABLED, true)){
83             thread_pool=createThreadPool();
84         }
85     }
86
87     /**
88      * @deprecated Use ${link #getServiceIds()} instead
89      * @return The set of service IDs
90      */

91     public Set getApplicationIds() {
92         return services != null? Collections.unmodifiableSet(services.keySet()) : null;
93     }
94
95     public Set<String JavaDoc> getServiceIds() {
96         return services != null? Collections.unmodifiableSet(services.keySet()) : null;
97     }
98
99
100     public long getServicesResponseTimeout() {
101         return SERVICES_RSP_TIMEOUT;
102     }
103
104     public void setServicesResponseTimeout(long services_rsp_timeout) {
105         this.SERVICES_RSP_TIMEOUT=services_rsp_timeout;
106     }
107
108     /** Returns a copy of the current view <em>minus</em> the nodes on which service service_id is <em>not</em> running
109      *
110      * @param service_id
111      * @return The service view
112      */

113     public View getServiceView(String JavaDoc service_id) {
114         List hosts=service_state.get(service_id);
115         if(hosts == null) return null;
116         return generateServiceView(hosts);
117     }
118
119     public boolean stateTransferListenersPresent() {
120         return state_transfer_listeners != null && !state_transfer_listeners.isEmpty();
121     }
122     
123     public synchronized void registerForStateTransfer(String JavaDoc appl_id, String JavaDoc substate_id) {
124         String JavaDoc key=appl_id;
125         if(substate_id != null && substate_id.length() > 0)
126             key+=SEPARATOR + substate_id;
127         state_transfer_listeners.put(key, Boolean.FALSE);
128     }
129
130     public synchronized boolean getState(Address target, String JavaDoc id, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
131         if(state_transfer_listeners.isEmpty())
132             return false;
133         
134         for(Iterator<Map.Entry<String JavaDoc,Boolean JavaDoc>> it=state_transfer_listeners.entrySet().iterator(); it.hasNext();) {
135             Map.Entry<String JavaDoc, Boolean JavaDoc> entry = it.next();
136             String JavaDoc key=entry.getKey();
137             int index=key.indexOf(SEPARATOR);
138             boolean match;
139             if(index > -1) {
140                 String JavaDoc tmp=key.substring(0, index);
141                 match=id.equals(tmp);
142             }
143             else {
144                 match=id.equals(key);
145             }
146             if(match) {
147                 entry.setValue(Boolean.TRUE);
148                 break;
149             }
150         }
151
152         Collection values=state_transfer_listeners.values();
153         boolean all_true=Util.all(values, Boolean.TRUE);
154         if(!all_true)
155             return true; // pseudo
156

157         boolean rc=false;
158         Set<String JavaDoc> keys=new HashSet<String JavaDoc>(state_transfer_listeners.keySet());
159         rc=fetchServiceStates(target, keys, timeout);
160         state_transfer_listeners.clear();
161         return rc;
162     }
163
164     protected static ThreadPoolExecutor createThreadPool() {
165         int min_threads=1, max_threads=4;
166         long keep_alive=30000;
167
168         ThreadFactory factory=new ThreadFactory() {
169             AtomicInteger JavaDoc num = new AtomicInteger JavaDoc(1);
170
171             ThreadGroup JavaDoc mux_threads=new ThreadGroup JavaDoc(Util.getGlobalThreadGroup(), "MultiplexerThreads");
172
173             public Thread JavaDoc newThread(Runnable JavaDoc command) {
174                 Thread JavaDoc ret=new Thread JavaDoc(mux_threads, command, "Multiplexer-" + num.incrementAndGet());
175                 ret.setDaemon(true);
176                 return ret;
177             }
178         };
179
180         min_threads=Global.getPropertyAsInteger(Global.MUX_MIN_THREADS, min_threads);
181         max_threads=Global.getPropertyAsInteger(Global.MUX_MAX_THREADS, max_threads);
182         keep_alive=Global.getPropertyAsLong(Global.MUX_KEEPALIVE, keep_alive);
183
184         return new ThreadPoolExecutor(min_threads, max_threads, keep_alive, TimeUnit.MILLISECONDS,
185                                       new SynchronousQueue<Runnable JavaDoc>(), factory,
186                                       new ThreadPoolExecutor.CallerRunsPolicy());
187     }
188
189     protected void shutdownThreadPool() {
190         if(thread_pool != null && !thread_pool.isShutdown()) {
191             thread_pool.shutdownNow();
192             try {
193                 thread_pool.awaitTermination(Global.THREADPOOL_SHUTDOWN_WAIT_TIME, TimeUnit.MILLISECONDS);
194             }
195             catch(InterruptedException JavaDoc e) {
196             }
197         }
198     }
199
200     /** Fetches the app states for all service IDs in keys.
201      * The keys are a duplicate list, so it cannot be modified by the caller of this method
202      * @param keys
203      */

204     private boolean fetchServiceStates(Address target, Set<String JavaDoc> keys, long timeout) throws ChannelClosedException, ChannelNotConnectedException {
205         boolean rc, all_rcs=true;
206         for(String JavaDoc stateId: keys) {
207             rc=channel.getState(target, stateId, timeout);
208             if(!rc)
209                 all_rcs=false;
210         }
211         return all_rcs;
212     }
213
214
215     /**
216      * Fetches the map of services and hosts from the coordinator (Multiplexer). No-op if we are the coordinator
217      */

218     public void fetchServiceInformation() throws Exception JavaDoc {
219         while(true) {
220             Address coord=getCoordinator(), local_address=channel != null? channel.getLocalAddress() : null;
221             boolean is_coord=coord != null && local_address != null && local_address.equals(coord);
222             if(is_coord) {
223                 if(log.isTraceEnabled())
224                     log.trace("I'm coordinator, will not fetch service state information");
225                 break;
226             }
227
228             ServiceInfo si=new ServiceInfo(ServiceInfo.STATE_REQ, null, null, null);
229             MuxHeader hdr=new MuxHeader(si);
230             Message state_req=new Message(coord, null, null);
231             state_req.putHeader(NAME, hdr);
232             service_state_promise.reset();
233             channel.send(state_req);
234
235             try {
236                 byte[] state=(byte[])service_state_promise.getResultWithTimeout(2000);
237                 if(state != null) {
238                     Map<String JavaDoc,List<Address>> new_state=(Map<String JavaDoc,List<Address>>)Util.objectFromByteBuffer(state);
239                     synchronized(service_state) {
240                         service_state.clear();
241                         service_state.putAll(new_state);
242                     }
243                     if(log.isTraceEnabled())
244                         log.trace("service state was set successfully (" + service_state.size() + " entries)");
245                 }
246                 else {
247                     if(log.isWarnEnabled())
248                         log.warn("received service state was null");
249                 }
250                 break;
251             }
252             catch(TimeoutException e) {
253                 if(log.isTraceEnabled())
254                     log.trace("timed out waiting for service state from " + coord + ", retrying");
255             }
256         }
257     }
258
259
260
261     public void sendServiceUpMessage(String JavaDoc service, Address host,boolean bypassFlush) throws Exception JavaDoc {
262         sendServiceMessage(ServiceInfo.SERVICE_UP, service, host,bypassFlush, null);
263         if(local_addr != null && host != null && local_addr.equals(host))
264             handleServiceUp(service, host, false);
265     }
266
267
268     public void sendServiceDownMessage(String JavaDoc service, Address host,boolean bypassFlush) throws Exception JavaDoc {
269         sendServiceMessage(ServiceInfo.SERVICE_DOWN, service, host,bypassFlush, null);
270         if(local_addr != null && host != null && local_addr.equals(host))
271             handleServiceDown(service, host, false);
272     }
273
274
275
276
277     /**
278      * Remove mux header and dispatch to correct MuxChannel
279      * @param evt
280      * @return
281      */

282     public Object JavaDoc up(final Event evt) {
283         switch(evt.getType()) {
284             case Event.MSG:
285                 final Message msg=(Message)evt.getArg();
286                 final MuxHeader hdr=(MuxHeader)msg.getHeader(NAME);
287                 if(hdr == null) {
288                     log.error("MuxHeader not present - discarding message " + msg);
289                     return null;
290                 }
291
292                 Address sender=msg.getSrc();
293                 if(hdr.info != null) { // it is a service state request - not a default multiplex request
294
try {
295                         handleServiceStateRequest(hdr.info, sender);
296                     }
297                     catch(Exception JavaDoc e) {
298                         if(log.isErrorEnabled())
299                             log.error("failure in handling service state request", e);
300                     }
301                     break;
302                 }
303
304                 MuxChannel mux_ch=services.get(hdr.id);
305                 if(mux_ch == null) {
306                     log.warn("service " + hdr.id + " not currently running, discarding message " + msg);
307                     return null;
308                 }
309                 return passToMuxChannel(mux_ch, evt, fifo_queue, sender, hdr.id, false); // don't block !
310

311             case Event.VIEW_CHANGE:
312                 Vector old_members=view != null? view.getMembers() : null;
313                 view=(View)evt.getArg();
314                 Vector<Address> new_members=view != null? view.getMembers() : null;
315                 Vector<Address> left_members=Util.determineLeftMembers(old_members, new_members);
316
317                 if(view instanceof MergeView) {
318                     final MergeView temp_merge_view=(MergeView)view.clone();
319                     if(log.isTraceEnabled())
320                         log.trace("received a MergeView: " + temp_merge_view + ", adjusting the service view");
321                     try {
322                         Thread JavaDoc merge_handler=new Thread JavaDoc() {
323                             public void run() {
324                                 try {
325                                     handleMergeView(temp_merge_view);
326                                 }
327                                 catch(Exception JavaDoc e) {
328                                     if(log.isErrorEnabled())
329                                         log.error("problems handling merge view", e);
330                                 }
331                             }
332                         };
333                         merge_handler.setName("merge handler view_change");
334                         merge_handler.setDaemon(false);
335                         merge_handler.start();
336                     }
337                     catch(Exception JavaDoc e) {
338                         if(log.isErrorEnabled())
339                             log.error("failed handling merge view", e);
340                     }
341                 }
342                 else { // regular view
343
synchronized(service_responses) {
344                         service_responses.clear();
345                     }
346                 }
347                 if(!left_members.isEmpty())
348                     adjustServiceViews(left_members);
349                 break;
350
351             case Event.SUSPECT:
352                 Address suspected_mbr=(Address)evt.getArg();
353
354                 synchronized(service_responses) {
355                     service_responses.put(suspected_mbr, null);
356                     service_responses.notifyAll();
357                 }
358                 passToAllMuxChannels(evt);
359                 break;
360
361             case Event.GET_APPLSTATE:
362             case Event.STATE_TRANSFER_OUTPUTSTREAM:
363                 return handleStateRequest(evt);
364
365             case Event.GET_STATE_OK:
366             case Event.STATE_TRANSFER_INPUTSTREAM:
367                 handleStateResponse(evt);
368                 break;
369
370             case Event.SET_LOCAL_ADDRESS:
371                 local_addr=(Address)evt.getArg();
372                 passToAllMuxChannels(evt);
373                 break;
374
375             case Event.BLOCK:
376                 blocked=true;
377                 if(!services.isEmpty()) {
378                     passToAllMuxChannels(evt, true, true); // do block and bypass the thread pool
379
}
380                 waitUntilThreadPoolHasNoRunningTasks(1000);
381                 return null;
382
383             case Event.UNBLOCK: // process queued-up MergeViews
384
if(blocked)
385                     blocked=false;
386                 passToAllMuxChannels(evt);
387                 break;
388
389             default:
390                 passToAllMuxChannels(evt);
391                 break;
392         }
393         return null;
394     }
395
396     private int waitUntilThreadPoolHasNoRunningTasks(long timeout) {
397         int num_threads=0;
398         long end_time=System.currentTimeMillis() + timeout;
399
400         while(fifo_queue != null && (num_threads=fifo_queue.size()) > 0 && System.currentTimeMillis() < end_time) {
401             Util.sleep(100);
402         }
403         return num_threads;
404     }
405
406
407     public Channel createMuxChannel(JChannelFactory f, String JavaDoc id, String JavaDoc stack_name) throws Exception JavaDoc {
408         MuxChannel ch;
409         if(services.containsKey(id))
410             throw new Exception JavaDoc("service ID \"" + id + "\" is already registered, cannot register duplicate ID");
411         ch=new MuxChannel(f, channel, id, stack_name, this);
412         services.put(id, ch);
413         return ch;
414     }
415
416
417
418
419     private void passToAllMuxChannels(Event evt) {
420         passToAllMuxChannels(evt, false, true);
421     }
422
423
424     private void passToAllMuxChannels(Event evt, boolean block, boolean bypass_thread_pool) {
425         String JavaDoc service_name;
426         MuxChannel ch;
427         for(Map.Entry<String JavaDoc,MuxChannel> entry: services.entrySet()) {
428             service_name=entry.getKey();
429             ch=entry.getValue();
430             // these events are directly delivered, don't get added to any queue
431
passToMuxChannel(ch, evt, fifo_queue, null, service_name, block, bypass_thread_pool);
432         }
433     }
434
435     public MuxChannel remove(String JavaDoc id) {
436         return services.remove(id);
437     }
438
439
440
441     /** Closes the underlying JChannel if all MuxChannels have been disconnected */
442     public void disconnect() {
443         boolean all_disconnected=true;
444         for(MuxChannel mux_ch: services.values()) {
445             if(mux_ch.isConnected()) {
446                 all_disconnected=false;
447                 break;
448             }
449         }
450         if(all_disconnected) {
451             if(log.isTraceEnabled()) {
452                 log.trace("disconnecting underlying JChannel as all MuxChannels are disconnected");
453             }
454             channel.disconnect();
455         }
456     }
457
458
459     public void unregister(String JavaDoc appl_id) {
460         services.remove(appl_id);
461     }
462
463     public boolean close() {
464         boolean all_closed=true;
465         for(MuxChannel mux_ch: services.values()) {
466             if(mux_ch.isOpen()) {
467                 all_closed=false;
468                 break;
469             }
470         }
471         if(all_closed) {
472             if(log.isTraceEnabled()) {
473                 log.trace("closing underlying JChannel as all MuxChannels are closed");
474             }
475             channel.close();
476             services.clear();
477             shutdownThreadPool();
478         }
479         return all_closed;
480     }
481
482     public void closeAll() {
483         for(MuxChannel mux_ch: services.values()) {
484             mux_ch.setConnected(false);
485             mux_ch.setClosed(true);
486             mux_ch.closeMessageQueue(true);
487         }
488         shutdownThreadPool();
489     }
490
491     public boolean shutdown() {
492         boolean all_closed=true;
493         for(MuxChannel mux_ch: services.values()) {
494             if(mux_ch.isOpen()) {
495                 all_closed=false;
496                 break;
497             }
498         }
499         if(all_closed) {
500             if(log.isTraceEnabled()) {
501                 log.trace("shutting down underlying JChannel as all MuxChannels are closed");
502             }
503             channel.shutdown();
504             services.clear();
505             shutdownThreadPool();
506         }
507         return all_closed;
508     }
509
510
511     private boolean isFlushPresent() {
512         return channel.getProtocolStack().findProtocol("FLUSH") != null;
513     }
514
515
516     private void sendServiceState() throws Exception JavaDoc {
517         byte[] data=Util.objectToByteBuffer(new HashSet<String JavaDoc>(services.keySet()));
518         sendServiceMessage(ServiceInfo.LIST_SERVICES_RSP, null, channel.getLocalAddress(), true, data);
519     }
520
521
522     private Address getLocalAddress() {
523         if(local_addr != null)
524             return local_addr;
525         if(channel != null)
526             local_addr=channel.getLocalAddress();
527         return local_addr;
528     }
529
530     private Address getCoordinator() {
531         if(channel != null) {
532             View v=channel.getView();
533             if(v != null) {
534                 Vector members=v.getMembers();
535                 if(members != null && !members.isEmpty()) {
536                     return (Address)members.firstElement();
537                 }
538             }
539         }
540         return null;
541     }
542
543     /**
544     * Returns an Address of a state provider for a given service_id.
545     * If preferredTarget is a member of a service view for a given service_id
546     * then preferredTarget is returned. Otherwise, service view coordinator is
547     * returned if such node exists. If service view is empty for a given service_id
548     * null is returned.
549     *
550     * @param preferredTarget
551     * @param service_id
552     * @return
553     */

554     public Address getStateProvider(Address preferredTarget, String JavaDoc service_id) {
555         Address result = null;
556         List hosts=service_state.get(service_id);
557         if(hosts != null && !hosts.isEmpty()){
558            if(hosts.contains(preferredTarget)){
559               result = preferredTarget;
560            }
561            else{
562               result = (Address)hosts.get(0);
563            }
564         }
565         return result;
566     }
567
568     private void sendServiceMessage(byte type, String JavaDoc service, Address host,boolean bypassFlush, byte[] payload) throws Exception JavaDoc {
569         if(host == null)
570             host=getLocalAddress();
571         if(host == null) {
572             if(log.isWarnEnabled()) {
573                 log.warn("local_addr is null, cannot send ServiceInfo." + ServiceInfo.typeToString(type) + " message");
574             }
575             return;
576         }
577
578         ServiceInfo si=new ServiceInfo(type, service, host, payload);
579         MuxHeader hdr=new MuxHeader(si);
580         Message service_msg=new Message();
581         service_msg.putHeader(NAME, hdr);
582         if(bypassFlush && flush_present)
583            service_msg.putHeader(FLUSH.NAME, new FLUSH.FlushHeader(FLUSH.FlushHeader.FLUSH_BYPASS));
584         
585         channel.send(service_msg);
586     }
587
588
589
590     private Object JavaDoc handleStateRequest(Event evt) {
591         StateTransferInfo info=(StateTransferInfo)evt.getArg();
592         String JavaDoc id=info.state_id;
593         String JavaDoc original_id=id;
594         Address requester=info.target; // the sender of the state request
595

596         try {
597             int index=id.indexOf(SEPARATOR);
598             if(index > -1) {
599                 info.state_id=id.substring(index + SEPARATOR_LEN);
600                 id=id.substring(0, index); // similar reuse as above...
601
}
602             else {
603                 info.state_id=null;
604             }
605
606             MuxChannel mux_ch=services.get(id);
607             if(mux_ch == null)
608                 throw new IllegalArgumentException JavaDoc("didn't find service with ID=" + id + " to fetch state from");
609
610             // state_id will be null, get regular state from the service named state_id
611
StateTransferInfo ret=(StateTransferInfo)passToMuxChannel(mux_ch, evt, fifo_queue, requester, id, true);
612             if(ret != null)
613             ret.state_id=original_id;
614             return ret;
615         }
616         catch(Throwable JavaDoc ex) {
617             if(log.isErrorEnabled())
618                 log.error("failed returning the application state, will return null", ex);
619             return new StateTransferInfo(null, original_id, 0L, null);
620         }
621     }
622
623
624
625     private void handleStateResponse(Event evt) {
626         StateTransferInfo info=(StateTransferInfo)evt.getArg();
627         MuxChannel mux_ch;
628         Address state_sender=info.target;
629
630         String JavaDoc appl_id, substate_id, tmp;
631         tmp=info.state_id;
632
633         if(tmp == null) {
634             if(log.isTraceEnabled())
635                 log.trace("state is null, not passing up: " + info);
636             return;
637         }
638
639         int index=tmp.indexOf(SEPARATOR);
640         if(index > -1) {
641             appl_id=tmp.substring(0, index);
642             substate_id=tmp.substring(index+SEPARATOR_LEN);
643         }
644         else {
645             appl_id=tmp;
646             substate_id=null;
647         }
648
649         mux_ch=services.get(appl_id);
650         if(mux_ch == null) {
651             log.error("didn't find service with ID=" + appl_id + " to fetch state from");
652         }
653         else {
654             StateTransferInfo tmp_info=info.copy();
655             tmp_info.state_id=substate_id;
656             Event tmpEvt=new Event(evt.getType(), tmp_info);
657             passToMuxChannel(mux_ch, tmpEvt, fifo_queue, state_sender, appl_id, false);
658         }
659     }
660
661     private void handleServiceStateRequest(ServiceInfo info, Address sender) throws Exception JavaDoc {
662         switch(info.type) {
663             case ServiceInfo.STATE_REQ:
664                 byte[] state;
665                 synchronized(service_state) {
666                     state=Util.objectToByteBuffer(service_state);
667                 }
668                 ServiceInfo si=new ServiceInfo(ServiceInfo.STATE_RSP, null, null, state);
669                 MuxHeader hdr=new MuxHeader(si);
670                 Message state_rsp=new Message(sender);
671                 state_rsp.putHeader(NAME, hdr);
672                 channel.send(state_rsp);
673                 break;
674             case ServiceInfo.STATE_RSP:
675                 service_state_promise.setResult(info.state);
676                 break;
677             case ServiceInfo.SERVICE_UP:
678                 handleServiceUp(info.service, info.host, true);
679                 break;
680             case ServiceInfo.SERVICE_DOWN:
681                 handleServiceDown(info.service, info.host, true);
682                 break;
683             case ServiceInfo.LIST_SERVICES_RSP:
684                 handleServicesRsp(sender, info.state);
685                 break;
686             default:
687                 if(log.isErrorEnabled())
688                     log.error("service request type " + info.type + " not known");
689                 break;
690         }
691     }
692
693     private void handleServicesRsp(Address sender, byte[] state) throws Exception JavaDoc {
694         Set<String JavaDoc> s=(Set<String JavaDoc>) Util.objectFromByteBuffer(state);
695
696         synchronized(service_responses) {
697             Set<String JavaDoc> tmp=service_responses.get(sender);
698             if(tmp == null)
699                 tmp=new HashSet<String JavaDoc>();
700             tmp.addAll(s);
701
702             service_responses.put(sender, tmp);
703             if(log.isTraceEnabled())
704                 log.trace("received service response: " + sender + "(" + s.toString() + ")");
705             service_responses.notifyAll();
706         }
707     }
708
709
710     private void handleServiceDown(String JavaDoc service, Address host, boolean received) {
711         List<Address> hosts, hosts_copy;
712         boolean removed=false;
713
714         // discard if we sent this message
715
if(received && host != null && local_addr != null && local_addr.equals(host)) {
716             return;
717         }
718
719         synchronized(service_state) {
720             hosts=service_state.get(service);
721             if(hosts == null)
722                 return;
723             removed=hosts.remove(host);
724             hosts_copy=new ArrayList<Address>(hosts); // make a copy so we don't modify hosts in generateServiceView()
725
}
726
727         if(removed) {
728             View service_view=generateServiceView(hosts_copy);
729             if(service_view != null) {
730                 MuxChannel ch=services.get(service);
731                 if(ch != null) {
732                     Event view_evt=new Event(Event.VIEW_CHANGE, service_view);
733                     // ch.up(view_evt);
734
if(ch.isConnected())
735                         passToMuxChannel(ch, view_evt, fifo_queue, null, service, false);
736                 }
737                 else {
738                     if(log.isTraceEnabled())
739                         log.trace("service " + service + " not found, cannot dispatch service view " + service_view);
740                 }
741             }
742         }
743
744         Address local_address=getLocalAddress();
745         if(local_address != null && host != null && host.equals(local_address))
746             unregister(service);
747     }
748
749
750     private void handleServiceUp(String JavaDoc service, Address host, boolean received) {
751         List<Address> hosts, hosts_copy;
752         boolean added=false;
753
754
755
756
757         // discard if we sent this message
758
if(received && host != null && local_addr != null && local_addr.equals(host)) {
759             return;
760         }
761
762         synchronized(service_state) {
763             hosts=service_state.get(service);
764             if(hosts == null) {
765                 hosts=new ArrayList<Address>();
766                 service_state.put(service, hosts);
767             }
768             if(!hosts.contains(host)) {
769                 hosts.add(host);
770                 added=true;
771             }
772             hosts_copy=new ArrayList<Address>(hosts); // make a copy so we don't modify hosts in generateServiceView()
773
}
774
775         if(added) {
776             View service_view=generateServiceView(hosts_copy);
777             if(service_view != null) {
778                 MuxChannel ch=services.get(service);
779                 if(ch != null) {
780                     Event view_evt=new Event(Event.VIEW_CHANGE, service_view);
781                     // ch.up(view_evt);
782
passToMuxChannel(ch, view_evt, fifo_queue, null, service, false);
783                 }
784                 else {
785                     if(log.isTraceEnabled())
786                         log.trace("service " + service + " not found, cannot dispatch service view " + service_view); }
787             }
788         }
789     }
790
791
792     /**
793      * Fetches the service states from everyone else in the cluster. Once all states have been received and inserted into
794      * service_state, compute a service view (a copy of MergeView) for each service and pass it up
795      * @param view
796      */

797     private void handleMergeView(MergeView view) throws Exception JavaDoc {
798         long time_to_wait=SERVICES_RSP_TIMEOUT, start;
799         int num_members=view.size(); // include myself
800
Map<Address, Set<String JavaDoc>> copy=null;
801
802         sendServiceState();
803
804         synchronized(service_responses) {
805             start=System.currentTimeMillis();
806             try {
807                 while(time_to_wait > 0 && numResponses(service_responses) < num_members) {
808                     service_responses.wait(time_to_wait);
809                     time_to_wait-=System.currentTimeMillis() - start;
810                 }
811                 copy=new HashMap<Address, Set<String JavaDoc>>(service_responses);
812             }
813             catch(Exception JavaDoc ex) {
814                 if(log.isErrorEnabled())
815                     log.error("failed fetching a list of services from other members in the cluster, cannot handle merge view " + view, ex);
816             }
817         }
818
819         if(log.isTraceEnabled())
820             log.trace("merging service state, my service_state: " + service_state + ", received responses: " + copy);
821
822         // merges service_responses with service_state and emits MergeViews for the services affected (MuxChannel)
823
mergeServiceState(view, copy);
824         service_responses.clear();
825     }
826
827     private static int numResponses(Map m) {
828         int num=0;
829         Collection values=m.values();
830         for(Iterator it=values.iterator(); it.hasNext();) {
831             if(it.next() != null)
832                 num++;
833         }
834
835         return num;
836     }
837
838
839     private void mergeServiceState(MergeView view, Map<Address, Set<String JavaDoc>> copy) {
840         Set<String JavaDoc> modified_services=new HashSet<String JavaDoc>();
841         synchronized(service_state) {
842             for(Iterator <Map.Entry<Address, Set<String JavaDoc>>> it=copy.entrySet().iterator(); it.hasNext();) {
843                 Map.Entry<Address, Set<String JavaDoc>> entry = it.next();
844                 Address host=entry.getKey();
845                 Set<String JavaDoc> service_list=entry.getValue();
846                 if(service_list == null)
847                     continue;
848
849                 for(String JavaDoc service:service_list) {
850                     List<Address> my_services=service_state.get(service);
851                     if(my_services == null) {
852                         my_services=new ArrayList<Address>();
853                         service_state.put(service, my_services);
854                     }
855
856                     boolean was_modified=my_services.add(host);
857                     if(was_modified) {
858                         modified_services.add(service);
859                     }
860                 }
861             }
862         }
863
864         // now emit MergeViews for all services which were modified
865
for(String JavaDoc service:modified_services) {
866             MuxChannel ch=services.get(service);
867             List<Address> hosts=service_state.get(service);
868             Vector<Address> membersCopy = new Vector<Address>(view.getMembers());
869             membersCopy.retainAll(hosts);
870             MergeView v=new MergeView(view.getVid(), membersCopy, view.getSubgroups());
871             // ch.up(evt);
872
passToMuxChannel(ch, new Event(Event.VIEW_CHANGE, v), fifo_queue, null, service, false);
873         }
874     }
875
876     private void adjustServiceViews(Vector left_members) {
877         if(left_members != null)
878             for(int i=0; i < left_members.size(); i++) {
879                 try {
880                     adjustServiceView((Address)left_members.elementAt(i));
881                 }
882                 catch(Throwable JavaDoc t) {
883                     if(log.isErrorEnabled())
884                         log.error("failed adjusting service views", t);
885                 }
886             }
887     }
888
889     private void adjustServiceView(Address host) {
890
891         synchronized(service_state) {
892             for(Iterator<Map.Entry<String JavaDoc,List<Address>>> it=service_state.entrySet().iterator(); it.hasNext();) {
893                 Map.Entry<String JavaDoc, List<Address>> entry = it.next();
894                 String JavaDoc service=entry.getKey();
895                 List<Address> hosts = entry.getValue();
896                 if(hosts == null)
897                     continue;
898
899                 if(hosts.remove(host)) {
900                     // make a copy so we don't modify hosts in generateServiceView()
901
View service_view=generateServiceView(new ArrayList<Address>(hosts));
902                     if(service_view != null) {
903                         MuxChannel ch=services.get(service);
904                         if(ch != null) {
905                             Event view_evt=new Event(Event.VIEW_CHANGE, service_view);
906                             // ch.up(view_evt);
907
if(ch.isConnected())
908                                 passToMuxChannel(ch, view_evt, fifo_queue, null, service, false);
909                         }
910                         else {
911                             if(log.isTraceEnabled())
912                                 log.trace("service " + service + " not found, cannot dispatch service view " + service_view);
913                         }
914                     }
915                 }
916                 Address local_address=getLocalAddress();
917                 if(local_address != null && host != null && host.equals(local_address))
918                     unregister(service);
919             }
920         }
921     }
922
923
924     /**
925      * Create a copy of view which contains only members which are present in hosts. Call viewAccepted() on the MuxChannel
926      * which corresponds with service. If no members are removed or added from/to view, this is a no-op.
927      * @param hosts List<Address>
928      * @return the servicd view (a modified copy of the real view), or null if the view was not modified
929      */

930     private View generateServiceView(List hosts) {
931         Vector<Address> members=new Vector<Address>(view.getMembers());
932         members.retainAll(hosts);
933         return new View(view.getVid(), members);
934     }
935
936     private Object JavaDoc passToMuxChannel(MuxChannel ch, Event evt, final FIFOMessageQueue<String JavaDoc,Runnable JavaDoc> queue,
937                                          final Address sender, final String JavaDoc dest, boolean block) {
938         return passToMuxChannel(ch, evt, queue, sender, dest, block, false);
939     }
940
941
942
943     private Object JavaDoc passToMuxChannel(MuxChannel ch, Event evt, final FIFOMessageQueue<String JavaDoc,Runnable JavaDoc> queue,
944                                     final Address sender, final String JavaDoc dest, boolean block, boolean bypass_thread_pool) {
945         if(thread_pool == null || bypass_thread_pool) {
946             return ch.up(evt);
947         }
948
949         Task task=new Task(ch, evt, queue, sender, dest, block);
950         ExecuteTask execute_task=new ExecuteTask(fifo_queue); // takes Task from queue and executes it
951

952         try {
953             fifo_queue.put(sender, dest, task);
954             thread_pool.execute(execute_task);
955             if(block) {
956                 try {
957                     return task.exchanger.exchange(null);
958                 }
959                 catch(InterruptedException JavaDoc e) {
960                     Thread.currentThread().interrupt();
961                 }
962             }
963         }
964         catch(InterruptedException JavaDoc e) {
965             Thread.currentThread().interrupt();
966         }
967         return null;
968     }
969
970     public void addServiceIfNotPresent(String JavaDoc id, MuxChannel ch) {
971         services.putIfAbsent(id, ch);
972     }
973
974
975     private static class Task implements Runnable JavaDoc {
976         Exchanger<Object JavaDoc> exchanger;
977         MuxChannel channel;
978         Event evt;
979         FIFOMessageQueue<String JavaDoc,Runnable JavaDoc> queue;
980         Address sender;
981         String JavaDoc dest;
982
983         Task(MuxChannel channel, Event evt, FIFOMessageQueue<String JavaDoc,Runnable JavaDoc> queue, Address sender, String JavaDoc dest, boolean result_expected) {
984             this.channel=channel;
985             this.evt=evt;
986             this.queue=queue;
987             this.sender=sender;
988             this.dest=dest;
989             if(result_expected)
990                 exchanger=new Exchanger<Object JavaDoc>();
991         }
992
993         public void run() {
994             Object JavaDoc retval;
995             try {
996                 retval=channel.up(evt);
997                 if(exchanger != null)
998                     exchanger.exchange(retval);
999             }
1000            catch(InterruptedException JavaDoc e) {
1001                Thread.currentThread().interrupt(); // let the thread pool handle the interrupt - we're done anyway
1002
}
1003            finally {
1004                queue.done(sender, dest);
1005            }
1006        }
1007    }
1008
1009
1010    private static class ExecuteTask implements Runnable JavaDoc {
1011        FIFOMessageQueue<String JavaDoc,Runnable JavaDoc> queue;
1012
1013        public ExecuteTask(FIFOMessageQueue<String JavaDoc,Runnable JavaDoc> queue) {
1014            this.queue=queue;
1015        }
1016
1017        public void run() {
1018            try {
1019                Runnable JavaDoc task=queue.take();
1020                task.run();
1021            }
1022            catch(InterruptedException JavaDoc e) {
1023            }
1024        }
1025    }
1026
1027
1028}
1029
Popular Tags