KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > TP


1 package org.jgroups.protocols;
2
3
4 import org.jgroups.*;
5 import org.jgroups.Channel;
6 import org.jgroups.annotations.GuardedBy;
7 import org.jgroups.stack.IpAddress;
8 import org.jgroups.stack.Protocol;
9 import org.jgroups.util.*;
10 import org.jgroups.util.Queue;
11
12 import java.io.DataInputStream JavaDoc;
13 import java.io.IOException JavaDoc;
14 import java.io.DataOutputStream JavaDoc;
15 import java.net.*;
16 import java.text.NumberFormat JavaDoc;
17 import java.util.*;
18 import java.util.List JavaDoc;
19 import java.util.concurrent.*;
20 import java.util.concurrent.locks.ReentrantLock JavaDoc;
21
22
23 /**
24  * Generic transport - specific implementations should extend this abstract class.
25  * Features which are provided to the subclasses include
26  * <ul>
27  * <li>version checking
28  * <li>marshalling and unmarshalling
29  * <li>message bundling (handling single messages, and message lists)
30  * <li>incoming packet handler
31  * <li>loopback
32  * </ul>
33  * A subclass has to override
34  * <ul>
35  * <li>{@link #sendToAllMembers(byte[], int, int)}
36  * <li>{@link #sendToSingleMember(org.jgroups.Address, byte[], int, int)}
37  * <li>{@link #init()}
38  * <li>{@link #start()}: subclasses <em>must</em> call super.start() <em>after</em> they initialize themselves
39  * (e.g., created their sockets).
40  * <li>{@link #stop()}: subclasses <em>must</em> call super.stop() after they deinitialized themselves
41  * <li>{@link #destroy()}
42  * </ul>
43  * The create() or start() method has to create a local address.<br>
44  * The {@link #receive(Address, Address, byte[], int, int)} method must
45  * be called by subclasses when a unicast or multicast message has been received.
46  * @author Bela Ban
47  * @version $Id: TP.java,v 1.148 2007/07/02 10:30:54 belaban Exp $
48  */

49 public abstract class TP extends Protocol {
50
51     /** The address (host and port) of this member */
52     protected Address local_addr=null;
53
54     /** The name of the group to which this member is connected */
55     protected String JavaDoc channel_name=null;
56
57     /** The interface (NIC) which should be used by this transport */
58     protected InetAddress bind_addr=null;
59
60     /** Overrides bind_addr, -Djgroups.bind_addr and -Dbind.address: lets the OS return the local host address */
61     boolean use_local_host=false;
62
63     /** If true, the transport should use all available interfaces to receive multicast messages */
64     boolean receive_on_all_interfaces=false;
65
66     /** List<NetworkInterface> of interfaces to receive multicasts on. The multicast receive socket will listen
67      * on all of these interfaces. This is a comma-separated list of IP addresses or interface names. E.g.
68      * "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded; we only bind to an interface once.
69      * If this property is set, it overrides receive_on_all_interfaces.
70      */

71     List JavaDoc<NetworkInterface> receive_interfaces=null;
72
73     /** If true, the transport should use all available interfaces to send multicast messages. This means
74      * the same multicast message is sent N times, so use with care */

75     boolean send_on_all_interfaces=false;
76
77     /** List<NetworkInterface> of interfaces to send multicasts on. The multicast send socket will send the
78      * same multicast message on all of these interfaces. This is a comma-separated list of IP addresses or
79      * interface names. E.g. "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded.
80      * If this property is set, it overrides send_on_all_interfaces.
81      */

82     List JavaDoc<NetworkInterface> send_interfaces=null;
83
84
85     /** The port to which the transport binds. 0 means to bind to any (ephemeral) port */
86     int bind_port=0;
87     int port_range=1; // 27-6-2003 bgooren, Only try one port by default
88

89     /** The members of this group (updated when a member joins or leaves) */
90     final protected Vector<Address> members=new Vector<Address>(11);
91
92     protected View view=null;
93
94
95     final ExposedByteArrayInputStream in_stream=new ExposedByteArrayInputStream(new byte[]{'0'});
96     final DataInputStream JavaDoc dis=new DataInputStream JavaDoc(in_stream);
97
98
99     /** If true, messages sent to self are treated specially: unicast messages are
100      * looped back immediately, multicast messages get a local copy first and -
101      * when the real copy arrives - it will be discarded. Useful for Window
102      * media (non)sense */

103     boolean loopback=false;
104
105
106     /** Discard packets with a different version. Usually minor version differences are okay. Setting this property
107      * to true means that we expect the exact same version on all incoming packets */

108     protected boolean discard_incompatible_packets=false;
109
110     /** Sometimes receivers are overloaded (they have to handle de-serialization etc).
111      * Packet handler is a separate thread taking care of de-serialization, receiver
112      * thread(s) simply put packet in queue and return immediately. Setting this to
113      * true adds one more thread */

114     boolean use_incoming_packet_handler=true;
115
116     /** Used by packet handler to store incoming DatagramPackets */
117     Queue incoming_packet_queue=null;
118
119     /** Dequeues DatagramPackets from packet_queue, unmarshalls them and
120      * calls <tt>handleIncomingUdpPacket()</tt> */

121     IncomingPacketHandler incoming_packet_handler=null;
122
123
124     /** Used by packet handler to store incoming Messages */
125     Queue incoming_msg_queue=null;
126
127     IncomingMessageHandler incoming_msg_handler;
128
129
130     boolean use_concurrent_stack=true;
131     ThreadGroup JavaDoc pool_thread_group=new ThreadGroup JavaDoc(Util.getGlobalThreadGroup(), "Thread Pools");
132
133     /**
134      * Names the current thread. Valid values are "pcl":
135      * p: include the previous (original) name, e.g. "Incoming thread-1", "UDP ucast receiver"
136      * c: include the cluster name, e.g. "MyCluster"
137      * l: include the local address of the current member, e.g. "192.168.5.1:5678"
138      */

139     protected ThreadNamingPattern thread_naming_pattern=new ThreadNamingPattern("cl");
140
141
142     /** ================================== OOB thread pool ============================== */
143     /** The thread pool which handles OOB messages */
144     Executor oob_thread_pool;
145     boolean oob_thread_pool_enabled=true;
146     int oob_thread_pool_min_threads=2;
147     int oob_thread_pool_max_threads=10;
148     /** Number of milliseconds after which an idle thread is removed */
149     long oob_thread_pool_keep_alive_time=30000;
150
151     long num_oob_msgs_received=0;
152
153     /** Used if oob_thread_pool is a ThreadPoolExecutor and oob_thread_pool_queue_enabled is true */
154     BlockingQueue<Runnable JavaDoc> oob_thread_pool_queue=null;
155     /** Whether or not to use a queue with ThreadPoolExecutor (ignored with direct executor) */
156     boolean oob_thread_pool_queue_enabled=true;
157     /** max number of elements in queue (bounded) */
158     int oob_thread_pool_queue_max_size=500;
159     /** Possible values are "Abort", "Discard", "DiscardOldest" and "Run". These values might change once we switch to
160      * JDK 5's java.util.concurrent package */

161     String JavaDoc oob_thread_pool_rejection_policy="Run";
162
163
164 /** ================================== Regular thread pool ============================== */
165
166     /** The thread pool which handles unmarshalling, version checks and dispatching of regular messages */
167     Executor thread_pool;
168     boolean thread_pool_enabled=true;
169     int thread_pool_min_threads=2;
170     int thread_pool_max_threads=10;
171     /** Number of milliseconds after which an idle thread is removed */
172     long thread_pool_keep_alive_time=30000;
173
174     long num_incoming_msgs_received=0;
175
176     /** Used if thread_pool is a ThreadPoolExecutor and thread_pool_queue_enabled is true */
177     BlockingQueue<Runnable JavaDoc> thread_pool_queue=null;
178     /** Whether or not to use a queue with ThreadPoolExecutor (ignored with direct executor) */
179     boolean thread_pool_queue_enabled=true;
180     /** max number of elements in queue (bounded) */
181     int thread_pool_queue_max_size=500;
182     /** Possible values are "Abort", "Discard", "DiscardOldest" and "Run". These values might change once we switch to
183      * JDK 5's java.util.concurrent package */

184     String JavaDoc thread_pool_rejection_policy="Run";
185
186
187     /** If set it will be added to <tt>local_addr</tt>. Used to implement
188      * for example transport independent addresses */

189     byte[] additional_data=null;
190
191     /** Maximum number of bytes for messages to be queued until they are sent. This value needs to be smaller
192         than the largest datagram packet size in case of UDP */

193     int max_bundle_size=65535;
194
195     /** Max number of milliseconds until queued messages are sent. Messages are sent when max_bundle_size or
196      * max_bundle_timeout has been exceeded (whichever occurs faster)
197      */

198     long max_bundle_timeout=20;
199
200     /** Enables bundling of smaller messages into bigger ones */
201     boolean enable_bundling=false;
202
203     private Bundler bundler=null;
204
205     protected TimeScheduler timer=null;
206
207     private DiagnosticsHandler diag_handler=null;
208     boolean enable_diagnostics=true;
209     String JavaDoc diagnostics_addr="224.0.0.75";
210     int diagnostics_port=7500;
211
212     TpHeader header;
213     final String JavaDoc name=getName();
214
215     static final byte LIST = 1; // we have a list of messages rather than a single message when set
216
static final byte MULTICAST = 2; // message is a multicast (versus a unicast) message when set
217
static final byte OOB = 4; // message has OOB flag set (Message.OOB)
218

219     long num_msgs_sent=0, num_msgs_received=0, num_bytes_sent=0, num_bytes_received=0;
220
221     static NumberFormat JavaDoc f;
222     private static final int INITIAL_BUFSIZE=1024;
223
// Configures the shared number format: no digit grouping, at most two fraction digits.
static {
    NumberFormat format=NumberFormat.getNumberInstance();
    format.setGroupingUsed(false);
    format.setMaximumFractionDigits(2);
    f=format;
}
229
230
231
232
/**
 * Creates the transport. The constructor body does no work of its own: no sockets are
 * opened and no threads are started here.
 */
protected TP() {
    super();
}
239
240     /**
241      * debug only
242      */

243     public String JavaDoc toString() {
244         return name + "(local address: " + local_addr + ')';
245     }
246
247     public void resetStats() {
248         num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=0;
249         num_oob_msgs_received=num_incoming_msgs_received=0;
250     }
251
252     public long getNumMessagesSent() {return num_msgs_sent;}
253     public long getNumMessagesReceived() {return num_msgs_received;}
254     public long getNumBytesSent() {return num_bytes_sent;}
255     public long getNumBytesReceived() {return num_bytes_received;}
256     public String JavaDoc getBindAddress() {return bind_addr != null? bind_addr.toString() : "null";}
257     public void setBindAddress(String JavaDoc bind_addr) throws UnknownHostException {
258         this.bind_addr=InetAddress.getByName(bind_addr);
259     }
260     /** @deprecated Use {@link #isReceiveOnAllInterfaces()} instead */
261     public boolean getBindToAllInterfaces() {return receive_on_all_interfaces;}
262     public void setBindToAllInterfaces(boolean flag) {this.receive_on_all_interfaces=flag;}
263
264     public boolean isReceiveOnAllInterfaces() {return receive_on_all_interfaces;}
265     public java.util.List JavaDoc getReceiveInterfaces() {return receive_interfaces;}
266     public boolean isSendOnAllInterfaces() {return send_on_all_interfaces;}
267     public java.util.List JavaDoc getSendInterfaces() {return send_interfaces;}
268     public boolean isDiscardIncompatiblePackets() {return discard_incompatible_packets;}
269     public void setDiscardIncompatiblePackets(boolean flag) {discard_incompatible_packets=flag;}
270     public boolean isEnableBundling() {return enable_bundling;}
271     public void setEnableBundling(boolean flag) {enable_bundling=flag;}
272     public int getMaxBundleSize() {return max_bundle_size;}
273     public void setMaxBundleSize(int size) {max_bundle_size=size;}
274     public long getMaxBundleTimeout() {return max_bundle_timeout;}
275     public void setMaxBundleTimeout(long timeout) {max_bundle_timeout=timeout;}
276     public Address getLocalAddress() {return local_addr;}
277     public String JavaDoc getChannelName() {return channel_name;}
278     public boolean isLoopback() {return loopback;}
279     public void setLoopback(boolean b) {loopback=b;}
280     public boolean isUseIncomingPacketHandler() {return use_incoming_packet_handler;}
281
282
283
284
285
286     public int getOOBMinPoolSize() {
287         return oob_thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)oob_thread_pool).getCorePoolSize() : 0;
288     }
289
290     public void setOOBMinPoolSize(int size) {
291         if(oob_thread_pool instanceof ThreadPoolExecutor)
292             ((ThreadPoolExecutor)oob_thread_pool).setCorePoolSize(size);
293     }
294
295     public int getOOBMaxPoolSize() {
296         return oob_thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)oob_thread_pool).getMaximumPoolSize() : 0;
297     }
298
299     public void setOOBMaxPoolSize(int size) {
300         if(oob_thread_pool instanceof ThreadPoolExecutor)
301             ((ThreadPoolExecutor)oob_thread_pool).setMaximumPoolSize(size);
302     }
303
304     public int getOOBPoolSize() {
305         return oob_thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)oob_thread_pool).getPoolSize() : 0;
306     }
307
308     public long getOOBKeepAliveTime() {
309         return oob_thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)oob_thread_pool).getKeepAliveTime(TimeUnit.MILLISECONDS) : 0;
310     }
311
312     public void setOOBKeepAliveTime(long time) {
313         if(oob_thread_pool instanceof ThreadPoolExecutor)
314             ((ThreadPoolExecutor)oob_thread_pool).setKeepAliveTime(time, TimeUnit.MILLISECONDS);
315     }
316
317     public long getOOBMessages() {
318         return num_oob_msgs_received;
319     }
320
321     public int getOOBQueueSize() {
322         return oob_thread_pool_queue.size();
323     }
324
325     public int getOOBMaxQueueSize() {
326         return oob_thread_pool_queue_max_size;
327     }
328
329
330
331
332     public int getIncomingMinPoolSize() {
333         return thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)thread_pool).getCorePoolSize() : 0;
334     }
335
336     public void setIncomingMinPoolSize(int size) {
337         if(thread_pool instanceof ThreadPoolExecutor)
338             ((ThreadPoolExecutor)thread_pool).setCorePoolSize(size);
339     }
340
341     public int getIncomingMaxPoolSize() {
342         return thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)thread_pool).getMaximumPoolSize() : 0;
343     }
344
345     public void setIncomingMaxPoolSize(int size) {
346         if(thread_pool instanceof ThreadPoolExecutor)
347             ((ThreadPoolExecutor)thread_pool).setMaximumPoolSize(size);
348     }
349
350     public int getIncomingPoolSize() {
351         return thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)thread_pool).getPoolSize() : 0;
352     }
353
354     public long getIncomingKeepAliveTime() {
355         return thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)thread_pool).getKeepAliveTime(TimeUnit.MILLISECONDS) : 0;
356     }
357
358     public void setIncomingKeepAliveTime(long time) {
359         if(thread_pool instanceof ThreadPoolExecutor)
360             ((ThreadPoolExecutor)thread_pool).setKeepAliveTime(time, TimeUnit.MILLISECONDS);
361     }
362
363     public long getIncomingMessages() {
364         return num_incoming_msgs_received;
365     }
366
367     public int getIncomingQueueSize() {
368         return thread_pool_queue.size();
369     }
370
371     public int getIncomingMaxQueueSize() {
372         return thread_pool_queue_max_size;
373     }
374
375
376
377
378
379
380     public Map<String JavaDoc,Object JavaDoc> dumpStats() {
381         Map<String JavaDoc,Object JavaDoc> retval=super.dumpStats();
382         if(retval == null)
383             retval=new HashMap<String JavaDoc,Object JavaDoc>();
384         retval.put("num_msgs_sent", new Long JavaDoc(num_msgs_sent));
385         retval.put("num_msgs_received", new Long JavaDoc(num_msgs_received));
386         retval.put("num_bytes_sent", new Long JavaDoc(num_bytes_sent));
387         retval.put("num_bytes_received", new Long JavaDoc(num_bytes_received));
388         return retval;
389     }
390
391
392     /**
393      * Send to all members in the group. UDP would use an IP multicast message, whereas TCP would send N
394      * messages, one for each member
395      * @param data The data to be sent. This is not a copy, so don't modify it
396      * @param offset
397      * @param length
398      * @throws Exception
399      */

400     public abstract void sendToAllMembers(byte[] data, int offset, int length) throws Exception JavaDoc;
401
402     /**
403      * Send to a single member of the group, identified by <code>dest</code>. Unlike
404      * {@link #sendToAllMembers(byte[], int, int)}, the data goes to one unicast destination only
405      * @param dest Must be a non-null unicast address
406      * @param data The data to be sent. This is not a copy, so don't modify it
407      * @param offset
408      * @param length
409      * @throws Exception
410      */

411     public abstract void sendToSingleMember(Address dest, byte[] data, int offset, int length) throws Exception JavaDoc;
412
413     public abstract String JavaDoc getInfo();
414
415     public abstract void postUnmarshalling(Message msg, Address dest, Address src, boolean multicast);
416
417     public abstract void postUnmarshallingList(Message msg, Address dest, boolean multicast);
418
419
420     private StringBuffer JavaDoc _getInfo() {
421         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
422         sb.append(local_addr).append(" (").append(channel_name).append(") ").append("\n");
423         sb.append("local_addr=").append(local_addr).append("\n");
424         sb.append("group_name=").append(channel_name).append("\n");
425         sb.append("version=").append(Version.description).append(", cvs=\"").append(Version.cvs).append("\"\n");
426         sb.append("view: ").append(view).append('\n');
427         sb.append(getInfo());
428         return sb;
429     }
430
431
432     private void handleDiagnosticProbe(SocketAddress sender, DatagramSocket sock, String JavaDoc request) {
433         try {
434             StringTokenizer tok=new StringTokenizer(request);
435             String JavaDoc req=tok.nextToken();
436             StringBuffer JavaDoc info=new StringBuffer JavaDoc("n/a");
437             if(req.trim().toLowerCase().startsWith("query")) {
438                 ArrayList<String JavaDoc> l=new ArrayList<String JavaDoc>(tok.countTokens());
439                 while(tok.hasMoreTokens())
440                     l.add(tok.nextToken().trim().toLowerCase());
441
442                 info=_getInfo();
443
444                 if(l.contains("jmx")) {
445                     Channel ch=stack.getChannel();
446                     if(ch != null) {
447                         Map m=ch.dumpStats();
448                         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
449                         sb.append("stats:\n");
450                         for(Iterator it=m.entrySet().iterator(); it.hasNext();) {
451                             sb.append(it.next()).append("\n");
452                         }
453                         info.append(sb);
454                     }
455                 }
456                 if(l.contains("props")) {
457                     String JavaDoc p=stack.printProtocolSpecAsXML();
458                     info.append("\nprops:\n").append(p);
459                 }
460             }
461
462
463             byte[] diag_rsp=info.toString().getBytes();
464             if(log.isDebugEnabled())
465                 log.debug("sending diag response to " + sender);
466             sendResponse(sock, sender, diag_rsp);
467         }
468         catch(Throwable JavaDoc t) {
469             if(log.isErrorEnabled())
470                 log.error("failed sending diag rsp to " + sender, t);
471         }
472     }
473
474     private static void sendResponse(DatagramSocket sock, SocketAddress sender, byte[] buf) throws IOException JavaDoc {
475         DatagramPacket p=new DatagramPacket(buf, 0, buf.length, sender);
476         sock.send(p);
477     }
478
479     /* ------------------------------------------------------------------------------- */
480
481
482
483     /*------------------------------ Protocol interface ------------------------------ */
484
485
486     public void init() throws Exception JavaDoc {
487         super.init();
488         if(bind_addr != null) {
489             Map<String JavaDoc,Object JavaDoc> m=new HashMap<String JavaDoc,Object JavaDoc>(1);
490             m.put("bind_addr", bind_addr);
491             up_prot.up(new Event(Event.CONFIG, m));
492         }
493     }
494
495
496
497     /**
498      * Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads
499      */

500     public void start() throws Exception JavaDoc {
501         timer=stack.timer;
502         if(timer == null)
503             throw new Exception JavaDoc("timer is null");
504
505         if(enable_diagnostics) {
506             diag_handler=new DiagnosticsHandler();
507             diag_handler.start();
508         }
509
510         if(use_incoming_packet_handler && !use_concurrent_stack) {
511             incoming_packet_queue=new Queue();
512             incoming_packet_handler=new IncomingPacketHandler();
513             incoming_packet_handler.start();
514         }
515
516
517         // ========================================== OOB thread pool ==============================
518
if(oob_thread_pool_enabled) { // create a ThreadPoolExecutor for the unmarshaller thread pool
519
if(oob_thread_pool_queue_enabled)
520                 oob_thread_pool_queue=new LinkedBlockingQueue<Runnable JavaDoc>(oob_thread_pool_queue_max_size);
521             else
522                 oob_thread_pool_queue=new SynchronousQueue<Runnable JavaDoc>();
523             oob_thread_pool=createThreadPool(oob_thread_pool_min_threads, oob_thread_pool_max_threads, oob_thread_pool_keep_alive_time,
524                                              oob_thread_pool_rejection_policy, oob_thread_pool_queue, "OOB", "OOB Thread");
525         }
526         else { // otherwise use the caller's thread to unmarshal the byte buffer into a message
527
oob_thread_pool=new DirectExecutor();
528         }
529
530
531         // ====================================== Regular thread pool ===========================
532
if(thread_pool_enabled) { // create a ThreadPoolExecutor for the unmarshaller thread pool
533
if(thread_pool_queue_enabled)
534                 thread_pool_queue=new LinkedBlockingQueue<Runnable JavaDoc>(thread_pool_queue_max_size);
535             else
536                 thread_pool_queue=new SynchronousQueue<Runnable JavaDoc>();
537             thread_pool=createThreadPool(thread_pool_min_threads, thread_pool_max_threads, thread_pool_keep_alive_time,
538                                          thread_pool_rejection_policy, thread_pool_queue, "Incoming", "Incoming Thread");
539         }
540         else { // otherwise use the caller's thread to unmarshal the byte buffer into a message
541
thread_pool=new DirectExecutor();
542         }
543
544
545         if(loopback && !use_concurrent_stack) {
546             incoming_msg_queue=new Queue();
547             incoming_msg_handler=new IncomingMessageHandler();
548             incoming_msg_handler.start();
549         }
550
551         if(enable_bundling) {
552             bundler=new Bundler();
553         }
554
555         up_prot.up(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
556     }
557
558
559     public void stop() {
560         if(diag_handler != null) {
561             diag_handler.stop();
562             diag_handler=null;
563         }
564
565         // 1. Stop the incoming packet handler thread
566
if(incoming_packet_handler != null)
567             incoming_packet_handler.stop();
568
569
570         // 2. Stop the incoming message handler
571
if(incoming_msg_handler != null)
572             incoming_msg_handler.stop();
573
574         // 3. Stop the thread pools
575

576         if(oob_thread_pool instanceof ThreadPoolExecutor) {
577             shutdownThreadPool((ThreadPoolExecutor)oob_thread_pool);
578         }
579
580         if(thread_pool instanceof ThreadPoolExecutor) {
581             shutdownThreadPool((ThreadPoolExecutor)thread_pool);
582         }
583     }
584
585
586
587
588     /**
589      * Setup the Protocol instance according to the configuration string
590      * @return true if no other properties are left.
591      * false if the properties still have data in them, ie ,
592      * properties are left over and not handled by the protocol stack
593      */

594     public boolean setProperties(Properties props) {
595         super.setProperties(props);
596
597         boolean ignore_systemprops=Util.isBindAddressPropertyIgnored();
598         String JavaDoc str=Util.getProperty(new String JavaDoc[]{Global.BIND_ADDR, Global.BIND_ADDR_OLD}, props, "bind_addr",
599                                     ignore_systemprops, null);
600
601         if(str != null) {
602             try {
603                 bind_addr=InetAddress.getByName(str);
604             }
605             catch(UnknownHostException unknown) {
606                 if(log.isFatalEnabled()) log.fatal("(bind_addr): host " + str + " not known");
607                 return false;
608             }
609             props.remove("bind_addr");
610         }
611
612         str=props.getProperty("use_local_host");
613         if(str != null) {
614             use_local_host=Boolean.parseBoolean(str);
615             props.remove("use_local_host");
616         }
617
618         str=props.getProperty("bind_to_all_interfaces");
619         if(str != null) {
620             receive_on_all_interfaces=Boolean.parseBoolean(str);
621             props.remove("bind_to_all_interfaces");
622             log.warn("bind_to_all_interfaces has been deprecated; use receive_on_all_interfaces instead");
623         }
624
625         str=props.getProperty("receive_on_all_interfaces");
626         if(str != null) {
627             receive_on_all_interfaces=Boolean.parseBoolean(str);
628             props.remove("receive_on_all_interfaces");
629         }
630
631         str=props.getProperty("receive_interfaces");
632         if(str != null) {
633             try {
634                 receive_interfaces=Util.parseInterfaceList(str);
635                 props.remove("receive_interfaces");
636             }
637             catch(Exception JavaDoc e) {
638                 log.error("error determining interfaces (" + str + ")", e);
639                 return false;
640             }
641         }
642
643         str=props.getProperty("send_on_all_interfaces");
644         if(str != null) {
645             send_on_all_interfaces=Boolean.parseBoolean(str);
646             props.remove("send_on_all_interfaces");
647         }
648
649         str=props.getProperty("send_interfaces");
650         if(str != null) {
651             try {
652                 send_interfaces=Util.parseInterfaceList(str);
653                 props.remove("send_interfaces");
654             }
655             catch(Exception JavaDoc e) {
656                 log.error("error determining interfaces (" + str + ")", e);
657                 return false;
658             }
659         }
660
661         str=props.getProperty("bind_port");
662         if(str != null) {
663             bind_port=Integer.parseInt(str);
664             props.remove("bind_port");
665         }
666
667         str=props.getProperty("port_range");
668         if(str != null) {
669             port_range=Integer.parseInt(str);
670             props.remove("port_range");
671         }
672
673         str=props.getProperty("loopback");
674         if(str != null) {
675             loopback=Boolean.valueOf(str).booleanValue();
676             props.remove("loopback");
677         }
678
679         str=props.getProperty("discard_incompatible_packets");
680         if(str != null) {
681             discard_incompatible_packets=Boolean.valueOf(str).booleanValue();
682             props.remove("discard_incompatible_packets");
683         }
684
685         // this is deprecated, just left for compatibility (use use_incoming_packet_handler)
686
str=props.getProperty("use_packet_handler");
687         if(str != null) {
688             use_incoming_packet_handler=Boolean.valueOf(str).booleanValue();
689             props.remove("use_packet_handler");
690             if(log.isWarnEnabled()) log.warn("'use_packet_handler' is deprecated; use 'use_incoming_packet_handler' instead");
691         }
692
693         str=props.getProperty("use_incoming_packet_handler");
694         if(str != null) {
695             use_incoming_packet_handler=Boolean.valueOf(str).booleanValue();
696             props.remove("use_incoming_packet_handler");
697         }
698
699         str=props.getProperty("use_concurrent_stack");
700         if(str != null) {
701             use_concurrent_stack=Boolean.valueOf(str).booleanValue();
702             props.remove("use_concurrent_stack");
703         }
704
705         str=props.getProperty("thread_naming_pattern");
706         if(str != null) {
707             thread_naming_pattern=new ThreadNamingPattern(str);
708             props.remove("thread_naming_pattern");
709         }
710
711         // ======================================= OOB thread pool =========================================
712
str=props.getProperty("oob_thread_pool.enabled");
713         if(str != null) {
714             oob_thread_pool_enabled=Boolean.valueOf(str).booleanValue();
715             props.remove("oob_thread_pool.enabled");
716         }
717
718         str=props.getProperty("oob_thread_pool.min_threads");
719         if(str != null) {
720             oob_thread_pool_min_threads=Integer.valueOf(str).intValue();
721             props.remove("oob_thread_pool.min_threads");
722         }
723
724         str=props.getProperty("oob_thread_pool.max_threads");
725         if(str != null) {
726             oob_thread_pool_max_threads=Integer.valueOf(str).intValue();
727             props.remove("oob_thread_pool.max_threads");
728         }
729
730         str=props.getProperty("oob_thread_pool.keep_alive_time");
731         if(str != null) {
732             oob_thread_pool_keep_alive_time=Long.valueOf(str).longValue();
733             props.remove("oob_thread_pool.keep_alive_time");
734         }
735
736         str=props.getProperty("oob_thread_pool.queue_enabled");
737         if(str != null) {
738             oob_thread_pool_queue_enabled=Boolean.valueOf(str).booleanValue();
739             props.remove("oob_thread_pool.queue_enabled");
740         }
741
742         str=props.getProperty("oob_thread_pool.queue_max_size");
743         if(str != null) {
744             oob_thread_pool_queue_max_size=Integer.valueOf(str).intValue();
745             props.remove("oob_thread_pool.queue_max_size");
746         }
747
748         str=props.getProperty("oob_thread_pool.rejection_policy");
749         if(str != null) {
750             str=str.toLowerCase().trim();
751             oob_thread_pool_rejection_policy=str;
752             if(!(str.equals("run") || str.equals("abort")|| str.equals("discard")|| str.equals("discardoldest"))) {
753                 log.error("rejection policy of " + str + " is unknown");
754                 return false;
755             }
756             props.remove("oob_thread_pool.rejection_policy");
757         }
758
759
760
761
762         // ======================================= Regular thread pool =========================================
763
str=props.getProperty("thread_pool.enabled");
764         if(str != null) {
765             thread_pool_enabled=Boolean.valueOf(str).booleanValue();
766             props.remove("thread_pool.enabled");
767         }
768
769         str=props.getProperty("thread_pool.min_threads");
770         if(str != null) {
771             thread_pool_min_threads=Integer.valueOf(str).intValue();
772             props.remove("thread_pool.min_threads");
773         }
774
775         str=props.getProperty("thread_pool.max_threads");
776         if(str != null) {
777             thread_pool_max_threads=Integer.valueOf(str).intValue();
778             props.remove("thread_pool.max_threads");
779         }
780
781         str=props.getProperty("thread_pool.keep_alive_time");
782         if(str != null) {
783             thread_pool_keep_alive_time=Long.valueOf(str).longValue();
784             props.remove("thread_pool.keep_alive_time");
785         }
786
787         str=props.getProperty("thread_pool.queue_enabled");
788         if(str != null) {
789             thread_pool_queue_enabled=Boolean.valueOf(str).booleanValue();
790             props.remove("thread_pool.queue_enabled");
791         }
792
793         str=props.getProperty("thread_pool.queue_max_size");
794         if(str != null) {
795             thread_pool_queue_max_size=Integer.valueOf(str).intValue();
796             props.remove("thread_pool.queue_max_size");
797         }
798
799         str=props.getProperty("thread_pool.rejection_policy");
800         if(str != null) {
801             str=str.toLowerCase().trim();
802             thread_pool_rejection_policy=str;
803             if(!(str.equals("run") || str.equals("abort")|| str.equals("discard")|| str.equals("discardoldest"))) {
804                 log.error("rejection policy of " + str + " is unknown");
805                 return false;
806             }
807             props.remove("thread_pool.rejection_policy");
808         }
809
810
811         str=props.getProperty("use_outgoing_packet_handler");
812         if(str != null) {
813             log.warn("Attribute \"use_outgoing_packet_handler\" has been deprecated and is ignored");
814             props.remove("use_outgoing_packet_handler");
815         }
816
817         str=props.getProperty("outgoing_queue_max_size");
818         if(str != null) {
819             log.warn("Attribute \"use_outgoing_queue_max_size\" has been deprecated and is ignored");
820             props.remove("outgoing_queue_max_size");
821         }
822
823         str=props.getProperty("max_bundle_size");
824         if(str != null) {
825             int bundle_size=Integer.parseInt(str);
826             if(bundle_size > max_bundle_size) {
827                 if(log.isErrorEnabled()) log.error("max_bundle_size (" + bundle_size +
828                         ") is greater than largest TP fragmentation size (" + max_bundle_size + ')');
829                 return false;
830             }
831             if(bundle_size <= 0) {
832                 if(log.isErrorEnabled()) log.error("max_bundle_size (" + bundle_size + ") is <= 0");
833                 return false;
834             }
835             max_bundle_size=bundle_size;
836             props.remove("max_bundle_size");
837         }
838
839         str=props.getProperty("max_bundle_timeout");
840         if(str != null) {
841             max_bundle_timeout=Long.parseLong(str);
842             if(max_bundle_timeout <= 0) {
843                 if(log.isErrorEnabled()) log.error("max_bundle_timeout of " + max_bundle_timeout + " is invalid");
844                 return false;
845             }
846             props.remove("max_bundle_timeout");
847         }
848
849         str=props.getProperty("enable_bundling");
850         if(str != null) {
851             enable_bundling=Boolean.valueOf(str).booleanValue();
852             props.remove("enable_bundling");
853         }
854
855         str=props.getProperty("enable_diagnostics");
856         if(str != null) {
857             enable_diagnostics=Boolean.valueOf(str).booleanValue();
858             props.remove("enable_diagnostics");
859         }
860
861         str=props.getProperty("diagnostics_addr");
862         if(str != null) {
863             diagnostics_addr=str;
864             props.remove("diagnostics_addr");
865         }
866
867         str=props.getProperty("diagnostics_port");
868         if(str != null) {
869             diagnostics_port=Integer.parseInt(str);
870             props.remove("diagnostics_port");
871         }
872
873         return true;
874     }
875
876
877
878
879     /**
880      * handle the UP event.
881      * @param evt - the event being send from the stack
882      */

883     public Object JavaDoc up(Event evt) {
884         switch(evt.getType()) {
885         case Event.CONFIG:
886             up_prot.up(evt);
887             if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
888             handleConfigEvent((HashMap)evt.getArg());
889             return null;
890         }
891         return up_prot.up(evt);
892     }
893
894     /**
895      * Caller by the layer above this layer. Usually we just put this Message
896      * into the send queue and let one or more worker threads handle it. A worker thread
897      * then removes the Message from the send queue, performs a conversion and adds the
898      * modified Message to the send queue of the layer below it, by calling down()).
899      */

900     public Object JavaDoc down(Event evt) {
901         if(evt.getType() != Event.MSG) { // unless it is a message handle it and respond
902
return handleDownEvent(evt);
903         }
904
905         Message msg=(Message)evt.getArg();
906         if(header != null) {
907             // added patch by Roland Kurmann (March 20 2003)
908
// msg.putHeader(name, new TpHeader(channel_name));
909
msg.putHeader(name, header);
910         }
911
912         setSourceAddress(msg); // very important !! listToBuffer() will fail with a null src address !!
913
if(log.isTraceEnabled()) {
914             log.trace("sending msg to " + msg.getDest() + ", SRC=" + msg.getSrc() + ", headers are " + msg.printHeaders());
915         }
916
917         // Don't send if destination is local address. Instead, switch dst and src and put in up_queue.
918
// If multicast message, loopback a copy directly to us (but still multicast). Once we receive this,
919
// we will discard our own multicast message
920
Address dest=msg.getDest();
921         boolean multicast=dest == null || dest.isMulticastAddress();
922         if(loopback && (multicast || dest.equals(local_addr))) {
923
924             // we *have* to make a copy, or else up_prot.up() might remove headers from msg which will then *not*
925
// be available for marshalling further down (when sending the message)
926
final Message copy=msg.copy();
927             if(log.isTraceEnabled()) log.trace(new StringBuffer JavaDoc("looping back message ").append(copy));
928             // up_prot.up(new Event(Event.MSG, copy));
929

930             // changed to fix http://jira.jboss.com/jira/browse/JGRP-506
931
Executor pool=msg.isFlagSet(Message.OOB)? oob_thread_pool : thread_pool;
932             pool.execute(new Runnable JavaDoc() {
933                 public void run() {
934                     up_prot.up(new Event(Event.MSG, copy));
935                 }
936             });
937
938             if(!multicast)
939                 return null;
940         }
941
942         try {
943             send(msg, dest, multicast);
944         }
945         catch(InterruptedException JavaDoc interruptedEx) {
946             Thread.currentThread().interrupt(); // let someone else handle the interrupt
947
}
948         catch(Throwable JavaDoc e) {
949             if(log.isErrorEnabled()) {
950                 String JavaDoc dst=msg.getDest() == null? "null" : msg.getDest().toString();
951                 log.error("failed sending message to " + dst + " (" + msg.size() + " bytes)", e);
952             }
953         }
954         return null;
955     }
956
957
958
959     /*--------------------------- End of Protocol interface -------------------------- */
960
961
962     /* ------------------------------ Private Methods -------------------------------- */
963
964
965
966     /**
967      * If the sender is null, set our own address. We cannot just go ahead and set the address
968      * anyway, as we might be sending a message on behalf of someone else ! E.gin case of
969      * retransmission, when the original sender has crashed, or in a FLUSH protocol when we
970      * have to return all unstable messages with the FLUSH_OK response.
971      */

972     private void setSourceAddress(Message msg) {
973         if(msg.getSrc() == null)
974             msg.setSrc(local_addr);
975     }
976
977     /**
978      * Subclasses must call this method when a unicast or multicast message has been received.
979      * Declared final so subclasses cannot override this method.
980      *
981      * @param dest
982      * @param sender
983      * @param data
984      * @param offset
985      * @param length
986      */

987     protected final void receive(Address dest, Address sender, byte[] data, int offset, int length) {
988         if(data == null) return;
989
990         if(log.isTraceEnabled()){
991             boolean mcast=dest == null || dest.isMulticastAddress();
992             StringBuilder JavaDoc sb=new StringBuilder JavaDoc("received (");
993             sb.append(mcast? "mcast) " : "ucast) ").append(length).append(" bytes from ").append(sender);
994             log.trace(sb.toString());
995         }
996
997         try {
998             // determine whether OOB or not by looking at first byte of 'data'
999
boolean oob=false;
1000            byte oob_flag=data[Global.SHORT_SIZE]; // we need to skip the first 2 bytes (version)
1001
if((oob_flag & OOB) == OOB)
1002                oob=true;
1003
1004            if(use_concurrent_stack) {
1005                if(oob) {
1006                    num_oob_msgs_received++;
1007                    dispatchToThreadPool(oob_thread_pool, dest, sender, data, offset, length);
1008                }
1009                else {
1010                    num_incoming_msgs_received++;
1011                    dispatchToThreadPool(thread_pool, dest, sender, data, offset, length);
1012                }
1013            }
1014            else {
1015                if(use_incoming_packet_handler) {
1016                    byte[] tmp=new byte[length];
1017                    System.arraycopy(data, offset, tmp, 0, length);
1018                    incoming_packet_queue.add(new IncomingPacket(dest, sender, tmp, 0, length));
1019                }
1020                else
1021                    handleIncomingPacket(dest, sender, data, offset, length);
1022            }
1023        }
1024        catch(Throwable JavaDoc t) {
1025            if(log.isErrorEnabled())
1026                log.error(new StringBuffer JavaDoc("failed handling data from ").append(sender), t);
1027        }
1028    }
1029
1030
1031
1032    private void dispatchToThreadPool(Executor pool, Address dest, Address sender, byte[] data, int offset, int length) {
1033        if(pool instanceof DirectExecutor) {
1034            // we don't make a copy of the buffer if we execute on this thread
1035
pool.execute(new IncomingPacket(dest, sender, data, offset, length));
1036        }
1037        else {
1038            byte[] tmp=new byte[length];
1039            System.arraycopy(data, offset, tmp, 0, length);
1040            pool.execute(new IncomingPacket(dest, sender, tmp, 0, length));
1041        }
1042    }
1043
1044
1045    /**
1046     * Processes a packet read from either the multicast or unicast socket. Needs to be synchronized because
1047     * mcast or unicast socket reads can be concurrent.
1048     * Correction (bela April 19 2005): we access no instance variables, all vars are allocated on the stack, so
1049     * this method should be reentrant: removed 'synchronized' keyword
1050     */

1051    private void handleIncomingPacket(Address dest, Address sender, byte[] data, int offset, int length) {
1052        Message msg=null;
1053        short version=0;
1054        boolean is_message_list, multicast;
1055        byte flags;
1056        List JavaDoc<Message> msgs;
1057
1058        try {
1059            synchronized(in_stream) {
1060                in_stream.setData(data, offset, length);
1061                try {
1062                    version=dis.readShort();
1063                }
1064                catch(IOException JavaDoc ex) {
1065                    if(discard_incompatible_packets)
1066                        return;
1067                    throw ex;
1068                }
1069                if(Version.isBinaryCompatible(version) == false) {
1070                    if(log.isWarnEnabled()) {
1071                        StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
1072                        sb.append("packet from ").append(sender).append(" has different version (").append(Version.print(version));
1073                        sb.append(") from ours (").append(Version.printVersion()).append("). ");
1074                        if(discard_incompatible_packets)
1075                            sb.append("Packet is discarded");
1076                        else
1077                            sb.append("This may cause problems");
1078                        log.warn(sb);
1079                    }
1080                    if(discard_incompatible_packets)
1081                        return;
1082                }
1083
1084                flags=dis.readByte();
1085                is_message_list=(flags & LIST) == LIST;
1086                multicast=(flags & MULTICAST) == MULTICAST;
1087
1088                if(is_message_list)
1089                    msgs=readMessageList(dis, dest, multicast);
1090                else {
1091                    msg=readMessage(dis, dest, sender, multicast);
1092                    msgs=new LinkedList<Message>();
1093                    msgs.add(msg);
1094                }
1095            }
1096
1097            Address src;
1098            for(Iterator it=msgs.iterator(); it.hasNext();) {
1099                msg=(Message)it.next();
1100                src=msg.getSrc();
1101                if(loopback) {
1102                    if(multicast && src != null && local_addr.equals(src)) { // discard own loopback multicast packets
1103
it.remove();
1104                    }
1105                }
1106                else
1107                    handleIncomingMessage(msg);
1108            }
1109            if(incoming_msg_queue != null && !msgs.isEmpty())
1110                incoming_msg_queue.addAll(msgs);
1111        }
1112        catch(Throwable JavaDoc t) {
1113            if(log.isErrorEnabled())
1114                log.error("failed unmarshalling message", t);
1115        }
1116    }
1117
1118
1119
1120    private void handleIncomingMessage(Message msg) {
1121        Event evt;
1122        TpHeader hdr;
1123
1124        if(stats) {
1125            num_msgs_received++;
1126            num_bytes_received+=msg.getLength();
1127        }
1128
1129        evt=new Event(Event.MSG, msg);
1130        if(log.isTraceEnabled()) {
1131            StringBuffer JavaDoc sb=new StringBuffer JavaDoc("message is ").append(msg).append(", headers are ").append(msg.printHeaders());
1132            log.trace(sb);
1133        }
1134
1135        hdr=(TpHeader)msg.getHeader(name); // replaced removeHeader() with getHeader()
1136
if(hdr != null) {
1137
1138            /* Discard all messages destined for a channel with a different name */
1139            String JavaDoc ch_name=hdr.channel_name;
1140
1141            // Discard if message's group name is not the same as our group name unless the
1142
// message is a diagnosis message (special group name DIAG_GROUP)
1143
if(channel_name != null && !channel_name.equals(ch_name)) {
1144                if(log.isWarnEnabled())
1145                    log.warn(new StringBuffer JavaDoc("discarded message from different group \"").append(ch_name).
1146                            append("\" (our group is \"").append(channel_name).append("\"). Sender was ").append(msg.getSrc()));
1147                return;
1148            }
1149        }
1150        else {
1151            if(log.isTraceEnabled())
1152                log.trace(new StringBuffer JavaDoc("message does not have a transport header, msg is ").append(msg).
1153                          append(", headers are ").append(msg.printHeaders()).append(", will be discarded"));
1154            return;
1155        }
1156        up_prot.up(evt);
1157    }
1158
1159
1160    /** Internal method to serialize and send a message. This method is not reentrant */
1161    private void send(Message msg, Address dest, boolean multicast) throws Exception JavaDoc {
1162
1163        // bundle only regular messages; send OOB messages directly
1164
if(enable_bundling && !msg.isFlagSet(Message.OOB)) {
1165            bundler.send(msg, dest);
1166            return;
1167        }
1168
1169        ExposedByteArrayOutputStream out_stream=null;
1170        ExposedDataOutputStream dos=null;
1171        Buffer buf;
1172        out_stream=new ExposedByteArrayOutputStream(INITIAL_BUFSIZE);
1173        dos=new ExposedDataOutputStream(out_stream);
1174        writeMessage(msg, dos, multicast);
1175        buf=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size());
1176        doSend(buf, dest, multicast);
1177    }
1178
1179
1180    private void doSend(Buffer buf, Address dest, boolean multicast) throws Exception JavaDoc {
1181        if(stats) {
1182            num_msgs_sent++;
1183            num_bytes_sent+=buf.getLength();
1184        }
1185        if(multicast) {
1186            sendToAllMembers(buf.getBuf(), buf.getOffset(), buf.getLength());
1187        }
1188        else {
1189            sendToSingleMember(dest, buf.getBuf(), buf.getOffset(), buf.getLength());
1190        }
1191    }
1192
1193
1194
1195    /**
1196     * This method needs to be synchronized on out_stream when it is called
1197     * @param msg
1198     * @return
1199     * @throws java.io.IOException
1200     */

1201    private static void writeMessage(Message msg, DataOutputStream JavaDoc dos, boolean multicast) throws Exception JavaDoc {
1202        byte flags=0;
1203        dos.writeShort(Version.version); // write the version
1204
if(multicast)
1205            flags+=MULTICAST;
1206        if(msg.isFlagSet(Message.OOB))
1207            flags+=OOB;
1208        dos.writeByte(flags);
1209        msg.writeTo(dos);
1210    }
1211
1212    private Message readMessage(DataInputStream JavaDoc instream, Address dest, Address sender, boolean multicast) throws Exception JavaDoc {
1213        Message msg=new Message(false); // don't create headers, readFrom() will do this
1214
msg.readFrom(instream);
1215        postUnmarshalling(msg, dest, sender, multicast); // allows for optimization by subclass
1216
return msg;
1217    }
1218
1219
1220
1221    private static void writeMessageList(List JavaDoc<Message> msgs, DataOutputStream JavaDoc dos, boolean multicast) throws Exception JavaDoc {
1222        Address src;
1223        byte flags=0;
1224        int len=msgs != null? msgs.size() : 0;
1225        boolean src_written=false;
1226
1227        dos.writeShort(Version.version);
1228        flags+=LIST;
1229        if(multicast)
1230            flags+=MULTICAST;
1231        dos.writeByte(flags);
1232        dos.writeInt(len);
1233        if(msgs != null) {
1234            for(Message msg: msgs) {
1235                src=msg.getSrc();
1236                if(!src_written) {
1237                    Util.writeAddress(src, dos);
1238                    src_written=true;
1239                }
1240                msg.writeTo(dos);
1241            }
1242        }
1243    }
1244
1245    private List JavaDoc<Message> readMessageList(DataInputStream JavaDoc instream, Address dest, boolean multicast) throws Exception JavaDoc {
1246        List JavaDoc<Message> list=new LinkedList<Message>();
1247        int len;
1248        Message msg;
1249        Address src;
1250
1251        len=instream.readInt();
1252        src=Util.readAddress(instream);
1253        for(int i=0; i < len; i++) {
1254            msg=new Message(false); // don't create headers, readFrom() will do this
1255
msg.readFrom(instream);
1256            postUnmarshallingList(msg, dest, multicast);
1257            msg.setSrc(src);
1258            list.add(msg);
1259        }
1260        return list;
1261    }
1262
1263
1264
1265    protected Object JavaDoc handleDownEvent(Event evt) {
1266        switch(evt.getType()) {
1267
1268        case Event.TMP_VIEW:
1269        case Event.VIEW_CHANGE:
1270            synchronized(members) {
1271                view=(View)evt.getArg();
1272                members.clear();
1273                Vector<Address> tmpvec=view.getMembers();
1274                members.addAll(tmpvec);
1275            }
1276            break;
1277
1278        case Event.CONNECT:
1279            channel_name=(String JavaDoc)evt.getArg();
1280            header=new TpHeader(channel_name);
1281            setThreadNames();
1282            return null;
1283
1284        case Event.DISCONNECT:
1285            unsetThreadNames();
1286            break;
1287
1288        case Event.CONFIG:
1289            if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
1290            handleConfigEvent((HashMap)evt.getArg());
1291            break;
1292        }
1293        return null;
1294    }
1295
1296
1297
1298
1299    protected void setThreadNames() {
1300        if(thread_naming_pattern != null){
1301            if(incoming_packet_handler != null)
1302                thread_naming_pattern.renameThread(IncomingPacketHandler.THREAD_NAME, incoming_packet_handler.getThread());
1303            if(incoming_msg_handler != null)
1304                thread_naming_pattern.renameThread(IncomingMessageHandler.THREAD_NAME, incoming_msg_handler.getThread());
1305            if(diag_handler != null)
1306                thread_naming_pattern.renameThread(DiagnosticsHandler.THREAD_NAME, diag_handler.getThread());
1307        }
1308    }
1309
1310
1311    protected void unsetThreadNames() {
1312         if(incoming_packet_handler != null && incoming_packet_handler.getThread() != null)
1313             incoming_packet_handler.getThread().setName(IncomingPacketHandler.THREAD_NAME);
1314         if(incoming_msg_handler != null && incoming_msg_handler.getThread() != null)
1315             incoming_msg_handler.getThread().setName(IncomingMessageHandler.THREAD_NAME);
1316         if(diag_handler != null && diag_handler.getThread() != null)
1317             diag_handler.getThread().setName(DiagnosticsHandler.THREAD_NAME);
1318    }
1319    
1320    protected void handleConfigEvent(HashMap map) {
1321        if(map == null) return;
1322        if(map.containsKey("additional_data")) {
1323            additional_data=(byte[])map.get("additional_data");
1324            if(local_addr instanceof IpAddress)
1325                ((IpAddress)local_addr).setAdditionalData(additional_data);
1326        }
1327    }
1328
1329
1330    protected Executor createThreadPool(int min_threads, int max_threads, long keep_alive_time, String JavaDoc rejection_policy,
1331                                        BlockingQueue<Runnable JavaDoc> queue, final String JavaDoc thread_group_name, final String JavaDoc thread_name) {
1332        ThreadPoolExecutor pool=null;
1333        pool=new ThreadPoolExecutor(min_threads, max_threads, keep_alive_time, TimeUnit.MILLISECONDS, queue);
1334
1335        pool.setThreadFactory(new ThreadFactory() {
1336            ThreadGroup JavaDoc unmarshaller_threads=new ThreadGroup JavaDoc(pool_thread_group, thread_group_name);
1337            public Thread JavaDoc newThread(Runnable JavaDoc command) {
1338                Thread JavaDoc retval=new Thread JavaDoc(unmarshaller_threads, command, thread_name);
1339                if(thread_naming_pattern != null)
1340                    thread_naming_pattern.renameThread(thread_name, retval);
1341                return retval;
1342            }
1343        });
1344
1345        if(rejection_policy != null) {
1346            if(rejection_policy.equals("abort"))
1347                pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
1348            else if(rejection_policy.equals("discard"))
1349                pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
1350            else if(rejection_policy.equals("discardoldest"))
1351                pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
1352            else
1353                pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
1354        }
1355
1356        return pool;
1357    }
1358
1359
1360    private static void shutdownThreadPool(ThreadPoolExecutor thread_pool) {
1361        thread_pool.shutdownNow();
1362        try {
1363            thread_pool.awaitTermination(Global.THREADPOOL_SHUTDOWN_WAIT_TIME, TimeUnit.MILLISECONDS);
1364        }
1365        catch(InterruptedException JavaDoc e) {
1366        }
1367    }
1368
1369
1370    /* ----------------------------- End of Private Methods ---------------------------------------- */
1371
1372
1373
1374    /* ----------------------------- Inner Classes ---------------------------------------- */
1375
1376    class IncomingPacket implements Runnable JavaDoc {
1377        Address dest=null;
1378        Address sender=null;
1379        byte[] buf;
1380        int offset, length;
1381
1382        IncomingPacket(Address dest, Address sender, byte[] buf, int offset, int length) {
1383            this.dest=dest;
1384            this.sender=sender;
1385            this.buf=buf;
1386            this.offset=offset;
1387            this.length=length;
1388        }
1389        
1390
1391        /** Code copied from handleIncomingPacket */
1392        public void run() {
1393            short version=0;
1394            boolean is_message_list, multicast;
1395            byte flags;
1396            ExposedByteArrayInputStream in_stream=null;
1397            DataInputStream JavaDoc dis=null;
1398
1399            try {
1400                in_stream=new ExposedByteArrayInputStream(buf, offset, length);
1401                dis=new DataInputStream JavaDoc(in_stream);
1402                try {
1403                    version=dis.readShort();
1404                }
1405                catch(IOException JavaDoc ex) {
1406                    if(discard_incompatible_packets)
1407                        return;
1408                    throw ex;
1409                }
1410                if(Version.isBinaryCompatible(version) == false) {
1411                    if(log.isWarnEnabled()) {
1412                        StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
1413                        sb.append("packet from ").append(sender).append(" has different version (").append(Version.print(version));
1414                        sb.append(") from ours (").append(Version.printVersion()).append("). ");
1415                        if(discard_incompatible_packets)
1416                            sb.append("Packet is discarded");
1417                        else
1418                            sb.append("This may cause problems");
1419                        log.warn(sb);
1420                    }
1421                    if(discard_incompatible_packets)
1422                        return;
1423                }
1424
1425                flags=dis.readByte();
1426                is_message_list=(flags & LIST) == LIST;
1427                multicast=(flags & MULTICAST) == MULTICAST;
1428
1429                if(is_message_list) { // used if message bundling is enabled
1430
List JavaDoc<Message> msgs=readMessageList(dis, dest, multicast);
1431                    for(Message msg: msgs) {
1432                        if(msg.isFlagSet(Message.OOB)) {
1433                            log.warn("bundled message should not be marked as OOB");
1434                        }
1435                        handleMyMessage(msg, multicast);
1436                    }
1437                }
1438                else {
1439                    Message msg=readMessage(dis, dest, sender, multicast);
1440                    handleMyMessage(msg, multicast);
1441                }
1442            }
1443            catch(Throwable JavaDoc t) {
1444                if(log.isErrorEnabled())
1445                    log.error("failed handling incoming message", t);
1446            }
1447        }
1448
1449
1450        private void handleMyMessage(Message msg, boolean multicast) {
1451            if(stats) {
1452                num_msgs_received++;
1453                num_bytes_received+=msg.getLength();
1454            }
1455
1456            Address SRC=msg.getSrc();
1457            if(loopback && multicast && src != null && local_addr.equals(src)) {
1458                return; // drop message that was already looped back and delivered
1459
}
1460
1461            TpHeader hdr=(TpHeader)msg.getHeader(name); // replaced removeHeader() with getHeader()
1462
if(hdr != null) {
1463                String JavaDoc ch_name=hdr.channel_name;
1464
1465                // Discard if message's group name is not the same as our group name
1466
if(channel_name != null && !channel_name.equals(ch_name)) {
1467                    if(log.isWarnEnabled())
1468                        log.warn(new StringBuffer JavaDoc("discarded message from different group \"").append(ch_name).
1469                                append("\" (our group is \"").append(channel_name).append("\"). Sender was ").append(msg.getSrc()));
1470                    return;
1471                }
1472            }
1473            else {
1474                if(log.isTraceEnabled())
1475                    log.trace(new StringBuffer JavaDoc("message does not have a transport header, msg is ").append(msg).
1476                            append(", headers are ").append(msg.printHeaders()).append(", will be discarded"));
1477                return;
1478            }
1479
1480            Event evt=new Event(Event.MSG, msg);
1481            if(log.isTraceEnabled()) {
1482                StringBuffer JavaDoc sb=new StringBuffer JavaDoc("message is ").append(msg).append(", headers are ").append(msg.printHeaders());
1483                log.trace(sb);
1484            }
1485
1486            up_prot.up(evt);
1487        }
1488
1489
1490    }
1491
1492
1493
1494
1495
1496    /**
1497     * This thread fetches byte buffers from the packet_queue, converts them into messages and passes them up
1498     * to the higher layer (done in handleIncomingUdpPacket()).
1499     */

1500    class IncomingPacketHandler implements Runnable JavaDoc {
1501        
1502        public static final String JavaDoc THREAD_NAME="IncomingPacketHandler";
1503        Thread JavaDoc t=null;
1504
1505        Thread JavaDoc getThread(){
1506            return t;
1507        }
1508
1509        void start() {
1510            if(t == null || !t.isAlive()) {
1511                t=new Thread JavaDoc(Util.getGlobalThreadGroup(), this, THREAD_NAME);
1512                t.setDaemon(true);
1513                t.start();
1514            }
1515        }
1516
1517        void stop() {
1518            incoming_packet_queue.close(true); // should terminate the packet_handler thread too
1519
if(t != null) {
1520                try {
1521                    t.join(Global.THREAD_SHUTDOWN_WAIT_TIME);
1522                }
1523                catch(InterruptedException JavaDoc e) {
1524                    Thread.currentThread().interrupt(); // set interrupt flag again
1525
}
1526            }
1527        }
1528
1529        public void run() {
1530            IncomingPacket entry;
1531            while(!incoming_packet_queue.closed() && Thread.currentThread().equals(t)) {
1532                try {
1533                    entry=(IncomingPacket)incoming_packet_queue.remove();
1534                    handleIncomingPacket(entry.dest, entry.sender, entry.buf, entry.offset, entry.length);
1535                }
1536                catch(QueueClosedException closed_ex) {
1537                    break;
1538                }
1539                catch(Throwable JavaDoc ex) {
1540                    if(log.isErrorEnabled())
1541                        log.error("error processing incoming packet", ex);
1542                }
1543            }
1544            if(log.isTraceEnabled()) log.trace("incoming packet handler terminating");
1545        }
1546    }
1547
1548
1549    class IncomingMessageHandler implements Runnable JavaDoc {
1550        
1551        public static final String JavaDoc THREAD_NAME = "IncomingMessageHandler";
1552        Thread JavaDoc t;
1553
1554        Thread JavaDoc getThread(){
1555            return t;
1556        }
1557
1558        public void start() {
1559            if(t == null || !t.isAlive()) {
1560                t=new Thread JavaDoc(Util.getGlobalThreadGroup(), this, THREAD_NAME);
1561                t.setDaemon(true);
1562                t.start();
1563            }
1564        }
1565
1566
1567        public void stop() {
1568            incoming_msg_queue.close(true);
1569            if(t != null) {
1570                try {
1571                    t.join(Global.THREAD_SHUTDOWN_WAIT_TIME);
1572                }
1573                catch(InterruptedException JavaDoc e) {
1574                    Thread.currentThread().interrupt(); // set interrupt flag again
1575
}
1576            }
1577        }
1578
1579        public void run() {
1580            Message msg;
1581            while(!incoming_msg_queue.closed() && Thread.currentThread().equals(t)) {
1582                try {
1583                    msg=(Message)incoming_msg_queue.remove();
1584                    handleIncomingMessage(msg);
1585                }
1586                catch(QueueClosedException closed_ex) {
1587                    break;
1588                }
1589                catch(Throwable JavaDoc ex) {
1590                    if(log.isErrorEnabled())
1591                        log.error("error processing incoming message", ex);
1592                }
1593            }
1594            if(log.isTraceEnabled()) log.trace("incoming message handler terminating");
1595        }
1596    }
1597
1598
1599
1600
1601    private class Bundler {
1602        static final int MIN_NUMBER_OF_BUNDLING_TASKS=2;
1603        /** HashMap<Address, List<Message>>. Keys are destinations, values are lists of Messages */
1604        final Map<Address,List JavaDoc<Message>> msgs=new HashMap<Address,List JavaDoc<Message>>(36);
1605        @GuardedBy("lock")
1606        long count=0; // current number of bytes accumulated
1607
int num_msgs=0;
1608        @GuardedBy("lock")
1609        int num_bundling_tasks=0;
1610        long last_bundle_time;
1611        final ReentrantLock JavaDoc lock=new ReentrantLock JavaDoc();
1612
1613
1614        private void send(Message msg, Address dest) throws Exception JavaDoc {
1615            long length=msg.size();
1616            checkLength(length);
1617            Map<Address,List JavaDoc<Message>> bundled_msgs=null;
1618
1619            lock.lock();
1620            try {
1621                if(count + length >= max_bundle_size) {
1622                    bundled_msgs=removeBundledMessages();
1623                }
1624                addMessage(msg, dest);
1625                count+=length;
1626                if(num_bundling_tasks < MIN_NUMBER_OF_BUNDLING_TASKS) {
1627                    num_bundling_tasks++;
1628                    timer.schedule(new BundlingTimer(), max_bundle_timeout, TimeUnit.MILLISECONDS);
1629                }
1630            }
1631            finally {
1632                lock.unlock();
1633            }
1634
1635            if(bundled_msgs != null) {
1636                sendBundledMessages(bundled_msgs);
1637            }
1638        }
1639
1640        /** Run with lock acquired */
1641        private void addMessage(Message msg, Address dest) { // no sync needed, always called with lock held
1642
if(msgs.isEmpty())
1643                last_bundle_time=System.currentTimeMillis();
1644            List JavaDoc<Message> tmp=msgs.get(dest);
1645            if(tmp == null) {
1646                tmp=new LinkedList<Message>();
1647                msgs.put(dest, tmp);
1648            }
1649            tmp.add(msg);
1650            num_msgs++;
1651        }
1652
1653
1654        /** Must always be called with lock held */
1655        private Map<Address,List JavaDoc<Message>> removeBundledMessages() {
1656            if(msgs.isEmpty())
1657                return null;
1658            Map<Address,List JavaDoc<Message>> copy=new HashMap<Address,List JavaDoc<Message>>(msgs);
1659            if(log.isTraceEnabled()) {
1660                long stop=System.currentTimeMillis();
1661                double percentage=100.0 / max_bundle_size * count;
1662                StringBuilder JavaDoc sb=new StringBuilder JavaDoc("sending ").append(num_msgs).append(" msgs (");
1663                num_msgs=0;
1664                sb.append(count).append(" bytes (" + f.format(percentage) + "% of max_bundle_size)");
1665                if(last_bundle_time > 0) {
1666                    sb.append(", collected in ").append(stop-last_bundle_time).append("ms) ");
1667                }
1668                sb.append(" to ").append(copy.size()).append(" destination(s)");
1669                if(copy.size() > 1) sb.append(" (dests=").append(copy.keySet()).append(")");
1670                log.trace(sb);
1671            }
1672            msgs.clear();
1673            count=0;
1674            return copy;
1675        }
1676
1677
1678        /**
1679         * Sends all messages from the map, all messages for the same destination are bundled into 1 message.
1680         * This method may be called by timer and bundler concurrently
1681         * @param msgs
1682         */

1683        private void sendBundledMessages(Map<Address,List JavaDoc<Message>> msgs) {
1684            boolean multicast;
1685            Buffer buffer;
1686            Map.Entry<Address,List JavaDoc<Message>> entry;
1687            Address dst;
1688            ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(INITIAL_BUFSIZE);
1689            ExposedDataOutputStream dos=new ExposedDataOutputStream(out_stream);
1690            boolean first=true;
1691
1692            for(Iterator<Map.Entry<Address,List JavaDoc<Message>>> it=msgs.entrySet().iterator(); it.hasNext();) {
1693                entry=it.next();
1694                List JavaDoc<Message> list=entry.getValue();
1695                if(list.isEmpty())
1696                    continue;
1697                dst=entry.getKey();
1698                multicast=dst == null || dst.isMulticastAddress();
1699                try {
1700                    if(first) {
1701                        first=false;
1702                    }
1703                    else {
1704                        out_stream.reset();
1705                        dos.reset();
1706                    }
1707                    writeMessageList(list, dos, multicast); // flushes output stream when done
1708
buffer=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size());
1709                    doSend(buffer, dst, multicast);
1710                }
1711                catch(Throwable JavaDoc e) {
1712                    if(log.isErrorEnabled()) log.error("exception sending msg: " + e.toString(), e.getCause());
1713                }
1714            }
1715        }
1716
1717
1718
1719        private void checkLength(long len) throws Exception JavaDoc {
1720            if(len > max_bundle_size)
1721                throw new Exception JavaDoc("message size (" + len + ") is greater than max bundling size (" + max_bundle_size +
1722                        "). Set the fragmentation/bundle size in FRAG and TP correctly");
1723        }
1724
1725
1726        private class BundlingTimer implements Runnable JavaDoc {
1727
1728            public void run() {
1729                Map<Address, List JavaDoc<Message>> msgs=null;
1730                boolean unlocked=false;
1731
1732                lock.lock();
1733                try {
1734                    msgs=removeBundledMessages();
1735                    if(msgs != null) {
1736                        lock.unlock();
1737                        unlocked=true;
1738                        sendBundledMessages(msgs);
1739                    }
1740                }
1741                finally {
1742                    if(unlocked)
1743                        lock.lock();
1744                    num_bundling_tasks--;
1745                    lock.unlock();
1746                }
1747            }
1748        }
1749    }
1750
1751
1752    private class DiagnosticsHandler implements Runnable JavaDoc {
1753        public static final String JavaDoc THREAD_NAME = "DiagnosticsHandler";
1754        Thread JavaDoc thread=null;
1755        MulticastSocket diag_sock=null;
1756
1757        DiagnosticsHandler() {
1758        }
1759
1760        Thread JavaDoc getThread(){
1761            return thread;
1762        }
1763
1764        void start() throws IOException JavaDoc {
1765            diag_sock=new MulticastSocket(diagnostics_port);
1766            java.util.List JavaDoc interfaces=Util.getAllAvailableInterfaces();
1767            bindToInterfaces(interfaces, diag_sock);
1768
1769            if(thread == null || !thread.isAlive()) {
1770                thread=new Thread JavaDoc(Util.getGlobalThreadGroup(), this, THREAD_NAME);
1771                thread.setDaemon(true);
1772                thread.start();
1773            }
1774        }
1775
1776        void stop() {
1777            if(diag_sock != null)
1778                diag_sock.close();
1779            if(thread != null){
1780                try{
1781                    thread.join(Global.THREAD_SHUTDOWN_WAIT_TIME);
1782                }
1783                catch(InterruptedException JavaDoc e){
1784                    Thread.currentThread().interrupt(); // set interrupt flag
1785
}
1786            }
1787        }
1788
1789        public void run() {
1790            byte[] buf=new byte[1500]; // MTU on most LANs
1791
DatagramPacket packet;
1792            while(!diag_sock.isClosed() && Thread.currentThread().equals(thread)) {
1793                packet=new DatagramPacket(buf, 0, buf.length);
1794                try {
1795                    diag_sock.receive(packet);
1796                    handleDiagnosticProbe(packet.getSocketAddress(), diag_sock,
1797                                          new String JavaDoc(packet.getData(), packet.getOffset(), packet.getLength()));
1798                }
1799                catch(IOException JavaDoc e) {
1800                }
1801            }
1802        }
1803
1804        private void bindToInterfaces(java.util.List JavaDoc interfaces, MulticastSocket s) {
1805            SocketAddress group_addr=new InetSocketAddress(diagnostics_addr, diagnostics_port);
1806            for(Iterator it=interfaces.iterator(); it.hasNext();) {
1807                NetworkInterface i=(NetworkInterface)it.next();
1808                try {
1809                    if (i.getInetAddresses().hasMoreElements()) { // fix for VM crash - suggested by JJalenak@netopia.com
1810
s.joinGroup(group_addr, i);
1811                        if(log.isTraceEnabled())
1812                            log.trace("joined " + group_addr + " on " + i.getName());
1813                    }
1814                }
1815                catch(IOException JavaDoc e) {
1816                    log.warn("failed to join " + group_addr + " on " + i.getName() + ": " + e);
1817                }
1818            }
1819        }
1820    }
1821
1822    public class ThreadNamingPattern {
1823        final boolean includeClusterName;
1824        final boolean includeLocalAddress;
1825
1826        public ThreadNamingPattern(String JavaDoc pattern) {
1827            includeClusterName=pattern.contains("c");
1828            includeLocalAddress=pattern.contains("l");
1829        }
1830
1831        public boolean isIncludeLocalAddress() {
1832            return includeLocalAddress;
1833        }
1834
1835        public boolean isIncludeClusterName() {
1836            return includeClusterName;
1837        }
1838
1839
1840        protected String JavaDoc renameThread(String JavaDoc base_name, Thread JavaDoc runner) {
1841            String JavaDoc oldName = null;
1842            if(runner!=null){
1843                oldName=runner.getName();
1844    
1845                StringBuilder JavaDoc threadName=new StringBuilder JavaDoc();
1846                threadName.append(base_name);
1847
1848                if(isIncludeClusterName()) {
1849                    if(threadName.length() > 0)
1850                        threadName.append(',');
1851                    threadName.append(getChannelName());
1852                }
1853                if(isIncludeLocalAddress()) {
1854                    if(threadName.length() > 0)
1855                        threadName.append(',');
1856                    threadName.append(getLocalAddress());
1857                }
1858
1859                runner.setName(threadName.toString());
1860            }
1861            return oldName;
1862        }
1863        
1864    }
1865
1866
1867}
1868
Popular Tags