KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > JChannel


1 // $Id: JChannel.java,v 1.31 2005/04/14 16:12:49 belaban Exp $
2

3 package org.jgroups;
4
5 import org.apache.commons.logging.Log;
6 import org.apache.commons.logging.LogFactory;
7 import org.jgroups.conf.ConfiguratorFactory;
8 import org.jgroups.conf.ProtocolStackConfigurator;
9 import org.jgroups.stack.ProtocolStack;
10 import org.jgroups.stack.StateTransferInfo;
11 import org.jgroups.util.*;
12 import org.w3c.dom.Element JavaDoc;
13
14 import java.io.File JavaDoc;
15 import java.io.Serializable JavaDoc;
16 import java.net.URL JavaDoc;
17 import java.util.HashMap JavaDoc;
18 import java.util.Map JavaDoc;
19 import java.util.Vector JavaDoc;
20
21 /**
22  * JChannel is a pure Java implementation of Channel
23  * When a JChannel object is instantiated it automatically sets up the
24  * protocol stack
25  * @author Bela Ban
26  * @author Filip Hanik
27  * @version $Revision: 1.31 $
28  */

29 public class JChannel extends Channel {
30
31     /**
32      * The default protocol stack used by the default constructor.
33      */

34     public static final String JavaDoc DEFAULT_PROTOCOL_STACK=
35             "UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32):" +
36             "PING(timeout=3000;num_initial_members=6):" +
37             "FD(timeout=3000):" +
38             "VERIFY_SUSPECT(timeout=1500):" +
39             "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800):" +
40             "UNICAST(timeout=600,1200,2400,4800):" +
41             "pbcast.STABLE(desired_avg_gossip=10000):" +
42             "FRAG:" +
43             "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
44             "shun=true;print_local_addr=true)";
45
46     static final String JavaDoc FORCE_PROPS="force.properties";
47
48     /* the protocol stack configuration string */
49     private String JavaDoc props=null;
50
51     /*the address of this JChannel instance*/
52     private Address local_addr=null;
53     /*the channel (also know as group) name*/
54     private String JavaDoc channel_name=null; // group name
55
/*the latest view of the group membership*/
56     private View my_view=null;
57     /*the queue that is used to receive messages (events) from the protocol stack*/
58     private final Queue mq=new Queue();
59     /*the protocol stack, used to send and receive messages from the protocol stack*/
60     private ProtocolStack prot_stack=null;
61
62     /** Thread responsible for closing a channel and potentially reconnecting to it (e.g. when shunned) */
63     protected CloserThread closer=null;
64
65     /** To wait until a local address has been assigned */
66     private final Promise local_addr_promise=new Promise();
67
68     /** To wait until we have connected successfully */
69     private final Promise connect_promise=new Promise();
70
71     /** To wait until we have been disconnected from the channel */
72     private final Promise disconnect_promise=new Promise();
73
74     private final Promise state_promise=new Promise();
75
76     // private final Promise flow_control_promise=new Promise();
77
// private final Object flow_control_mutex=new Object();
78

79     /** wait until we have a non-null local_addr */
80     private long LOCAL_ADDR_TIMEOUT=30000; //=Long.parseLong(System.getProperty("local_addr.timeout", "30000"));
81
/*if the states is fetched automatically, this is the default timeout, 5 secs*/
82     private static final long GET_STATE_DEFAULT_TIMEOUT=5000;
83     /*flag to indicate whether to receive views from the protocol stack*/
84     private boolean receive_views=true;
85     /*flag to indicate whether to receive suspect messages*/
86     private boolean receive_suspects=true;
87     /*flag to indicate whether to receive blocks, if this is set to true, receive_views is set to true*/
88     private boolean receive_blocks=false;
89     /*flag to indicate whether to receive local messages
90      *if this is set to false, the JChannel will not receive messages sent by itself*/

91     private boolean receive_local_msgs=true;
92     /*flag to indicate whether to receive a state message or not*/
93     private boolean receive_get_states=false;
94     /*flag to indicate whether the channel will reconnect (reopen) when the exit message is received*/
95     private boolean auto_reconnect=false;
96     /*flag t indicate whether the state is supposed to be retrieved after the channel is reconnected
97      *setting this to true, automatically forces auto_reconnect to true*/

98     private boolean auto_getstate=false;
99     /*channel connected flag*/
100     private boolean connected=false;
101
102     /** block send()/down() if true (unlocked by UNBLOCK_SEND event) */
103     private final CondVar block_sending=new CondVar("block_sending", Boolean.FALSE);
104
105     /*channel closed flag*/
106     private boolean closed=false; // close() has been called, channel is unusable
107

108     /** True if a state transfer protocol is available, false otherwise */
109     private boolean state_transfer_supported=false; // set by CONFIG event from STATE_TRANSFER protocol
110

111     /** Used to maintain additional data across channel disconnects/reconnects. This is a kludge and will be remove
112      * as soon as JGroups supports logical addresses
113      */

114     private byte[] additional_data=null;
115
116     protected final Log log=LogFactory.getLog(getClass());
117
118     /**
119      * Constructs a <code>JChannel</code> instance with the protocol stack
120      * specified by the <code>DEFAULT_PROTOCOL_STACK</code> member.
121      *
122      * @throws ChannelException if problems occur during the initialization of
123      * the protocol stack.
124      */

125     public JChannel() throws ChannelException {
126         this(DEFAULT_PROTOCOL_STACK);
127     }
128
129     /**
130      * Constructs a <code>JChannel</code> instance with the protocol stack
131      * configuration contained by the specified file.
132      *
133      * @param properties a file containing a JGroups XML protocol stack
134      * configuration.
135      *
136      * @throws ChannelException if problems occur during the configuration or
137      * initialization of the protocol stack.
138      */

139     public JChannel(File JavaDoc properties) throws ChannelException {
140         this(ConfiguratorFactory.getStackConfigurator(properties));
141     }
142
143     /**
144      * Constructs a <code>JChannel</code> instance with the protocol stack
145      * configuration contained by the specified XML element.
146      *
147      * @param properties a XML element containing a JGroups XML protocol stack
148      * configuration.
149      *
150      * @throws ChannelException if problems occur during the configuration or
151      * initialization of the protocol stack.
152      */

153     public JChannel(Element JavaDoc properties) throws ChannelException {
154         this(ConfiguratorFactory.getStackConfigurator(properties));
155     }
156
157     /**
158      * Constructs a <code>JChannel</code> instance with the protocol stack
159      * configuration indicated by the specified URL.
160      *
161      * @param properties a URL pointing to a JGroups XML protocol stack
162      * configuration.
163      *
164      * @throws ChannelException if problems occur during the configuration or
165      * initialization of the protocol stack.
166      */

167     public JChannel(URL JavaDoc properties) throws ChannelException {
168         this(ConfiguratorFactory.getStackConfigurator(properties));
169     }
170
171     /**
172      * Constructs a <code>JChannel</code> instance with the protocol stack
173      * configuration based upon the specified properties parameter.
174      *
175      * @param properties an old style property string, a string representing a
176      * system resource containing a JGroups XML configuration,
177      * a string representing a URL pointing to a JGroups XML
178      * XML configuration, or a string representing a file name
179      * that contains a JGroups XML configuration.
180      *
181      * @throws ChannelException if problems occur during the configuration and
182      * initialization of the protocol stack.
183      */

184     public JChannel(String JavaDoc properties) throws ChannelException {
185         this(ConfiguratorFactory.getStackConfigurator(properties));
186     }
187
188     /**
189      * Constructs a <code>JChannel</code> instance with the protocol stack
190      * configuration contained by the protocol stack configurator parameter.
191      * <p>
192      * All of the public constructors of this class eventually delegate to this
193      * method.
194      *
195      * @param configurator a protocol stack configurator containing a JGroups
196      * protocol stack configuration.
197      *
198      * @throws ChannelException if problems occur during the initialization of
199      * the protocol stack.
200      */

201     protected JChannel(ProtocolStackConfigurator configurator) throws ChannelException {
202         props = configurator.getProtocolStackString();
203
204         /*create the new protocol stack*/
205         prot_stack=new ProtocolStack(this, props);
206
207         /* Setup protocol stack (create layers, queues between them */
208         try {
209             prot_stack.setup();
210         }
211         catch(Throwable JavaDoc e) {
212             throw new ChannelException("unable to setup the protocol stack", e);
213         }
214     }
215
216     /**
217      * Creates a new JChannel with the protocol stack as defined in the properties
218      * parameter. an example of this parameter is<BR>
219      * "UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE"<BR>
220      * Other examples can be found in the ./conf directory<BR>
221      * @param properties the protocol stack setup, if null, the default protocol stack will be used
222      * @param properties the properties can also be a java.net.URL object or a string that is a URL spec.
223      * The JChannel will validate any URL object and String object to see if they are a URL.
224      * In case of the parameter being a url, the JChannel will try to load the xml from there.
225      * In case properties is a org.w3c.dom.Element, the ConfiguratorFactory will parse the
226      * DOM tree with the element as its root element.
227      * @deprecated Use the constructors with specific parameter types instead.
228      */

229     public JChannel(Object JavaDoc properties) throws ChannelException {
230         if (properties == null) {
231             properties = DEFAULT_PROTOCOL_STACK;
232         }
233
234         try {
235             ProtocolStackConfigurator c=ConfiguratorFactory.getStackConfigurator(properties);
236             props=c.getProtocolStackString();
237         }
238         catch(Exception JavaDoc x) {
239             String JavaDoc strace=Util.getStackTrace(x);
240             if(log.isErrorEnabled()) log.error(strace);
241             throw new ChannelException("unable to load protocol stack: {" + x.getMessage() + ';' + strace + '}');
242         }
243
244         /*create the new protocol stack*/
245         prot_stack=new ProtocolStack(this, props);
246
247         /* Setup protocol stack (create layers, queues between them */
248         try {
249             prot_stack.setup();
250         }
251         catch(Throwable JavaDoc e) {
252             throw new ChannelException("JChannel(): " + e);
253         }
254     }
255
256
257     /**
258      * Returns the protocol stack.
259      * Currently used by Debugger.
260      * Specific to JChannel, therefore
261      * not visible in Channel
262      */

263     public ProtocolStack getProtocolStack() {
264         return prot_stack;
265     }
266
267
268     /**
269      * returns the protocol stack configuration in string format.
270      * an example of this property is<BR>
271      * "UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE"
272      */

273     public String JavaDoc getProperties() {
274         return props;
275     }
276
277
278     /**
279      * Returns a pretty-printed form of all the protocols. If include_properties is set,
280      * the properties for each protocol will also be printed.
281      */

282     public String JavaDoc printProtocolSpec(boolean include_properties) {
283         return prot_stack != null ? prot_stack.printProtocolSpec(include_properties) : null;
284     }
285
286
287     /**
288      * Connects the channel to a group.<BR>
289      * If the channel is already connected, an error message will be printed to the error log<BR>
290      * If the channel is closed a ChannelClosed exception will be thrown<BR>
291      * This method starts the protocol stack by calling ProtocolStack.start<BR>
292      * then it sends an Event.CONNECT event down the stack and waits to receive a CONNECT_OK event<BR>
293      * Once the CONNECT_OK event arrives from the protocol stack, any channel listeners are notified<BR>
294      * and the channel is considered connected<BR>
295      *
296      * @param channel_name A <code>String</code> denoting the group name. Cannot be null.
297      * @exception ChannelException The protocol stack cannot be started
298      * @exception ChannelClosedException The channel is closed and therefore cannot be used any longer.
299      * A new channel has to be created first.
300      */

301     public synchronized void connect(String JavaDoc channel_name) throws ChannelException, ChannelClosedException {
302         /*make sure the channel is not closed*/
303         checkClosed();
304
305         /*if we already are connected, then ignore this*/
306         if(connected) {
307             if(log.isErrorEnabled()) log.error("already connected to " + channel_name);
308             return;
309         }
310
311         /*make sure we have a valid channel name*/
312         if(channel_name == null) {
313             if(log.isInfoEnabled()) log.info("channel_name is null, assuming unicast channel");
314         }
315         else
316             this.channel_name=channel_name;
317
318         try {
319             prot_stack.startStack(); // calls start() in all protocols, from top to bottom
320
}
321         catch(Throwable JavaDoc e) {
322             if(log.isErrorEnabled()) log.error("exception: " + e);
323             throw new ChannelException(e.toString());
324         }
325
326         /* try to get LOCAL_ADDR_TIMEOUT. Catch SecurityException if called in an untrusted environment (e.g. using JNLP) */
327         try {
328             LOCAL_ADDR_TIMEOUT=Long.parseLong(System.getProperty("local_addr.timeout","30000"));
329         }
330         catch (SecurityException JavaDoc e1) {
331             /* Use the default value specified above*/
332         }
333
334         /* Wait LOCAL_ADDR_TIMEOUT milliseconds for local_addr to have a non-null value (set by SET_LOCAL_ADDRESS) */
335         local_addr=(Address)local_addr_promise.getResult(LOCAL_ADDR_TIMEOUT);
336         if(local_addr == null) {
337             log.fatal("local_addr is null; cannot connect");
338             throw new ChannelException("local_addr is null");
339         }
340
341
342         /*create a temporary view, assume this channel is the only member and
343          *is the coordinator*/

344         Vector JavaDoc t=new Vector JavaDoc(1);
345         t.addElement(local_addr);
346         my_view=new View(local_addr, 0, t); // create a dummy view
347

348         // only connect if we are not a unicast channel
349
if(channel_name != null) {
350             connect_promise.reset();
351             Event connect_event=new Event(Event.CONNECT, channel_name);
352             down(connect_event);
353             connect_promise.getResult(); // waits forever until connected (or channel is closed)
354
}
355
356         /*notify any channel listeners*/
357         connected=true;
358         if(channel_listener != null)
359             channel_listener.channelConnected(this);
360     }
361
362
363     /**
364      * Disconnects the channel if it is connected. If the channel is closed, this operation is ignored<BR>
365      * Otherwise the following actions happen in the listed order<BR>
366      * <ol>
367      * <li> The JChannel sends a DISCONNECT event down the protocol stack<BR>
368      * <li> Blocks until the channel to receives a DISCONNECT_OK event<BR>
369      * <li> Sends a STOP_QUEING event down the stack<BR>
370      * <li> Stops the protocol stack by calling ProtocolStack.stop()<BR>
371      * <li> Notifies the listener, if the listener is available<BR>
372      * </ol>
373      */

374     public synchronized void disconnect() {
375         if(closed) return;
376
377         if(connected) {
378
379             if(channel_name != null) {
380
381                 /* Send down a DISCONNECT event. The DISCONNECT event travels down to the GMS, where a
382                 * DISCONNECT_OK response is generated and sent up the stack. JChannel blocks until a
383                 * DISCONNECT_OK has been received, or until timeout has elapsed.
384                 */

385                 Event disconnect_event=new Event(Event.DISCONNECT, local_addr);
386                 disconnect_promise.reset();
387                 down(disconnect_event); // DISCONNECT is handled by each layer
388
disconnect_promise.getResult(); // wait for DISCONNECT_OK
389
}
390
391             // Just in case we use the QUEUE protocol and it is still blocked...
392
down(new Event(Event.STOP_QUEUEING));
393
394             connected=false;
395             try {
396                 prot_stack.stopStack(); // calls stop() in all protocols, from top to bottom
397
}
398             catch(Exception JavaDoc e) {
399                 if(log.isErrorEnabled()) log.error("exception: " + e);
400             }
401
402             if(channel_listener != null)
403                 channel_listener.channelDisconnected(this);
404
405             init(); // sets local_addr=null; changed March 18 2003 (bela) -- prevented successful rejoining
406
}
407     }
408
409
410     /**
411      * Destroys the channel.<BR>
412      * After this method has been called, the channel us unusable.<BR>
413      * This operation will disconnect the channel and close the channel receive queue immediately<BR>
414      */

415     public synchronized void close() {
416         _close(true, true); // by default disconnect before closing channel and close mq
417
}
418
419
420     /**
421      * Opens the channel.<BR>
422      * this does the following actions<BR>
423      * 1. Resets the receiver queue by calling Queue.reset<BR>
424      * 2. Sets up the protocol stack by calling ProtocolStack.setup<BR>
425      * 3. Sets the closed flag to false.<BR>
426      */

427     public synchronized void open() throws ChannelException {
428         if(!closed)
429             throw new ChannelException("JChannel.open(): channel is already open");
430
431         try {
432             mq.reset();
433
434             // new stack is created on open() - bela June 12 2003
435
prot_stack=new ProtocolStack(this, props);
436             prot_stack.setup();
437             closed=false;
438         }
439         catch(Exception JavaDoc e) {
440             throw new ChannelException("JChannel().open(): " + e.getMessage());
441         }
442     }
443
444     /**
445      * returns true if the Open operation has been called successfully
446      */

447     public boolean isOpen() {
448         return !closed;
449     }
450
451
452     /**
453      * returns true if the Connect operation has been called successfully
454      */

455     public boolean isConnected() {
456         return connected;
457     }
458
459     public int getNumMessages() {
460         return mq != null? mq.size() : -1;
461     }
462
463
464     /**
465      * implementation of the Transport interface.<BR>
466      * Sends a message through the protocol stack<BR>
467      * @param msg the message to be sent through the protocol stack,
468      * the destination of the message is specified inside the message itself
469      * @exception ChannelNotConnectedException
470      * @exception ChannelClosedException
471      */

472     public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException {
473         checkClosed();
474         checkNotConnected();
475         down(new Event(Event.MSG, msg));
476     }
477
478
479     /**
480      * creates a new message with the destination address, and the source address
481      * and the object as the message value
482      * @param dst - the destination address of the message, null for all members
483      * @param src - the source address of the message
484      * @param obj - the value of the message
485      * @exception ChannelNotConnectedException
486      * @exception ChannelClosedException
487      * @see JChannel#send
488      */

489     public void send(Address dst, Address src, Serializable JavaDoc obj) throws ChannelNotConnectedException, ChannelClosedException {
490         send(new Message(dst, src, obj));
491     }
492
493
494     /**
495      * Blocking receive method.
496      * This method returns the object that was first received by this JChannel and that has not been
497      * received before. After the object is received, it is removed from the receive queue.<BR>
498      * If you only want to inspect the object received without removing it from the queue call
499      * JChannel.peek<BR>
500      * If no messages are in the receive queue, this method blocks until a message is added or the operation times out<BR>
501      * By specifying a timeout of 0, the operation blocks forever, or until a message has been received.
502      * @param timeout the number of milliseconds to wait if the receive queue is empty. 0 means wait forever
503      * @exception TimeoutException if a timeout occured prior to a new message was received
504      * @exception ChannelNotConnectedException
505      * @exception ChannelClosedException
506      * @see JChannel#peek
507      */

508     public Object JavaDoc receive(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException {
509         Object JavaDoc retval=null;
510         Event evt;
511
512         checkClosed();
513         checkNotConnected();
514
515         try {
516             evt=(timeout <= 0) ? (Event)mq.remove() : (Event)mq.remove(timeout);
517             retval=getEvent(evt);
518             evt=null;
519             return retval;
520         }
521         catch(QueueClosedException queue_closed) {
522             throw new ChannelClosedException();
523         }
524         catch(TimeoutException t) {
525             throw t;
526         }
527         catch(Exception JavaDoc e) {
528             if(log.isErrorEnabled()) log.error("exception: " + e);
529             return null;
530         }
531     }
532
533
534     /**
535      * Just peeks at the next message, view or block. Does <em>not</em> install
536      * new view if view is received<BR>
537      * Does the same thing as JChannel.receive but doesn't remove the object from the
538      * receiver queue
539      */

540     public Object JavaDoc peek(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException {
541         Object JavaDoc retval=null;
542         Event evt;
543
544         checkClosed();
545         checkNotConnected();
546
547         try {
548             evt=(timeout <= 0) ? (Event)mq.peek() : (Event)mq.peek(timeout);
549             retval=getEvent(evt);
550             evt=null;
551             return retval;
552         }
553         catch(QueueClosedException queue_closed) {
554             if(log.isErrorEnabled()) log.error("exception: " + queue_closed);
555             return null;
556         }
557         catch(TimeoutException t) {
558             return null;
559         }
560         catch(Exception JavaDoc e) {
561             if(log.isErrorEnabled()) log.error("exception: " + e);
562             return null;
563         }
564     }
565
566
567
568
569     /**
570      * returns the current view.<BR>
571      * if the channel is not connected or if it is closed it will return null<BR>
572      * @return returns the current group view, or null if the channel is closed or disconnected
573      */

574     public View getView() {
575         return closed || !connected ? null : my_view;
576     }
577
578
579     /**
580      * returns the local address of the channel
581      * returns null if the channel is closed
582      */

583     public Address getLocalAddress() {
584         return closed ? null : local_addr;
585     }
586
587
588     /**
589      * returns the name of the channel
590      * if the channel is not connected or if it is closed it will return null
591      */

592     public String JavaDoc getChannelName() {
593         return closed ? null : !connected ? null : channel_name;
594     }
595
596
597     /**
598      * sets a channel option
599      * the options can be either
600      * <PRE>
601      * Channel.BLOCK
602      * Channel.VIEW
603      * Channel.SUSPECT
604      * Channel.LOCAL
605      * Channel.GET_STATE_EVENTS
606      * Channel.AUTO_RECONNECT
607      * Channel.AUTO_GETSTATE
608      * </PRE>
609      * There are certain dependencies between the options that you can set, I will try to describe them here<BR>
610      * Option: Channel.VIEW option<BR>
611      * Value: java.lang.Boolean<BR>
612      * Result: set to true the JChannel will receive VIEW change events<BR>
613      *<BR>
614      * Option: Channel.SUSPECT<BR>
615      * Value: java.lang.Boolean<BR>
616      * Result: set to true the JChannel will receive SUSPECT events<BR>
617      *<BR>
618      * Option: Channel.BLOCK<BR>
619      * Value: java.lang.Boolean<BR>
620      * Result: set to true will set setOpt(VIEW, true) and the JChannel will receive BLOCKS and VIEW events<BR>
621      *<BR>
622      * Option: GET_STATE_EVENTS<BR>
623      * Value: java.lang.Boolean<BR>
624      * Result: set to true the JChannel will receive state events<BR>
625      *<BR>
626      * Option: LOCAL<BR>
627      * Value: java.lang.Boolean<BR>
628      * Result: set to true the JChannel will receive messages that it self sent out.<BR>
629      *<BR>
630      * Option: AUTO_RECONNECT<BR>
631      * Value: java.lang.Boolean<BR>
632      * Result: set to true and the JChannel will try to reconnect when it is being closed<BR>
633      *<BR>
634      * Option: AUTO_GETSTATE<BR>
635      * Value: java.lang.Boolean<BR>
636      * Result: set to true, the AUTO_RECONNECT will be set to true and the JChannel will try to get the state after a close and reconnect happens<BR>
637      * <BR>
638      *
639      * @param option the parameter option Channel.VIEW, Channel.SUSPECT, etc
640      * @param value the value to set for this option
641      *
642      */

643     public void setOpt(int option, Object JavaDoc value) {
644         if(closed) {
645             if(log.isWarnEnabled()) log.warn("channel is closed; option not set !");
646             return;
647         }
648
649         switch(option) {
650             case VIEW:
651                 if(value instanceof Boolean JavaDoc)
652                     receive_views=((Boolean JavaDoc)value).booleanValue();
653                 else
654                     if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
655                                                      " (" + value + "): value has to be Boolean");
656                 break;
657             case SUSPECT:
658                 if(value instanceof Boolean JavaDoc)
659                     receive_suspects=((Boolean JavaDoc)value).booleanValue();
660                 else
661                     if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
662                                                      " (" + value + "): value has to be Boolean");
663                 break;
664             case BLOCK:
665                 if(value instanceof Boolean JavaDoc)
666                     receive_blocks=((Boolean JavaDoc)value).booleanValue();
667                 else
668                     if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
669                                                      " (" + value + "): value has to be Boolean");
670                 if(receive_blocks)
671                     receive_views=true;
672                 break;
673
674             case GET_STATE_EVENTS:
675                 if(value instanceof Boolean JavaDoc)
676                     receive_get_states=((Boolean JavaDoc)value).booleanValue();
677                 else
678                     if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
679                                                      " (" + value + "): value has to be Boolean");
680                 break;
681
682
683             case LOCAL:
684                 if(value instanceof Boolean JavaDoc)
685                     receive_local_msgs=((Boolean JavaDoc)value).booleanValue();
686                 else
687                     if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
688                                                      " (" + value + "): value has to be Boolean");
689                 break;
690
691             case AUTO_RECONNECT:
692                 if(value instanceof Boolean JavaDoc)
693                     auto_reconnect=((Boolean JavaDoc)value).booleanValue();
694                 else
695                     if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
696                                                      " (" + value + "): value has to be Boolean");
697                 break;
698
699             case AUTO_GETSTATE:
700                 if(value instanceof Boolean JavaDoc) {
701                     auto_getstate=((Boolean JavaDoc)value).booleanValue();
702                     if(auto_getstate)
703                         auto_reconnect=true;
704                 }
705                 else
706                     if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
707                                                      " (" + value + "): value has to be Boolean");
708                 break;
709
710             default:
711                 if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) + " not known");
712                 break;
713         }
714     }
715
716
717     /**
718      * returns the value of an option.
719      * @param option the option you want to see the value for
720      * @return the object value, in most cases java.lang.Boolean
721      * @see JChannel#setOpt
722      */

723     public Object JavaDoc getOpt(int option) {
724         switch(option) {
725             case VIEW:
726 // return Boolean.valueOf(receive_views);
727
return receive_views ? Boolean.TRUE : Boolean.FALSE;
728             case BLOCK:
729 // return Boolean.valueOf(receive_blocks);
730
return receive_blocks ? Boolean.TRUE : Boolean.FALSE;
731             case SUSPECT:
732 // return Boolean.valueOf(receive_suspects);
733
return receive_suspects ? Boolean.TRUE : Boolean.FALSE;
734             case GET_STATE_EVENTS:
735 // return Boolean.valueOf(receive_get_states);
736
return receive_get_states ? Boolean.TRUE : Boolean.FALSE;
737             case LOCAL:
738 // return Boolean.valueOf(receive_local_msgs);
739
return receive_local_msgs ? Boolean.TRUE : Boolean.FALSE;
740             default:
741                 if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) + " not known");
742                 return null;
743         }
744     }
745
746
747     /**
748      * Called to acknowledge a block() (callback in <code>MembershipListener</code> or
749      * <code>BlockEvent</code> received from call to <code>receive()</code>).
750      * After sending blockOk(), no messages should be sent until a new view has been received.
751      * Calling this method on a closed channel has no effect.
752      */

753     public void blockOk() {
754         down(new Event(Event.BLOCK_OK));
755         down(new Event(Event.START_QUEUEING));
756     }
757
758
759     /**
760      * Retrieves the current group state. Sends GET_STATE event down to STATE_TRANSFER layer.
761      * Blocks until STATE_TRANSFER sends up a GET_STATE_OK event or until <code>timeout</code>
762      * milliseconds have elapsed. The argument of GET_STATE_OK should be a single object.
763      * @param target - the target member to receive the state from. if null, state is retrieved from coordinator
764      * @param timeout - the number of milliseconds to wait for the operation to complete successfully
765      * @return true of the state was received, false if the operation timed out
766      */

767     public boolean getState(Address target, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
768         StateTransferInfo info=new StateTransferInfo(StateTransferInfo.GET_FROM_SINGLE, target);
769         info.timeout=timeout;
770         return _getState(new Event(Event.GET_STATE, info), timeout);
771     }
772
773
774     /**
775      * Retrieves the current group state. Sends GET_STATE event down to STATE_TRANSFER layer.
776      * Blocks until STATE_TRANSFER sends up a GET_STATE_OK event or until <code>timeout</code>
777      * milliseconds have elapsed. The argument of GET_STATE_OK should be a vector of objects.
778      * @param targets - the target members to receive the state from ( an Address list )
779      * @param timeout - the number of milliseconds to wait for the operation to complete successfully
780      * @return true of the state was received, false if the operation timed out
781      */

782     public boolean getAllStates(Vector JavaDoc targets, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
783         StateTransferInfo info=new StateTransferInfo(StateTransferInfo.GET_FROM_MANY, targets);
784         return _getState(new Event(Event.GET_STATE, info), timeout);
785     }
786
787
788     /**
789      * Called by the application is response to receiving a <code>getState()</code> object when
790      * calling <code>receive()</code>.<br>
791      * When the application receives a getState() message on the receive() method,
792      * it should call returnState() to reply with the state of the application
793      * @param state The state of the application as a byte buffer
794      * (to send over the network).
795      */

796     public void returnState(byte[] state) {
797         down(new Event(Event.GET_APPLSTATE_OK, state));
798     }
799
800
801
802
803
804     /**
805      * Callback method <BR>
806      * Called by the ProtocolStack when a message is received.
807      * It will be added to the message queue from which subsequent
808      * <code>Receive</code>s will dequeue it.
809      * @param evt the event carrying the message from the protocol stack
810      */

811     public void up(Event evt) {
812         int type=evt.getType();
813         Message msg;
814
815         /*if the queue is not available, there is no point in
816          *processing the message at all*/

817         if(mq == null) {
818             if(log.isErrorEnabled()) log.error("message queue is null");
819             return;
820         }
821
822         switch(type) {
823
824             case Event.MSG:
825                 msg=(Message)evt.getArg();
826                 if(!receive_local_msgs) { // discard local messages (sent by myself to me)
827
if(local_addr != null && msg.getSrc() != null)
828                         if(local_addr.equals(msg.getSrc()))
829                             return;
830                 }
831                 break;
832
833             case Event.VIEW_CHANGE:
834                 my_view=(View)evt.getArg();
835
836                 // crude solution to bug #775120: if we get our first view *before* the CONNECT_OK,
837
// we simply set the state to connected
838
if(connected == false) {
839                     connected=true;
840                     connect_promise.setResult(Boolean.TRUE);
841                 }
842
843                 // unblock queueing of messages due to previous BLOCK event:
844
down(new Event(Event.STOP_QUEUEING));
845                 if(!receive_views) // discard if client has not set receving views to on
846
return;
847                 //if(connected == false)
848
// my_view=(View)evt.getArg();
849
break;
850
851             case Event.SUSPECT:
852                 if(!receive_suspects)
853                     return;
854                 break;
855
856             case Event.GET_APPLSTATE: // return the application's state
857
if(!receive_get_states) { // if not set to handle state transfers, send null state
858
down(new Event(Event.GET_APPLSTATE_OK, null));
859                     return;
860                 }
861                 break;
862
863             case Event.CONFIG:
864                 HashMap JavaDoc config=(HashMap JavaDoc)evt.getArg();
865                 if(config != null && config.containsKey("state_transfer"))
866                     state_transfer_supported=((Boolean JavaDoc)config.get("state_transfer")).booleanValue();
867                 break;
868
869             case Event.BLOCK:
870                 // If BLOCK is received by application, then we trust the application to not send
871
// any more messages until a VIEW_CHANGE is received. Otherwise (BLOCKs are disabled),
872
// we queue any messages sent until the next VIEW_CHANGE (they will be sent in the
873
// next view)
874

875                 if(!receive_blocks) { // discard if client has not set 'receiving blocks' to 'on'
876
down(new Event(Event.BLOCK_OK));
877                     down(new Event(Event.START_QUEUEING));
878                     return;
879                 }
880                 break;
881
882             case Event.CONNECT_OK:
883                 connect_promise.setResult(Boolean.TRUE);
884                 break;
885
886             case Event.DISCONNECT_OK:
887                 disconnect_promise.setResult(Boolean.TRUE);
888                 break;
889
890             case Event.GET_STATE_OK:
891                 try {
892                     mq.add(new Event(Event.STATE_RECEIVED, evt.getArg()));
893                 }
894                 catch(Exception JavaDoc e) {
895                 }
896                 state_promise.setResult(evt.getArg());
897                 break;
898
899             case Event.SET_LOCAL_ADDRESS:
900                 local_addr_promise.setResult(evt.getArg());
901                 break;
902
903             case Event.EXIT:
904                 handleExit(evt);
905                 return; // no need to pass event up; already done in handleExit()
906

907             case Event.BLOCK_SEND: // emitted by FLOW_CONTROL
908
if(log.isInfoEnabled()) log.info("received BLOCK_SEND");
909                 block_sending.set(Boolean.TRUE);
910                 break;
911
912             case Event.UNBLOCK_SEND: // emitted by FLOW_CONTROL
913
if(log.isInfoEnabled()) log.info("received UNBLOCK_SEND");
914                 block_sending.set(Boolean.FALSE);
915                 break;
916
917             default:
918                 break;
919         }
920
921
922         // If UpHandler is installed, pass all events to it and return (UpHandler is e.g. a building block)
923
if(up_handler != null) {
924             up_handler.up(evt);
925             return;
926         }
927
928         if(type == Event.MSG || type == Event.VIEW_CHANGE || type == Event.SUSPECT ||
929                 type == Event.GET_APPLSTATE || type == Event.BLOCK) {
930             try {
931                 mq.add(evt);
932             }
933             catch(Exception JavaDoc e) {
934                 if(log.isErrorEnabled()) log.error("exception: " + e);
935             }
936         }
937     }
938
939
940     /**
941      * Sends a message through the protocol stack if the stack is available
942      * @param evt the message to send down, encapsulated in an event
943      */

944     public void down(Event evt) {
945         if(evt == null) return;
946
947         // only block for messages; all other events are passed through
948
// we use double-checked locking; it is okay to 'lose' one or more messages because block_sending changes
949
// to true after an initial false value
950
if(evt.getType() == Event.MSG && block_sending.get().equals(Boolean.TRUE)) {
951             if(log.isTraceEnabled()) log.trace("down() blocks because block_sending == true");
952             block_sending.waitUntil(Boolean.FALSE);
953         }
954
955         // handle setting of additional data (kludge, will be removed soon)
956
if(evt.getType() == Event.CONFIG) {
957             try {
958                 Map JavaDoc m=(Map JavaDoc)evt.getArg();
959                 if(m != null && m.containsKey("additional_data")) {
960                     additional_data=(byte[])m.get("additional_data");
961                 }
962             }
963             catch(Throwable JavaDoc t) {
964                 if(log.isErrorEnabled()) log.error("CONFIG event did not contain a hashmap: " + t);
965             }
966         }
967
968         if(prot_stack != null)
969             prot_stack.down(evt);
970         else
971             if(log.isErrorEnabled()) log.error("no protocol stack available");
972     }
973
974
975     public String JavaDoc toString(boolean details) {
976         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
977         sb.append("local_addr=").append(local_addr).append('\n');
978         sb.append("channel_name=").append(channel_name).append('\n');
979         sb.append("my_view=").append(my_view).append('\n');
980         sb.append("connected=").append(connected).append('\n');
981         sb.append("closed=").append(closed).append('\n');
982         if(mq != null)
983             sb.append("incoming queue size=").append(mq.size()).append('\n');
984         if(details) {
985             sb.append("block_sending=").append(block_sending).append('\n');
986             sb.append("receive_views=").append(receive_views).append('\n');
987             sb.append("receive_suspects=").append(receive_suspects).append('\n');
988             sb.append("receive_blocks=").append(receive_blocks).append('\n');
989             sb.append("receive_local_msgs=").append(receive_local_msgs).append('\n');
990             sb.append("receive_get_states=").append(receive_get_states).append('\n');
991             sb.append("auto_reconnect=").append(auto_reconnect).append('\n');
992             sb.append("auto_getstate=").append(auto_getstate).append('\n');
993             sb.append("state_transfer_supported=").append(state_transfer_supported).append('\n');
994             sb.append("props=").append(props).append('\n');
995         }
996
997         return sb.toString();
998     }
999
1000
1001    /* ----------------------------------- Private Methods ------------------------------------- */
1002
1003
1004    /**
1005     * Initializes all variables. Used after <tt>close()</tt> or <tt>disconnect()</tt>,
1006     * to be ready for new <tt>connect()</tt>
1007     */

1008    private void init() {
1009        local_addr=null;
1010        channel_name=null;
1011        my_view=null;
1012
1013        // changed by Bela Sept 25 2003
1014
//if(mq != null && mq.closed())
1015
// mq.reset();
1016

1017        connect_promise.reset();
1018        disconnect_promise.reset();
1019        connected=false;
1020        block_sending.set(Boolean.FALSE);
1021    }
1022
1023
1024    /**
1025     * health check.<BR>
1026     * throws a ChannelNotConnected exception if the channel is not connected
1027     */

1028    private final void checkNotConnected() throws ChannelNotConnectedException {
1029        if(!connected)
1030            throw new ChannelNotConnectedException();
1031    }
1032
1033    /**
1034     * health check<BR>
1035     * throws a ChannelClosed exception if the channel is closed
1036     */

1037    private final void checkClosed() throws ChannelClosedException {
1038        if(closed)
1039            throw new ChannelClosedException();
1040    }
1041
1042    /**
1043     * returns the value of the event<BR>
1044     * These objects will be returned<BR>
1045     * <PRE>
1046     * <B>Event Type - Return Type</B>
1047     * Event.MSG - returns a Message object
1048     * Event.VIEW_CHANGE - returns a View object
1049     * Event.SUSPECT - returns a SuspectEvent object
1050     * Event.BLOCK - returns a new BlockEvent object
1051     * Event.GET_APPLSTATE - returns a GetStateEvent object
1052     * Event.STATE_RECEIVED- returns a SetStateEvent object
1053     * Event.Exit - returns an ExitEvent object
1054     * All other - return the actual Event object
1055     * </PRE>
1056     * @param evt - the event of which you want to extract the value
1057     * @return the event value if it matches the select list,
1058     * returns null if the event is null
1059     * returns the event itself if a match (See above) can not be made of the event type
1060     */

1061    static Object JavaDoc getEvent(Event evt) {
1062        if(evt == null)
1063            return null; // correct ?
1064

1065        switch(evt.getType()) {
1066            case Event.MSG:
1067                return evt.getArg();
1068            case Event.VIEW_CHANGE:
1069                return evt.getArg();
1070            case Event.SUSPECT:
1071                return new SuspectEvent(evt.getArg());
1072            case Event.BLOCK:
1073                return new BlockEvent();
1074            case Event.GET_APPLSTATE:
1075                return new GetStateEvent(evt.getArg());
1076            case Event.STATE_RECEIVED:
1077                return new SetStateEvent((byte[])evt.getArg());
1078            case Event.EXIT:
1079                return new ExitEvent();
1080            default:
1081                return evt;
1082        }
1083    }
1084
1085
1086    /**
1087     * Receives the state from the group and modifies the JChannel.state object<br>
1088     * This method initializes the local state variable to null, and then sends the state
1089     * event down the stack. It waits for a GET_STATE_OK event to bounce back
1090     * @param evt the get state event, has to be of type Event.GET_STATE
1091     * @param timeout the number of milliseconds to wait for the GET_STATE_OK response
1092     * @return true of the state was received, false if the operation timed out
1093     */

1094    boolean _getState(Event evt, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
1095        checkClosed();
1096        checkNotConnected();
1097        if(!state_transfer_supported) {
1098            log.error("fetching state will fail as state transfer is not supported. "
1099                      + "Add one of the STATE_TRANSFER protocols to your protocol configuration");
1100            return false;
1101        }
1102
1103        state_promise.reset();
1104        down(evt);
1105        byte[] state=(byte[])state_promise.getResult(timeout);
1106        if(state != null) // state set by GET_STATE_OK event
1107
return true;
1108        else
1109            return false;
1110    }
1111
1112
1113    /**
1114     * Disconnects and closes the channel.
1115     * This method does the folloing things
1116     * 1. Calls <code>this.disconnect</code> if the disconnect parameter is true
1117     * 2. Calls <code>Queue.close</code> on mq if the close_mq parameter is true
1118     * 3. Calls <code>ProtocolStack.stop</code> on the protocol stack
1119     * 4. Calls <code>ProtocolStack.destroy</code> on the protocol stack
1120     * 5. Sets the channel closed and channel connected flags to true and false
1121     * 6. Notifies any channel listener of the channel close operation
1122     */

1123    void _close(boolean disconnect, boolean close_mq) {
1124        if(closed)
1125            return;
1126
1127        if(disconnect)
1128            disconnect(); // leave group if connected
1129

1130        if(close_mq) {
1131            try {
1132                if(mq != null)
1133                    mq.close(false); // closes and removes all messages
1134
}
1135            catch(Exception JavaDoc e) {
1136                if(log.isErrorEnabled()) log.error("exception: " + e);
1137            }
1138        }
1139
1140        if(prot_stack != null) {
1141            try {
1142                prot_stack.stopStack();
1143                prot_stack.destroy();
1144            }
1145            catch(Exception JavaDoc e) {
1146                if(log.isErrorEnabled()) log.error("exception: " + e);
1147            }
1148        }
1149        closed=true;
1150        connected=false;
1151        if(channel_listener != null)
1152            channel_listener.channelClosed(this);
1153        init(); // sets local_addr=null; changed March 18 2003 (bela) -- prevented successful rejoining
1154
}
1155
1156
1157    /**
1158     * Creates a separate thread to close the protocol stack.
1159     * This is needed because the thread that called JChannel.up() with the EXIT event would
1160     * hang waiting for up() to return, while up() actually tries to kill that very thread.
1161     * This way, we return immediately and allow the thread to terminate.
1162     */

1163    void handleExit(Event evt) {
1164        if(channel_listener != null)
1165            channel_listener.channelShunned();
1166
1167        if(closer != null && !closer.isAlive())
1168            closer=null;
1169        if(closer == null) {
1170            if(log.isInfoEnabled())
1171                log.info("received an EXIT event, will leave the channel");
1172            closer=new CloserThread(evt);
1173            closer.start();
1174        }
1175    }
1176
1177    /* ------------------------------- End of Private Methods ---------------------------------- */
1178
1179
1180    class CloserThread extends Thread JavaDoc {
1181        final Event evt;
1182        final Thread JavaDoc t=null;
1183
1184
1185        CloserThread(Event evt) {
1186            this.evt=evt;
1187            setName("CloserThread");
1188            setDaemon(true);
1189        }
1190
1191
1192        public void run() {
1193            try {
1194                String JavaDoc old_channel_name=channel_name; // remember because close() will null it
1195
if(log.isInfoEnabled())
1196                    log.info("closing the channel");
1197                _close(false, false); // do not disconnect before closing channel, do not close mq (yet !)
1198

1199                if(up_handler != null)
1200                    up_handler.up(this.evt);
1201                else {
1202                    try {
1203                        mq.add(this.evt);
1204                    }
1205                    catch(Exception JavaDoc ex) {
1206                        if(log.isErrorEnabled()) log.error("exception: " + ex);
1207                    }
1208                }
1209
1210                if(mq != null) {
1211                    Util.sleep(500); // give the mq thread a bit of time to deliver EXIT to the application
1212
try {
1213                        mq.close(false);
1214                    }
1215                    catch(Exception JavaDoc ex) {
1216                    }
1217                }
1218
1219                if(auto_reconnect) {
1220                    try {
1221                        if(log.isInfoEnabled()) log.info("reconnecting to group " + old_channel_name);
1222                        open();
1223                    }
1224                    catch(Exception JavaDoc ex) {
1225                        if(log.isErrorEnabled()) log.error("failure reopening channel: " + ex);
1226                        return;
1227                    }
1228                    try {
1229                        if(additional_data != null) {
1230                            // set previously set additional data
1231
Map JavaDoc m=new HashMap JavaDoc(11);
1232                            m.put("additional_data", additional_data);
1233                            down(new Event(Event.CONFIG, m));
1234                        }
1235                        connect(old_channel_name);
1236                        if(channel_listener != null)
1237                            channel_listener.channelReconnected(local_addr);
1238                    }
1239                    catch(Exception JavaDoc ex) {
1240                        if(log.isErrorEnabled()) log.error("failure reconnecting to channel: " + ex);
1241                        return;
1242                    }
1243                }
1244
1245                if(auto_getstate) {
1246                    if(log.isInfoEnabled())
1247                        log.info("fetching the state (auto_getstate=true)");
1248                    boolean rc=JChannel.this.getState(null, GET_STATE_DEFAULT_TIMEOUT);
1249                    if(rc)
1250                        if(log.isInfoEnabled()) log.info("state was retrieved successfully");
1251                    else
1252                        if(log.isInfoEnabled()) log.info("state transfer failed");
1253                }
1254
1255            }
1256            catch(Exception JavaDoc ex) {
1257                if(log.isErrorEnabled()) log.error("exception: " + ex);
1258            }
1259            finally {
1260                closer=null;
1261            }
1262        }
1263    }
1264
1265}
1266
Popular Tags