KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > UDP


1 // $Id: UDP.java,v 1.76 2005/04/23 20:39:17 belaban Exp $
2

3 package org.jgroups.protocols;
4
5
6 import org.jgroups.*;
7 import org.jgroups.stack.IpAddress;
8 import org.jgroups.stack.Protocol;
9 import org.jgroups.util.List;
10 import org.jgroups.util.*;
11 import org.jgroups.util.Queue;
12
13 import java.io.*;
14 import java.net.*;
15 import java.util.*;
16
17
18
19
/**
 * IP multicast transport based on UDP. Messages to the group (msg.dest == null) will
 * be multicast (to all group members), whereas point-to-point messages
 * (msg.dest != null) will be unicast to a single member. Uses a multicast and
 * a unicast socket.<p>
 * The following properties are read by the UDP protocol:<p>
 * param mcast_addr - the multicast address to use; default is 228.8.8.8<br>
 * param mcast_port - (int) the port that the multicast is sent on; default is 7600<br>
 * param ip_mcast - (boolean) flag whether to use IP multicast; default is true<br>
 * param ip_ttl - the default time-to-live for multicast packets sent out on this
 * socket; default is 64<br>
 * param use_incoming_packet_handler (formerly use_packet_handler, which is deprecated) -
 * If set, the mcast and ucast receiver threads just put
 * the datagram's payload (a byte buffer) into a queue, from where a separate thread
 * will dequeue and handle them (unmarshal and pass up). This frees the receiver
 * threads from having to do message unmarshalling; this time can now be spent
 * receiving packets. If you have lots of retransmissions because of network
 * input buffer overflow, consider setting this property to true (default is false).
 * @author Bela Ban
 */

39 public class UDP extends Protocol implements Runnable JavaDoc {
40
41     /** Socket used for
42      * <ol>
43      * <li>sending unicast packets and
44      * <li>receiving unicast packets
45      * </ol>
46      * The address of this socket will be our local address (<tt>local_addr</tt>) */

47     DatagramSocket sock=null;
48
49     /**
50      * BoundedList<Integer> of the last 100 ports used. This is to avoid reusing a port for DatagramSocket
51      */

52     private static BoundedList last_ports_used=null;
53
54     /** Maintain a list of local ports opened by DatagramSocket. If this is 0, this option is turned off.
55      * If bind_port is null, then this options will be ignored */

56     int num_last_ports=100;
57
58     /** IP multicast socket for <em>receiving</em> multicast packets */
59     MulticastSocket mcast_recv_sock=null;
60
61     /** IP multicast socket for <em>sending</em> multicast packets */
62     MulticastSocket mcast_send_sock=null;
63
64     /** The address (host and port) of this member */
65     IpAddress local_addr=null;
66
67     /** The name of the group to which this member is connected */
68     String JavaDoc channel_name=null;
69
70     UdpHeader udp_hdr=null;
71
72     /** The multicast address (mcast address and port) this member uses */
73     IpAddress mcast_addr=null;
74
75     /** The interface (NIC) to which the unicast and multicast sockets bind */
76     InetAddress bind_addr=null;
77
78     /** Bind the receiver multicast socket to all available interfaces (requires JDK 1.4) */
79     boolean bind_to_all_interfaces=false;
80
81     /** The port to which the unicast receiver socket binds.
82      * 0 means to bind to any (ephemeral) port */

83     int bind_port=0;
84     int port_range=1; // 27-6-2003 bgooren, Only try one port by default
85

86     /** The multicast address used for sending and receiving packets */
87     String JavaDoc mcast_addr_name="228.8.8.8";
88
89     /** The multicast port used for sending and receiving packets */
90     int mcast_port=7600;
91
92     /** The multicast receiver thread */
93     Thread JavaDoc mcast_receiver=null;
94
95     /** The unicast receiver thread */
96     UcastReceiver ucast_receiver=null;
97
98     /** Whether to enable IP multicasting. If false, multiple unicast datagram
99      * packets are sent rather than one multicast packet */

100     boolean ip_mcast=true;
101
102     /** The time-to-live (TTL) for multicast datagram packets */
103     int ip_ttl=64;
104
105     /** The members of this group (updated when a member joins or leaves) */
106     final Vector members=new Vector(11);
107
108     /** Pre-allocated byte stream. Used for serializing datagram packets. Will grow as needed */
109     final ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(1024);
110
111     /** Send buffer size of the multicast datagram socket */
112     int mcast_send_buf_size=32000;
113
114     /** Receive buffer size of the multicast datagram socket */
115     int mcast_recv_buf_size=64000;
116
117     /** Send buffer size of the unicast datagram socket */
118     int ucast_send_buf_size=32000;
119
120     /** Receive buffer size of the unicast datagram socket */
121     int ucast_recv_buf_size=64000;
122
123     /** If true, messages sent to self are treated specially: unicast messages are
124      * looped back immediately, multicast messages get a local copy first and -
125      * when the real copy arrives - it will be discarded. Useful for Window
126      * media (non)sense */

127     boolean loopback=true;
128
129
130     /** Discard packets with a different version. Usually minor version differences are okay. Setting this property
131      * to true means that we expect the exact same version on all incoming packets */

132     boolean discard_incompatible_packets=false;
133
134     /** Sometimes receivers are overloaded (they have to handle de-serialization etc).
135      * Packet handler is a separate thread taking care of de-serialization, receiver
136      * thread(s) simply put packet in queue and return immediately. Setting this to
137      * true adds one more thread */

138     boolean use_incoming_packet_handler=false;
139
140     /** Used by packet handler to store incoming DatagramPackets */
141     Queue incoming_queue=null;
142
143     /** Dequeues DatagramPackets from packet_queue, unmarshalls them and
144      * calls <tt>handleIncomingUdpPacket()</tt> */

145     IncomingPacketHandler incoming_packet_handler=null;
146
147     /** Packets to be sent are stored in outgoing_queue and sent by a separate thread. Enabling this
148      * value uses an additional thread */

149     boolean use_outgoing_packet_handler=false;
150
151     /** Used by packet handler to store outgoing DatagramPackets */
152     Queue outgoing_queue=null;
153
154     OutgoingPacketHandler outgoing_packet_handler=null;
155
156     /** If set it will be added to <tt>local_addr</tt>. Used to implement
157      * for example transport independent addresses */

158     byte[] additional_data=null;
159
160     /** Maximum number of bytes for messages to be queued until they are sent. This value needs to be smaller
161         than the largest UDP datagram packet size */

162     int max_bundle_size=AUTOCONF.senseMaxFragSizeStatic();
163
164     /** Max number of milliseconds until queued messages are sent. Messages are sent when max_bundle_size or
165      * max_bundle_timeout has been exceeded (whichever occurs faster)
166      */

167     long max_bundle_timeout=20;
168
169     /** Enabled bundling of smaller messages into bigger ones */
170     boolean enable_bundling=false;
171
172     /** Used by BundlingOutgoingPacketHandler */
173     TimeScheduler timer=null;
174
175     /** HashMap<Address, Address>. Keys=senders, values=destinations. For each incoming message M with sender S, adds
176      * an entry with key=S and value= sender's IP address and port.
177      */

178     HashMap addr_translation_table=new HashMap();
179
180     boolean use_addr_translation=false;
181
182     /** The name of this protocol */
183     static final String JavaDoc name="UDP";
184
185     static final String JavaDoc IGNORE_BIND_ADDRESS_PROPERTY="ignore.bind.address";
186
187
188     final int VERSION_LENGTH=Version.getLength();
189
190
191
192
193     /**
194      * public constructor. creates the UDP protocol, and initializes the
195      * state variables, does however not start any sockets or threads
196      */

197     public UDP() {
198         ;
199     }
200
201     /**
202      * debug only
203      */

204     public String JavaDoc toString() {
205         return "Protocol UDP(local address: " + local_addr + ')';
206     }
207
208
209     BoundedList getLastPortsUsed() {
210         if(last_ports_used == null)
211             last_ports_used=new BoundedList(num_last_ports);
212         return last_ports_used;
213     }
214
215     /* ----------------------- Receiving of MCAST UDP packets ------------------------ */
216
217     public void run() {
218         DatagramPacket packet;
219         byte receive_buf[]=new byte[65535];
220         int len, sender_port;
221         byte[] tmp, data;
222         InetAddress sender_addr;
223
224         // moved out of loop to avoid excessive object creations (bela March 8 2001)
225
packet=new DatagramPacket(receive_buf, receive_buf.length);
226
227         while(mcast_receiver != null && mcast_recv_sock != null) {
228             try {
229                 packet.setData(receive_buf, 0, receive_buf.length);
230                 mcast_recv_sock.receive(packet);
231                 sender_addr=packet.getAddress();
232                 sender_port=packet.getPort();
233                 len=packet.getLength();
234                 data=packet.getData();
235
236                 if(len == 4) { // received a diagnostics probe
237
if(data[0] == 'd' && data[1] == 'i' && data[2] == 'a' && data[3] == 'g') {
238                         handleDiagnosticProbe(sender_addr, sender_port);
239                         continue;
240                     }
241                 }
242
243                 if(log.isTraceEnabled()){
244                     StringBuffer JavaDoc sb=new StringBuffer JavaDoc("received (mcast) ");
245                     sb.append(len).append(" bytes from ").append(sender_addr).append(':');
246                     sb.append(sender_port).append(" (size=").append(len).append(" bytes)");
247                     log.trace(sb.toString());
248                 }
249                 if(len > receive_buf.length) {
250                     if(log.isErrorEnabled()) log.error("size of the received packet (" + len + ") is bigger than " +
251                                              "allocated buffer (" + receive_buf.length + "): will not be able to handle packet. " +
252                                              "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length);
253                 }
254
255                 if(Version.compareTo(data) == false) {
256                     if(log.isWarnEnabled()) {
257                         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
258                         sb.append("packet from ").append(sender_addr).append(':').append(sender_port);
259                         sb.append(" has different version (").append(Version.printVersionId(data, Version.version_id.length));
260                         sb.append(") from ours (").append(Version.printVersionId(Version.version_id)).append("). ");
261                         if(discard_incompatible_packets)
262                             sb.append("Packet is discarded");
263                         else
264                             sb.append("This may cause problems");
265                         log.warn(sb.toString());
266                     }
267                     if(discard_incompatible_packets)
268                         continue;
269                 }
270
271                 if(use_incoming_packet_handler) {
272                     tmp=new byte[len];
273                     System.arraycopy(data, 0, tmp, 0, len);
274                     incoming_queue.add(new IncomingQueueEntry(mcast_addr, sender_addr, sender_port, tmp));
275                 }
276                 else
277                     handleIncomingUdpPacket(mcast_addr, sender_addr, sender_port, data);
278             }
279             catch(SocketException sock_ex) {
280                  if(log.isTraceEnabled()) log.trace("multicast socket is closed, exception=" + sock_ex);
281                 break;
282             }
283             catch(InterruptedIOException io_ex) { // thread was interrupted
284
; // go back to top of loop, where we will terminate loop
285
}
286             catch(Throwable JavaDoc ex) {
287                 if(log.isErrorEnabled())
288                     log.error("failure in multicast receive()", ex);
289                 Util.sleep(100); // so we don't get into 100% cpu spinning (should NEVER happen !)
290
}
291         }
292         if(log.isDebugEnabled()) log.debug("multicast thread terminated");
293     }
294
295 // private void printPacket(DatagramPacket packet) {
296
// StringBuffer sb=new StringBuffer();
297
// sb.append(packet.getAddress()).append(":").append(packet.getPort());
298
// System.out.println("packet: " + sb.toString());
299
// }
300

301     void handleDiagnosticProbe(InetAddress sender, int port) {
302         try {
303             byte[] diag_rsp=getDiagResponse().getBytes();
304             DatagramPacket rsp=new DatagramPacket(diag_rsp, 0, diag_rsp.length, sender, port);
305             if(log.isDebugEnabled()) log.debug("sending diag response to " + sender + ':' + port);
306             sock.send(rsp);
307         }
308         catch(Throwable JavaDoc t) {
309             if(log.isErrorEnabled()) log.error("failed sending diag rsp to " + sender + ':' + port +
310                                                        ", exception=" + t);
311         }
312     }
313
314     String JavaDoc getDiagResponse() {
315         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
316         sb.append(local_addr).append(" (").append(channel_name).append(')');
317         sb.append(" [").append(mcast_addr_name).append(':').append(mcast_port).append("]\n");
318         sb.append("Version=").append(Version.version).append(", cvs=\"").append(Version.cvs).append("\"\n");
319         sb.append("bound to ").append(bind_addr).append(':').append(bind_port).append('\n');
320         sb.append("members: ").append(members).append('\n');
321
322         return sb.toString();
323     }
324
325     /* ------------------------------------------------------------------------------- */
326
327
328
329     /*------------------------------ Protocol interface ------------------------------ */
330
331     public String JavaDoc getName() {
332         return name;
333     }
334
335
336     public void init() throws Exception JavaDoc {
337         if(use_incoming_packet_handler) {
338             incoming_queue=new Queue();
339             incoming_packet_handler=new IncomingPacketHandler();
340         }
341         if(use_outgoing_packet_handler) {
342             outgoing_queue=new Queue();
343             if(enable_bundling) {
344                 timer=stack != null? stack.timer : null;
345                 if(timer == null)
346                     throw new Exception JavaDoc("timer could not be retrieved");
347                 outgoing_packet_handler=new BundlingOutgoingPacketHandler();
348             }
349             else
350                 outgoing_packet_handler=new OutgoingPacketHandler();
351         }
352     }
353
354
355     /**
356      * Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads
357      */

358     public void start() throws Exception JavaDoc {
359         if(log.isDebugEnabled()) log.debug("creating sockets and starting threads");
360         createSockets();
361         passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
362         startThreads();
363     }
364
365
366     public void stop() {
367         if(log.isDebugEnabled()) log.debug("closing sockets and stopping threads");
368         stopThreads(); // will close sockets, closeSockets() is not really needed anymore, but...
369
closeSockets(); // ... we'll leave it in there for now (doesn't do anything if already closed)
370
}
371
372
373
/**
 * Sets up the protocol instance according to the configuration properties.
 * Among others, the following properties are read by the UDP protocol:
 * param mcast_addr - the multicast address to use; default is 228.8.8.8
 * param mcast_port - (int) the port that the multicast is sent on; default is 7600
 * param ip_mcast - (boolean) flag whether to use IP multicast; default is true
 * param ip_ttl - the default time-to-live for multicast packets sent out on this socket; default is 64
 * @return true if no other properties are left.
 * false if a property value is invalid, or if the properties still have data in them, i.e.
 * properties are left over and not handled by the protocol stack
 */

386     public boolean setProperties(Properties props) {
387         String JavaDoc str;
388         String JavaDoc tmp = null;
389
390         super.setProperties(props);
391         
392         // PropertyPermission not granted if running in an untrusted environment with JNLP.
393
try {
394             tmp=System.getProperty("bind.address");
395             if(Boolean.getBoolean(IGNORE_BIND_ADDRESS_PROPERTY)) {
396                 tmp=null;
397             }
398         }
399         catch (SecurityException JavaDoc ex){
400         }
401         
402         if(tmp != null)
403             str=tmp;
404         else
405             str=props.getProperty("bind_addr");
406         if(str != null) {
407             try {
408                 bind_addr=InetAddress.getByName(str);
409             }
410             catch(UnknownHostException unknown) {
411                 if(log.isFatalEnabled()) log.fatal("(bind_addr): host " + str + " not known");
412                 return false;
413             }
414             props.remove("bind_addr");
415         }
416
417         str=props.getProperty("bind_to_all_interfaces");
418         if(str != null) {
419             bind_to_all_interfaces=new Boolean JavaDoc(str).booleanValue();
420             props.remove("bind_to_all_interfaces");
421         }
422
423         str=props.getProperty("bind_port");
424         if(str != null) {
425             bind_port=Integer.parseInt(str);
426             props.remove("bind_port");
427         }
428
429         str=props.getProperty("num_last_ports");
430         if(str != null) {
431             num_last_ports=Integer.parseInt(str);
432             props.remove("num_last_ports");
433         }
434
435         str=props.getProperty("start_port");
436         if(str != null) {
437             bind_port=Integer.parseInt(str);
438             props.remove("start_port");
439         }
440
441         str=props.getProperty("port_range");
442         if(str != null) {
443             port_range=Integer.parseInt(str);
444             props.remove("port_range");
445         }
446
447         str=props.getProperty("mcast_addr");
448         if(str != null) {
449             mcast_addr_name=str;
450             props.remove("mcast_addr");
451         }
452
453         str=props.getProperty("mcast_port");
454         if(str != null) {
455             mcast_port=Integer.parseInt(str);
456             props.remove("mcast_port");
457         }
458
459         str=props.getProperty("ip_mcast");
460         if(str != null) {
461             ip_mcast=Boolean.valueOf(str).booleanValue();
462             props.remove("ip_mcast");
463         }
464
465         str=props.getProperty("ip_ttl");
466         if(str != null) {
467             ip_ttl=Integer.parseInt(str);
468             props.remove("ip_ttl");
469         }
470
471         str=props.getProperty("mcast_send_buf_size");
472         if(str != null) {
473             mcast_send_buf_size=Integer.parseInt(str);
474             props.remove("mcast_send_buf_size");
475         }
476
477         str=props.getProperty("mcast_recv_buf_size");
478         if(str != null) {
479             mcast_recv_buf_size=Integer.parseInt(str);
480             props.remove("mcast_recv_buf_size");
481         }
482
483         str=props.getProperty("ucast_send_buf_size");
484         if(str != null) {
485             ucast_send_buf_size=Integer.parseInt(str);
486             props.remove("ucast_send_buf_size");
487         }
488
489         str=props.getProperty("ucast_recv_buf_size");
490         if(str != null) {
491             ucast_recv_buf_size=Integer.parseInt(str);
492             props.remove("ucast_recv_buf_size");
493         }
494
495         str=props.getProperty("loopback");
496         if(str != null) {
497             loopback=Boolean.valueOf(str).booleanValue();
498             props.remove("loopback");
499         }
500
501         str=props.getProperty("discard_incompatibe_packets");
502         if(str != null) {
503             discard_incompatible_packets=Boolean.valueOf(str).booleanValue();
504             props.remove("discard_incompatibe_packets");
505         }
506
507         // this is deprecated, just left for compatibility (use use_incoming_packet_handler)
508
str=props.getProperty("use_packet_handler");
509         if(str != null) {
510             use_incoming_packet_handler=Boolean.valueOf(str).booleanValue();
511             props.remove("use_packet_handler");
512             if(log.isWarnEnabled()) log.warn("'use_packet_handler' is deprecated; use 'use_incoming_packet_handler' instead");
513         }
514
515         str=props.getProperty("use_incoming_packet_handler");
516         if(str != null) {
517             use_incoming_packet_handler=Boolean.valueOf(str).booleanValue();
518             props.remove("use_incoming_packet_handler");
519         }
520
521         str=props.getProperty("use_outgoing_packet_handler");
522         if(str != null) {
523             use_outgoing_packet_handler=Boolean.valueOf(str).booleanValue();
524             props.remove("use_outgoing_packet_handler");
525         }
526
527         str=props.getProperty("max_bundle_size");
528         if(str != null) {
529             int bundle_size=Integer.parseInt(str);
530             if(bundle_size > max_bundle_size) {
531                 if(log.isErrorEnabled()) log.error("max_bundle_size (" + bundle_size +
532                         ") is greater than largest UDP fragmentation size (" + max_bundle_size + ')');
533                 return false;
534             }
535             if(bundle_size <= 0) {
536                 if(log.isErrorEnabled()) log.error("max_bundle_size (" + bundle_size + ") is <= 0");
537                 return false;
538             }
539             max_bundle_size=bundle_size;
540             props.remove("max_bundle_size");
541         }
542
543         str=props.getProperty("max_bundle_timeout");
544         if(str != null) {
545             max_bundle_timeout=Long.parseLong(str);
546             if(max_bundle_timeout <= 0) {
547                 if(log.isErrorEnabled()) log.error("max_bundle_timeout of " + max_bundle_timeout + " is invalid");
548                 return false;
549             }
550             props.remove("max_bundle_timeout");
551         }
552
553         str=props.getProperty("enable_bundling");
554         if(str != null) {
555             enable_bundling=Boolean.valueOf(str).booleanValue();
556             props.remove("enable_bundling");
557         }
558
559         str=props.getProperty("use_addr_translation");
560         if(str != null) {
561             use_addr_translation=Boolean.valueOf(str).booleanValue();
562             props.remove("use_addr_translation");
563         }
564
565         if(props.size() > 0) {
566             System.err.println("UDP.setProperties(): the following properties are not recognized:");
567             props.list(System.out);
568             return false;
569         }
570
571         if(enable_bundling) {
572             if(use_outgoing_packet_handler == false)
573                 if(log.isWarnEnabled()) log.warn("enable_bundling is true; setting use_outgoing_packet_handler=true");
574             use_outgoing_packet_handler=true;
575         }
576
577         return true;
578     }
579
580
581
582     /**
583      * DON'T REMOVE ! This prevents the up-handler thread to be created, which essentially is superfluous:
584      * messages are received from the network rather than from a layer below.
585      */

586     public void startUpHandler() {
587         ;
588     }
589
590     /**
591      * handle the UP event.
592      * @param evt - the event being send from the stack
593      */

594     public void up(Event evt) {
595
596         switch(evt.getType()) {
597
598             case Event.CONFIG:
599                 passUp(evt);
600                  if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
601                 handleConfigEvent((HashMap)evt.getArg());
602                 return;
603         }
604
605         passUp(evt);
606     }
607
/**
 * Called by the layer above this layer. Usually we just put this Message
 * into the send queue and let one or more worker threads handle it. A worker thread
 * then removes the Message from the send queue, performs a conversion and adds the
 * modified Message to the send queue of the layer below it, by calling down().
 */

614     public void down(Event evt) {
615         Message msg;
616         Object JavaDoc dest_addr;
617
618         if(evt.getType() != Event.MSG) { // unless it is a message handle it and respond
619
handleDownEvent(evt);
620             return;
621         }
622
623         msg=(Message)evt.getArg();
624
625         if(channel_name != null) {
626             // added patch by Roland Kurmann (March 20 2003)
627
// msg.putHeader(name, new UdpHeader(channel_name));
628
msg.putHeader(name, udp_hdr);
629         }
630
631         dest_addr=msg.getDest();
632
633         // Because we don't call Protocol.passDown(), we notify the observer directly (e.g. PerfObserver).
634
// This way, we still have performance numbers for UDP
635
if(observer != null)
636             observer.passDown(evt);
637
638         if(dest_addr == null) { // 'null' means send to all group members
639
if(ip_mcast) {
640                 if(mcast_addr == null) {
641                     if(log.isErrorEnabled()) log.error("dest address of message is null, and " +
642                                               "sending to default address fails as mcast_addr is null, too !" +
643                                               " Discarding message " + Util.printEvent(evt));
644                     return;
645                 }
646                 // if we want to use IP multicast, then set the destination of the message
647
msg.setDest(mcast_addr);
648             }
649             else {
650                 //sends a separate UDP message to each address
651
sendMultipleUdpMessages(msg, members);
652                 return;
653             }
654         }
655
656         try {
657             sendUdpMessage(msg);
658         }
659         catch(Exception JavaDoc e) {
660             if(log.isErrorEnabled()) log.error("exception=" + e + ", msg=" + msg + ", mcast_addr=" + mcast_addr);
661         }
662     }
663
664
665
666
667
668
669     /*--------------------------- End of Protocol interface -------------------------- */
670
671
672     /* ------------------------------ Private Methods -------------------------------- */
673
674
675
/**
 * If the sender is null, set our own address. We cannot just go ahead and set the address
 * anyway, as we might be sending a message on behalf of someone else! E.g. in case of
 * retransmission, when the original sender has crashed, or in a FLUSH protocol when we
 * have to return all unstable messages with the FLUSH_OK response.
 */

682     void setSourceAddress(Message msg) {
683         if(msg.getSrc() == null)
684             msg.setSrc(local_addr);
685     }
686
687
688
689
/**
 * Processes a packet read from either the multicast or unicast socket. This method used to be
 * synchronized because mcast and unicast socket reads can be concurrent.
 * Correction (bela April 19 2005): we modify no instance variables, all working variables are
 * allocated on the stack, so this method should be reentrant: removed the 'synchronized' keyword
 */

696     void handleIncomingUdpPacket(IpAddress dest, InetAddress sender, int port, byte[] data) {
697         ByteArrayInputStream inp_stream=null;
698         DataInputStream inp=null;
699         Message msg=null;
700         List l; // used if bundling is enabled
701

702         try {
703             // skip the first n bytes (default: 4), this is the version info
704
inp_stream=new ByteArrayInputStream(data, VERSION_LENGTH, data.length - VERSION_LENGTH);
705             inp=new DataInputStream(inp_stream);
706
707             if(enable_bundling) {
708                 l=bufferToList(inp, dest, sender, port);
709                 for(Enumeration en=l.elements(); en.hasMoreElements();) {
710                     msg=(Message)en.nextElement();
711                     try {
712                         handleMessage(msg);
713                     }
714                     catch(Throwable JavaDoc t) {
715                         if(log.isErrorEnabled()) log.error("failure: " + t.toString());
716                     }
717                 }
718             }
719             else {
720                 msg=bufferToMessage(inp, dest, sender, port);
721                 handleMessage(msg);
722             }
723         }
724         catch(Throwable JavaDoc e) {
725             if(log.isErrorEnabled()) log.error("exception in processing incoming packet", e);
726         }
727         finally {
728             Util.closeInputStream(inp);
729             Util.closeInputStream(inp_stream);
730         }
731     }
732
733
734
735     void handleMessage(Message msg) {
736         Event evt;
737         UdpHeader hdr;
738         Address dst=msg.getDest();
739
740         if(dst == null)
741             dst=mcast_addr;
742
743         // discard my own multicast loopback copy
744
if(loopback) {
745             Address SRC=msg.getSrc();
746             if((dst == null || (dst != null && dst.isMulticastAddress())) && src != null && local_addr.equals(src)) {
747                 if(log.isTraceEnabled())
748                     log.trace("discarded own loopback multicast packet");
749                 return;
750             }
751         }
752
753         evt=new Event(Event.MSG, msg);
754         if(log.isTraceEnabled()) {
755             StringBuffer JavaDoc sb=new StringBuffer JavaDoc("message is ");
756             sb.append(msg).append(", headers are ").append(msg.getHeaders());
757             log.trace(sb.toString());
758         }
759
760         /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
761         * This allows e.g. PerfObserver to get the time of reception of a message */

762         if(observer != null)
763             observer.up(evt, up_queue.size());
764
765         hdr=(UdpHeader)msg.getHeader(name); // replaced removeHeader() with getHeader()
766
if(hdr != null) {
767
768             /* Discard all messages destined for a channel with a different name */
769             String JavaDoc ch_name=hdr.channel_name;
770
771             // Discard if message's group name is not the same as our group name unless the
772
// message is a diagnosis message (special group name DIAG_GROUP)
773
if(ch_name != null && channel_name != null && !channel_name.equals(ch_name) &&
774                     !ch_name.equals(Util.DIAG_GROUP)) {
775                 if(log.isWarnEnabled()) log.warn("discarded message from different group (" +
776                                                  ch_name + "). Sender was " + msg.getSrc());
777                 return;
778             }
779         }
780         else {
781             if(log.isErrorEnabled()) log.error("message does not have a UDP header");
782         }
783         passUp(evt);
784     }
785
786
787     void sendUdpMessage(Message msg) throws Exception JavaDoc {
788         sendUdpMessage(msg, false);
789     }
790
791     /** Send a message to the address specified in dest */
792     void sendUdpMessage(Message msg, boolean copyForOutgoingQueue) throws Exception JavaDoc {
793         IpAddress dest;
794         Message copy;
795         Event evt;
796
797         dest=(IpAddress)msg.getDest(); // guaranteed to be non-null
798
setSourceAddress(msg);
799
800         if(log.isTraceEnabled()) {
801             StringBuffer JavaDoc sb=new StringBuffer JavaDoc("sending msg to ");
802             sb.append(msg.getDest()).append(" (src=").append(msg.getSrc()).append("), headers are ").append(msg.getHeaders());
803             log.trace(sb.toString());
804         }
805
806         // Don't send if destination is local address. Instead, switch dst and src and put in up_queue.
807
// If multicast message, loopback a copy directly to us (but still multicast). Once we receive this,
808
// we will discard our own multicast message
809
if(loopback && (dest.equals(local_addr) || dest.isMulticastAddress())) {
810             copy=msg.copy();
811             // copy.removeHeader(name); // we don't remove the header
812
copy.setSrc(local_addr);
813             // copy.setDest(dest);
814
evt=new Event(Event.MSG, copy);
815
816             /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
817                This allows e.g. PerfObserver to get the time of reception of a message */

818             if(observer != null)
819                 observer.up(evt, up_queue.size());
820             if(log.isTraceEnabled()) log.trace("looped back local message " + copy);
821             passUp(evt);
822             if(dest != null && !dest.isMulticastAddress())
823                 return;
824         }
825
826         if(use_outgoing_packet_handler) {
827             if(copyForOutgoingQueue)
828                 outgoing_queue.add(msg.copy());
829             else
830                 outgoing_queue.add(msg);
831             return;
832         }
833
834         send(msg);
835     }
836
837
838     /** Internal method to serialize and send a message. This method is not reentrant */
839     void send(Message msg) throws Exception JavaDoc {
840         Buffer buf;
841         IpAddress dest=(IpAddress)msg.getDest(); // guaranteed to be non-null
842
IpAddress SRC=(IpAddress)msg.getSrc();
843
844         synchronized(out_stream) {
845             buf=messageToBuffer(msg, dest, src);
846             doSend(buf, dest.getIpAddress(), dest.getPort());
847         }
848     }
849
850
851
852     void doSend(Buffer buf, InetAddress dest, int port) throws IOException {
853         DatagramPacket packet;
854
855         // packet=new DatagramPacket(data, data.length, dest, port);
856
packet=new DatagramPacket(buf.getBuf(), buf.getOffset(), buf.getLength(), dest, port);
857         if(dest.isMulticastAddress() && mcast_send_sock != null) { // mcast_recv_sock might be null if ip_mcast is false
858
mcast_send_sock.send(packet);
859         }
860         else {
861             if(sock != null)
862                 sock.send(packet);
863         }
864     }
865
866
867
868     void sendMultipleUdpMessages(Message msg, Vector dests) {
869         Address dest;
870
871         for(int i=0; i < dests.size(); i++) {
872             dest=(Address)dests.elementAt(i);
873             msg.setDest(dest);
874
875             try {
876                 sendUdpMessage(msg,
877                                true); // copy for outgoing queue if outgoing queue handler is enabled
878
}
879             catch(Exception JavaDoc e) {
880                 if(log.isDebugEnabled()) log.debug("exception=" + e);
881             }
882         }
883     }
884
885
886     /**
887      * This method needs to be synchronized on out_stream when it is called
888      * @param msg
889      * @param dest
890      * @param src
891      * @return
892      * @throws IOException
893      */

894     Buffer messageToBuffer(Message msg, IpAddress dest, IpAddress src) throws IOException {
895         Buffer retval;
896         DataOutputStream out=null;
897
898         out_stream.reset();
899         out_stream.write(Version.version_id, 0, Version.version_id.length); // write the version
900
try {
901             out=new DataOutputStream(out_stream);
902             nullAddresses(msg, dest, src);
903             msg.writeTo(out);
904             revertAddresses(msg, dest, src);
905             out.flush();
906             retval=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size());
907             return retval;
908         }
909         finally {
910             Util.closeOutputStream(out);
911         }
912     }
913
914
915     void nullAddresses(Message msg, IpAddress dest, IpAddress src) {
916         msg.setDest(null);
917         if(!dest.isMulticastAddress()) { // unicast
918
if(src != null) {
919                 msg.setSrc(new IpAddress(src.getPort(), false)); // null the host part, leave the port
920
if(src.getAdditionalData() != null)
921                     ((IpAddress)msg.getSrc()).setAdditionalData(src.getAdditionalData());
922             }
923             else {
924                 msg.setSrc(null);
925             }
926         }
927         else { // multicast
928
if(src != null) {
929                 msg.setSrc(new IpAddress(src.getPort(), false));
930                 if(src.getAdditionalData() != null)
931                     ((IpAddress)msg.getSrc()).setAdditionalData(src.getAdditionalData());
932             }
933         }
934     }
935
936     void revertAddresses(Message msg, IpAddress dest, IpAddress src) {
937         msg.setDest(dest);
938         msg.setSrc(src);
939     }
940
941
942     Message bufferToMessage(DataInputStream instream, IpAddress dest, InetAddress sender, int port)
943             throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
944         Message msg=new Message();
945         msg.readFrom(instream);
946         setAddresses(msg, dest, sender, port);
947         return msg;
948     }
949
950
951     void setAddresses(Message msg, IpAddress dest, InetAddress sender, int port) {
952         // set the destination address
953
if(msg.getDest() == null && dest != null)
954             msg.setDest(dest);
955
956         // set the source address if not set
957
IpAddress src_addr=(IpAddress)msg.getSrc();
958         if(src_addr == null) {
959             try {msg.setSrc(new IpAddress(sender, port));} catch(Throwable JavaDoc t) {}
960         }
961         else {
962             byte[] tmp_additional_data=src_addr.getAdditionalData();
963             if(src_addr.getIpAddress() == null) {
964                 try {msg.setSrc(new IpAddress(sender, src_addr.getPort()));} catch(Throwable JavaDoc t) {}
965             }
966             if(tmp_additional_data != null)
967                 ((IpAddress)msg.getSrc()).setAdditionalData(tmp_additional_data);
968         }
969     }
970
971     Buffer listToBuffer(List l, IpAddress dest) throws IOException {
972         Buffer retval=null;
973         IpAddress src;
974         Message msg;
975         int len=l != null? l.size() : 0;
976         DataOutputStream out=null;
977         out_stream.reset();
978         out_stream.write(Version.version_id, 0, Version.version_id.length); // write the version
979
try {
980             out=new DataOutputStream(out_stream);
981             out.writeInt(len);
982             for(Enumeration en=l.elements(); en.hasMoreElements();) {
983                 msg=(Message)en.nextElement();
984                 src=(IpAddress)msg.getSrc();
985                 nullAddresses(msg, dest, src);
986                 msg.writeTo(out);
987                 revertAddresses(msg, dest, src);
988             }
989             out.flush();
990             retval=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size());
991             return retval;
992         }
993         finally {
994             Util.closeOutputStream(out);
995         }
996     }
997
998
999
1000    List bufferToList(DataInputStream instream, IpAddress dest, InetAddress sender, int port)
1001            throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
1002        List l=new List();
1003        DataInputStream in=null;
1004        int len;
1005        Message msg;
1006
1007        try {
1008            len=instream.readInt();
1009            for(int i=0; i < len; i++) {
1010                msg=new Message();
1011                msg.readFrom(instream);
1012                setAddresses(msg, dest, sender, port);
1013                l.add(msg);
1014            }
1015            return l;
1016        }
1017        finally {
1018            Util.closeInputStream(in);
1019        }
1020    }
1021
1022
1023
1024    /**
1025     * Create UDP sender and receiver sockets. Currently there are 2 sockets
1026     * (sending and receiving). This is due to Linux's non-BSD compatibility
1027     * in the JDK port (see DESIGN).
1028     */

1029    void createSockets() throws Exception JavaDoc {
1030        InetAddress tmp_addr=null;
1031
1032        // bind_addr not set, try to assign one by default. This is needed on Windows
1033

1034        // changed by bela Feb 12 2003: by default multicast sockets will be bound to all network interfaces
1035

1036        // CHANGED *BACK* by bela March 13 2003: binding to all interfaces did not result in a correct
1037
// local_addr. As a matter of fact, comparison between e.g. 0.0.0.0:1234 (on hostA) and
1038
// 0.0.0.0:1.2.3.4 (on hostB) would fail !
1039
if(bind_addr == null) {
1040            InetAddress[] interfaces=InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
1041            if(interfaces != null && interfaces.length > 0)
1042                bind_addr=interfaces[0];
1043        }
1044        if(bind_addr == null)
1045            bind_addr=InetAddress.getLocalHost();
1046
1047        if(bind_addr != null)
1048            if(log.isInfoEnabled()) log.info("sockets will use interface " + bind_addr.getHostAddress());
1049
1050
1051        // 2. Create socket for receiving unicast UDP packets. The address and port
1052
// of this socket will be our local address (local_addr)
1053
if(bind_port > 0) {
1054            sock=createDatagramSocketWithBindPort();
1055        }
1056        else {
1057            sock=createEphemeralDatagramSocket();
1058        }
1059
1060        if(sock == null)
1061            throw new Exception JavaDoc("UDP.createSocket(): sock is null");
1062
1063        local_addr=new IpAddress(sock.getLocalAddress(), sock.getLocalPort());
1064        if(additional_data != null)
1065            local_addr.setAdditionalData(additional_data);
1066
1067
1068        // 3. Create socket for receiving IP multicast packets
1069
if(ip_mcast) {
1070            // 3a. Create mcast receiver socket
1071
mcast_recv_sock=new MulticastSocket(mcast_port);
1072            mcast_recv_sock.setTimeToLive(ip_ttl);
1073            tmp_addr=InetAddress.getByName(mcast_addr_name);
1074            mcast_addr=new IpAddress(tmp_addr, mcast_port);
1075
1076            if(bind_to_all_interfaces && Util.getJavaVersion() >= 14) {
1077                bindToAllInterfaces(mcast_recv_sock, mcast_addr.getIpAddress());
1078            }
1079            else {
1080                if(bind_addr != null)
1081                    mcast_recv_sock.setInterface(bind_addr);
1082                 mcast_recv_sock.joinGroup(tmp_addr);
1083            }
1084
1085            // 3b. Create mcast sender socket
1086
mcast_send_sock=new MulticastSocket();
1087            mcast_send_sock.setTimeToLive(ip_ttl);
1088            if(bind_addr != null)
1089                mcast_send_sock.setInterface(bind_addr);
1090            // mcast_send_sock.setTrafficClass(0x08); // high throughput; should investigate when baseline is JDK 1.4
1091
}
1092
1093        setBufferSizes();
1094        if(log.isInfoEnabled()) log.info("socket information:\n" + dumpSocketInfo());
1095    }
1096
1097
1098    private void bindToAllInterfaces(MulticastSocket s, InetAddress mcastAddr) throws IOException {
1099        SocketAddress tmp_mcast_addr=new InetSocketAddress(mcastAddr, mcast_port);
1100        Enumeration en=NetworkInterface.getNetworkInterfaces();
1101        while(en.hasMoreElements()) {
1102            NetworkInterface i=(NetworkInterface)en.nextElement();
1103            for(Enumeration en2=i.getInetAddresses(); en2.hasMoreElements();) {
1104                InetAddress addr=(InetAddress)en2.nextElement();
1105                // if(addr.isLoopbackAddress())
1106
// continue;
1107
s.joinGroup(tmp_mcast_addr, i);
1108                if(log.isTraceEnabled())
1109                    log.trace("joined " + tmp_mcast_addr + " on interface " + i.getName() + " (" + addr + ")");
1110                break;
1111            }
1112        }
1113    }
1114
1115
1116    /** Creates a DatagramSocket with a random port. Because in certain operating systems, ports are reused,
1117     * we keep a list of the n last used ports, and avoid port reuse */

1118    DatagramSocket createEphemeralDatagramSocket() throws SocketException {
1119        DatagramSocket tmp=null;
1120        int localPort=0;
1121        while(true) {
1122            tmp=new DatagramSocket(localPort, bind_addr); // first time localPort is 0
1123
if(num_last_ports <= 0)
1124                break;
1125            localPort=tmp.getLocalPort();
1126            if(getLastPortsUsed().contains(new Integer JavaDoc(localPort))) {
1127                if(log.isDebugEnabled())
1128                    log.debug("local port " + localPort + " already seen in this session; will try to get other port");
1129                try {tmp.close();} catch(Throwable JavaDoc e) {}
1130                localPort++;
1131                continue;
1132            }
1133            else {
1134                getLastPortsUsed().add(new Integer JavaDoc(localPort));
1135                break;
1136            }
1137        }
1138        return tmp;
1139    }
1140
1141
1142
1143
1144    /**
1145     * Creates a DatagramSocket when bind_port > 0. Attempts to allocate the socket with port == bind_port, and
1146     * increments until it finds a valid port, or until port_range has been exceeded
1147     * @return DatagramSocket The newly created socket
1148     * @throws Exception
1149     */

1150    DatagramSocket createDatagramSocketWithBindPort() throws Exception JavaDoc {
1151        DatagramSocket tmp=null;
1152        // 27-6-2003 bgooren, find available port in range (start_port, start_port+port_range)
1153
int rcv_port=bind_port, max_port=bind_port + port_range;
1154        while(rcv_port <= max_port) {
1155            try {
1156                tmp=new DatagramSocket(rcv_port, bind_addr);
1157                break;
1158            }
1159            catch(SocketException bind_ex) { // Cannot listen on this port
1160
rcv_port++;
1161            }
1162            catch(SecurityException JavaDoc sec_ex) { // Not allowed to listen on this port
1163
rcv_port++;
1164            }
1165
1166            // Cannot listen at all, throw an Exception
1167
if(rcv_port >= max_port + 1) { // +1 due to the increment above
1168
throw new Exception JavaDoc("UDP.createSockets(): cannot list on any port in range " +
1169                        bind_port + '-' + (bind_port + port_range));
1170            }
1171        }
1172        return tmp;
1173    }
1174
1175
1176    String JavaDoc dumpSocketInfo() throws Exception JavaDoc {
1177        StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
1178        sb.append("local_addr=").append(local_addr);
1179        sb.append(", mcast_addr=").append(mcast_addr);
1180        sb.append(", bind_addr=").append(bind_addr);
1181        sb.append(", ttl=").append(ip_ttl);
1182
1183        if(sock != null) {
1184            sb.append("\nsock: bound to ");
1185            sb.append(sock.getLocalAddress().getHostAddress()).append(':').append(sock.getLocalPort());
1186            sb.append(", receive buffer size=").append(sock.getReceiveBufferSize());
1187            sb.append(", send buffer size=").append(sock.getSendBufferSize());
1188        }
1189
1190        if(mcast_recv_sock != null) {
1191            sb.append("\nmcast_recv_sock: bound to ");
1192            sb.append(mcast_recv_sock.getInterface().getHostAddress()).append(':').append(mcast_recv_sock.getLocalPort());
1193            sb.append(", send buffer size=").append(mcast_recv_sock.getSendBufferSize());
1194            sb.append(", receive buffer size=").append(mcast_recv_sock.getReceiveBufferSize());
1195        }
1196
1197         if(mcast_send_sock != null) {
1198            sb.append("\nmcast_send_sock: bound to ");
1199            sb.append(mcast_send_sock.getInterface().getHostAddress()).append(':').append(mcast_send_sock.getLocalPort());
1200            sb.append(", send buffer size=").append(mcast_send_sock.getSendBufferSize());
1201            sb.append(", receive buffer size=").append(mcast_send_sock.getReceiveBufferSize());
1202        }
1203        return sb.toString();
1204    }
1205
1206
1207    void setBufferSizes() {
1208        if(sock != null) {
1209            try {
1210                sock.setSendBufferSize(ucast_send_buf_size);
1211            }
1212            catch(Throwable JavaDoc ex) {
1213                if(log.isWarnEnabled()) log.warn("failed setting ucast_send_buf_size in sock: " + ex);
1214            }
1215            try {
1216                sock.setReceiveBufferSize(ucast_recv_buf_size);
1217            }
1218            catch(Throwable JavaDoc ex) {
1219                if(log.isWarnEnabled()) log.warn("failed setting ucast_recv_buf_size in sock: " + ex);
1220            }
1221        }
1222
1223        if(mcast_recv_sock != null) {
1224            try {
1225                mcast_recv_sock.setSendBufferSize(mcast_send_buf_size);
1226            }
1227            catch(Throwable JavaDoc ex) {
1228                if(log.isWarnEnabled()) log.warn("failed setting mcast_send_buf_size in mcast_recv_sock: " + ex);
1229            }
1230
1231            try {
1232                mcast_recv_sock.setReceiveBufferSize(mcast_recv_buf_size);
1233            }
1234            catch(Throwable JavaDoc ex) {
1235                if(log.isWarnEnabled()) log.warn("failed setting mcast_recv_buf_size in mcast_recv_sock: " + ex);
1236            }
1237        }
1238
1239        if(mcast_send_sock != null) {
1240            try {
1241                mcast_send_sock.setSendBufferSize(mcast_send_buf_size);
1242            }
1243            catch(Throwable JavaDoc ex) {
1244                if(log.isWarnEnabled()) log.warn("failed setting mcast_send_buf_size in mcast_send_sock: " + ex);
1245            }
1246
1247            try {
1248                mcast_send_sock.setReceiveBufferSize(mcast_recv_buf_size);
1249            }
1250            catch(Throwable JavaDoc ex) {
1251                if(log.isWarnEnabled()) log.warn("failed setting mcast_recv_buf_size in mcast_send_sock: " + ex);
1252            }
1253        }
1254
1255    }
1256
1257
1258    /**
1259     * Closed UDP unicast and multicast sockets
1260     */

1261    void closeSockets() {
1262        // 1. Close multicast socket
1263
closeMulticastSocket();
1264
1265        // 2. Close socket
1266
closeSocket();
1267    }
1268
1269
1270    void closeMulticastSocket() {
1271        if(mcast_recv_sock != null) {
1272            try {
1273                if(mcast_addr != null) {
1274                    mcast_recv_sock.leaveGroup(mcast_addr.getIpAddress());
1275                }
1276                mcast_recv_sock.close(); // this will cause the mcast receiver thread to break out of its loop
1277
mcast_recv_sock=null;
1278                if(log.isDebugEnabled()) log.debug("multicast receive socket closed");
1279            }
1280            catch(IOException ex) {
1281            }
1282            mcast_addr=null;
1283        }
1284
1285        if(mcast_send_sock != null) {
1286            mcast_send_sock.close(); // this will cause the mcast receiver thread to break out of its loop
1287
mcast_send_sock=null;
1288            if(log.isDebugEnabled()) log.debug("multicast send socket closed");
1289        }
1290    }
1291
1292
1293    void closeSocket() {
1294        if(sock != null) {
1295            sock.close();
1296            sock=null;
1297            if(log.isDebugEnabled()) log.debug("socket closed");
1298        }
1299    }
1300
1301
1302
1303    /**
1304     * Workaround for the problem encountered in certain JDKs that a thread listening on a socket
1305     * cannot be interrupted. Therefore we just send a dummy datagram packet so that the thread 'wakes up'
1306     * and realizes it has to terminate. Should be removed when all JDKs support Thread.interrupt() on
1307     * reads. Uses sock to send the dummy packet, so this socket has to be still alive.
1308     * @param dest The destination host. Will be local host if null
1309     * @param port The destination port
1310     */

1311// private void sendDummyPacket(InetAddress dest, int port) {
1312
// DatagramPacket packet;
1313
// byte[] buf={0};
1314
//
1315
// if(dest == null) {
1316
// try {
1317
// dest=InetAddress.getLocalHost();
1318
// }
1319
// catch(Exception e) {
1320
// }
1321
// }
1322
//
1323
// if(log.isTraceEnabled()) log.trace("sending packet to " + dest + ':' + port);
1324
//
1325
// if(sock == null || dest == null) {
1326
// if(log.isWarnEnabled()) log.warn("sock was null or dest was null, cannot send dummy packet");
1327
// return;
1328
// }
1329
// packet=new DatagramPacket(buf, buf.length, dest, port);
1330
// try {
1331
// sock.send(packet);
1332
// }
1333
// catch(Throwable e) {
1334
// if(log.isErrorEnabled()) log.error("exception sending dummy packet to " + dest + ':' + port + ": " + e);
1335
// }
1336
// }
1337

1338
1339    /**
1340     * Starts the unicast and multicast receiver threads
1341     */

1342    void startThreads() throws Exception JavaDoc {
1343        if(ucast_receiver == null) {
1344            //start the listener thread of the ucast_recv_sock
1345
ucast_receiver=new UcastReceiver();
1346            ucast_receiver.start();
1347             if(log.isDebugEnabled()) log.debug("created unicast receiver thread");
1348        }
1349
1350        if(ip_mcast) {
1351            if(mcast_receiver != null) {
1352                if(mcast_receiver.isAlive()) {
1353                    if(log.isDebugEnabled()) log.debug("did not create new multicastreceiver thread as existing " +
1354                                                       "multicast receiver thread is still running");
1355                }
1356                else
1357                    mcast_receiver=null; // will be created just below...
1358
}
1359
1360            if(mcast_receiver == null) {
1361                mcast_receiver=new Thread JavaDoc(this, "UDP mcast receiver");
1362                mcast_receiver.setPriority(Thread.MAX_PRIORITY); // needed ????
1363
mcast_receiver.setDaemon(true);
1364                mcast_receiver.start();
1365            }
1366        }
1367        if(use_outgoing_packet_handler)
1368            outgoing_packet_handler.start();
1369        if(use_incoming_packet_handler)
1370            incoming_packet_handler.start();
1371    }
1372
1373
1374    /**
1375     * Stops unicast and multicast receiver threads
1376     */

1377    void stopThreads() {
1378        Thread JavaDoc tmp;
1379
1380        // 1. Stop the multicast receiver thread
1381
if(mcast_receiver != null) {
1382            if(mcast_receiver.isAlive()) {
1383                tmp=mcast_receiver;
1384                mcast_receiver=null;
1385                closeMulticastSocket(); // will cause the multicast thread to terminate
1386
tmp.interrupt();
1387                try {
1388                    tmp.join(100);
1389                }
1390                catch(Exception JavaDoc e) {
1391                }
1392                tmp=null;
1393            }
1394            mcast_receiver=null;
1395        }
1396
1397        // 2. Stop the unicast receiver thread
1398
if(ucast_receiver != null) {
1399            ucast_receiver.stop();
1400            ucast_receiver=null;
1401        }
1402
1403        // 3. Stop the in_packet_handler thread
1404
if(incoming_packet_handler != null)
1405            incoming_packet_handler.stop();
1406
1407        // 4. Stop the outgoing packet handler thread
1408
if(outgoing_packet_handler != null)
1409            outgoing_packet_handler.stop();
1410    }
1411
1412
1413    void handleDownEvent(Event evt) {
1414        switch(evt.getType()) {
1415
1416        case Event.TMP_VIEW:
1417        case Event.VIEW_CHANGE:
1418            synchronized(members) {
1419                members.removeAllElements();
1420                Vector tmpvec=((View)evt.getArg()).getMembers();
1421                for(int i=0; i < tmpvec.size(); i++)
1422                    members.addElement(tmpvec.elementAt(i));
1423            }
1424            break;
1425
1426        case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local)
1427
passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
1428            break;
1429
1430        case Event.CONNECT:
1431            channel_name=(String JavaDoc)evt.getArg();
1432            udp_hdr=new UdpHeader(channel_name);
1433
1434            // removed March 18 2003 (bela), not needed (handled by GMS)
1435
// changed July 2 2003 (bela): we discard CONNECT_OK at the GMS level anyway, this might
1436
// be needed if we run without GMS though
1437
passUp(new Event(Event.CONNECT_OK));
1438            break;
1439
1440        case Event.DISCONNECT:
1441            passUp(new Event(Event.DISCONNECT_OK));
1442            break;
1443
1444        case Event.CONFIG:
1445            if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
1446            handleConfigEvent((HashMap)evt.getArg());
1447            break;
1448        }
1449    }
1450
1451
1452    void handleConfigEvent(HashMap map) {
1453        if(map == null) return;
1454        if(map.containsKey("additional_data"))
1455            additional_data=(byte[])map.get("additional_data");
1456        if(map.containsKey("send_buf_size")) {
1457            mcast_send_buf_size=((Integer JavaDoc)map.get("send_buf_size")).intValue();
1458            ucast_send_buf_size=mcast_send_buf_size;
1459        }
1460        if(map.containsKey("recv_buf_size")) {
1461            mcast_recv_buf_size=((Integer JavaDoc)map.get("recv_buf_size")).intValue();
1462            ucast_recv_buf_size=mcast_recv_buf_size;
1463        }
1464        setBufferSizes();
1465    }
1466
1467
1468
1469    /* ----------------------------- End of Private Methods ---------------------------------------- */
1470
1471    /* ----------------------------- Inner Classes ---------------------------------------- */
1472
1473    class IncomingQueueEntry {
1474        IpAddress dest=null;
1475        InetAddress sender=null;
1476        int port=-1;
1477        byte[] buf;
1478
1479        public IncomingQueueEntry(IpAddress dest, InetAddress sender, int port, byte[] buf) {
1480            this.dest=dest;
1481            this.sender=sender;
1482            this.port=port;
1483            this.buf=buf;
1484        }
1485
1486        public IncomingQueueEntry(byte[] buf) {
1487            this.buf=buf;
1488        }
1489    }
1490
1491
1492
1493    public class UcastReceiver implements Runnable JavaDoc {
1494        boolean running=true;
1495        Thread JavaDoc thread=null;
1496
1497
1498        public void start() {
1499            if(thread == null) {
1500                thread=new Thread JavaDoc(this, "UDP.UcastReceiverThread");
1501                thread.setDaemon(true);
1502                running=true;
1503                thread.start();
1504            }
1505        }
1506
1507
1508        public void stop() {
1509            Thread JavaDoc tmp;
1510            if(thread != null && thread.isAlive()) {
1511                running=false;
1512                tmp=thread;
1513                thread=null;
1514                closeSocket(); // this will cause the thread to break out of its loop
1515
tmp.interrupt();
1516                tmp=null;
1517            }
1518            thread=null;
1519        }
1520
1521
1522        public void run() {
1523            DatagramPacket packet;
1524            byte receive_buf[]=new byte[65535];
1525            int len;
1526            byte[] data, tmp;
1527            InetAddress sender_addr;
1528            int sender_port;
1529
1530            // moved out of loop to avoid excessive object creations (bela March 8 2001)
1531
packet=new DatagramPacket(receive_buf, receive_buf.length);
1532
1533            while(running && thread != null && sock != null) {
1534                try {
1535                    packet.setData(receive_buf, 0, receive_buf.length);
1536                    sock.receive(packet);
1537                    sender_addr=packet.getAddress();
1538                    sender_port=packet.getPort();
1539                    len=packet.getLength();
1540                    data=packet.getData();
1541                    if(log.isTraceEnabled())
1542                        log.trace("received (ucast) " + len + " bytes from " + sender_addr + ':' + sender_port);
1543                    if(len > receive_buf.length) {
1544                        if(log.isErrorEnabled())
1545                            log.error("size of the received packet (" + len + ") is bigger than allocated buffer (" +
1546                                      receive_buf.length + "): will not be able to handle packet. " +
1547                                      "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length);
1548                    }
1549
1550                    if(Version.compareTo(data) == false) {
1551                        if(log.isWarnEnabled()) {
1552                            StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
1553                            sb.append("packet from ").append(sender_addr).append(':').append(sender_port);
1554                            sb.append(" has different version (").append(Version.printVersionId(data, Version.version_id.length));
1555                            sb.append(") from ours (").append(Version.printVersionId(Version.version_id)).append("). ");
1556                            if(discard_incompatible_packets)
1557                                sb.append("Packet is discarded");
1558                            else
1559                                sb.append("This may cause problems");
1560                            log.warn(sb.toString());
1561                        }
1562                        if(discard_incompatible_packets)
1563                            continue;
1564                    }
1565
1566                    if(use_incoming_packet_handler) {
1567                        tmp=new byte[len];
1568                        System.arraycopy(data, 0, tmp, 0, len);
1569                        incoming_queue.add(new IncomingQueueEntry(local_addr, sender_addr, sender_port, tmp));
1570                    }
1571                    else
1572                        handleIncomingUdpPacket(local_addr, sender_addr, sender_port, data);
1573                }
1574                catch(SocketException sock_ex) {
1575                    if(log.isDebugEnabled()) log.debug("unicast receiver socket is closed, exception=" + sock_ex);
1576                    break;
1577                }
1578                catch(InterruptedIOException io_ex) { // thread was interrupted
1579
; // go back to top of loop, where we will terminate loop
1580
}
1581                catch(Throwable JavaDoc ex) {
1582                    if(log.isErrorEnabled())
1583                        log.error("[" + local_addr + "] failed receiving unicast packet", ex);
1584                    Util.sleep(100); // so we don't get into 100% cpu spinning (should NEVER happen !)
1585
}
1586            }
1587            if(log.isDebugEnabled()) log.debug("unicast receiver thread terminated");
1588        }
1589    }
1590
1591
1592    /**
1593     * This thread fetches byte buffers from the packet_queue, converts them into messages and passes them up
1594     * to the higher layer (done in handleIncomingUdpPacket()).
1595     */

1596    class IncomingPacketHandler implements Runnable JavaDoc {
1597        Thread JavaDoc t=null;
1598
1599        public void run() {
1600            byte[] data;
1601            IncomingQueueEntry entry;
1602
1603            while(incoming_queue != null && incoming_packet_handler != null) {
1604                try {
1605                    entry=(IncomingQueueEntry)incoming_queue.remove();
1606                    data=entry.buf;
1607                }
1608                catch(QueueClosedException closed_ex) {
1609                    if(log.isDebugEnabled()) log.debug("packet_handler thread terminating");
1610                    break;
1611                }
1612                handleIncomingUdpPacket(entry.dest, entry.sender, entry.port, data);
1613            }
1614        }
1615
1616        void start() {
1617            if(t == null || !t.isAlive()) {
1618                t=new Thread JavaDoc(this, "UDP.IncomingPacketHandler thread");
1619                t.setDaemon(true);
1620                t.start();
1621            }
1622        }
1623
1624        void stop() {
1625            if(incoming_queue != null)
1626                incoming_queue.close(false); // should terminate the packet_handler thread too
1627
t=null;
1628            incoming_queue=null;
1629        }
1630    }
1631
1632
1633    /**
1634     * This thread fetches byte buffers from the outgoing_packet_queue, converts them into messages and sends them
1635     * using the unicast or multicast socket
1636     */

1637    class OutgoingPacketHandler implements Runnable JavaDoc {
1638        Thread JavaDoc t=null;
1639        byte[] buf;
1640        DatagramPacket packet;
1641        IpAddress dest;
1642
1643        public void run() {
1644            Message msg;
1645
1646            while(outgoing_queue != null && outgoing_packet_handler != null) {
1647                try {
1648                    msg=(Message)outgoing_queue.remove();
1649                    handleMessage(msg);
1650                }
1651                catch(QueueClosedException closed_ex) {
1652                    break;
1653                }
1654                catch(Throwable JavaDoc th) {
1655                    if(log.isErrorEnabled()) log.error("exception sending packet", th);
1656                }
1657                msg=null; // let's give the poor garbage collector a hand...
1658
}
1659            if(log.isTraceEnabled()) log.trace("packet_handler thread terminating");
1660        }
1661
1662        protected void handleMessage(Message msg) throws Exception JavaDoc {
1663            send(msg);
1664        }
1665
1666
1667        void start() {
1668            if(t == null || !t.isAlive()) {
1669                t=new Thread JavaDoc(this, "UDP.OutgoingPacketHandler thread");
1670                t.setDaemon(true);
1671                t.start();
1672            }
1673        }
1674
1675        void stop() {
1676            if(outgoing_queue != null)
1677                outgoing_queue.close(false); // should terminate the packet_handler thread too
1678
t=null;
1679            // outgoing_queue=null;
1680
}
1681    }
1682
1683
1684
1685
1686    /**
1687     * Bundles smaller messages into bigger ones. Collects messages in a list until
1688     * messages of a total of <tt>max_bundle_size bytes</tt> have accumulated, or until
1689     * <tt>max_bundle_timeout</tt> milliseconds have elapsed, whichever is first. Messages
1690     * are unbundled at the receiver.
1691     */

1692    class BundlingOutgoingPacketHandler extends OutgoingPacketHandler {
1693        long total_bytes=0;
1694        /** HashMap<Address, List<Message>>. Keys are destinations, values are lists of Messages */
1695        final HashMap msgs=new HashMap(11);
1696
1697
1698        void start() {
1699            super.start();
1700            t.setName("UDP.BundlingOutgoingPacketHandler thread");
1701        }
1702
1703
1704        public void run() {
1705            Message msg=null, leftover=null;
1706            long start=0;
1707            while(outgoing_queue != null) {
1708                try {
1709                    total_bytes=0;
1710                    msg=leftover != null? leftover : (Message)outgoing_queue.remove(); // blocks until message is available
1711
start=System.currentTimeMillis();
1712                    leftover=waitForMessagesToAccumulate(msg, outgoing_queue, max_bundle_size, start, max_bundle_timeout);
1713                    bundleAndSend(start);
1714                }
1715                catch(QueueClosedException closed_ex) {
1716                    break;
1717                }
1718                catch(Throwable JavaDoc th) {
1719                    if(log.isErrorEnabled()) log.error("exception sending packet", th);
1720                }
1721            }
1722            bundleAndSend(start);
1723            if(log.isTraceEnabled()) log.trace("packet_handler thread terminating");
1724        }
1725
1726
1727        /**
1728         * Waits until max_size bytes have accumulated in the queue, or max_time milliseconds have elapsed.
1729         * When a message cannot be added to the ready-to-send bundle, it is returned, so the caller can
1730         * re-submit it again next time.
1731         * @param m
1732         * @param q
1733         * @param max_size
1734         * @param max_time
1735         * @return
1736         */

1737        Message waitForMessagesToAccumulate(Message m, Queue q, long max_size, long start_time, long max_time) {
1738            Message msg, leftover=null;
1739            boolean running=true, size_exceeded=false, time_reached=false;
1740            long len, time_to_wait=max_time, waited_time=0;
1741
1742            while(running) {
1743                try {
1744                    msg=m != null? m : (Message)q.remove(time_to_wait);
1745                    m=null; // necessary, otherwise we get 'm' again in subsequent iterations of the same loop !
1746
len=msg.size();
1747                    checkLength(len);
1748                    waited_time=System.currentTimeMillis() - start_time;
1749                    time_to_wait=max_time - waited_time;
1750                    size_exceeded=total_bytes + len > max_size;
1751                    time_reached=time_to_wait <= 0;
1752
1753                    if(size_exceeded) {
1754                        running=false;
1755                        leftover=msg;
1756                    }
1757                    else {
1758                        addMessage(msg);
1759                        total_bytes+=len;
1760                        if(time_reached)
1761                            running=false;
1762                    }
1763                }
1764                catch(TimeoutException timeout) {
1765                    waited_time=System.currentTimeMillis() - start_time;
1766                    time_reached=true;
1767                    break;
1768                }
1769                catch(QueueClosedException closed) {
1770                    break;
1771                }
1772                catch(Exception JavaDoc ex) {
1773                    log.error("failure in bundling", ex);
1774                }
1775            }
1776// if(log.isTraceEnabled()) {
1777
// StringBuffer sb=new StringBuffer("size_exceeded=").append(size_exceeded).append(", time_reached=");
1778
// sb.append(time_reached).append(", bytes received=").append(total_bytes);
1779
// sb.append(", time waited=").append(waited_time).append(")");
1780
// log.trace(sb.toString());
1781
// }
1782
return leftover;
1783        }
1784
1785
1786        void checkLength(long len) throws Exception JavaDoc {
1787            if(len > max_bundle_size)
1788                throw new Exception JavaDoc("UDP.BundlingOutgoingPacketHandler.handleMessage(): message size (" + len +
1789                                    ") is greater than max bundling size (" + max_bundle_size + "). " +
1790                                    "Set the fragmentation/bundle size in FRAG and UDP correctly");
1791        }
1792
1793
1794        void addMessage(Message msg) {
1795            List tmp;
1796            Address dst=msg.getDest();
1797            synchronized(msgs) {
1798                tmp=(List)msgs.get(dst);
1799                if(tmp == null) {
1800                    tmp=new List();
1801                    msgs.put(dst, tmp);
1802                }
1803                tmp.add(msg);
1804            }
1805        }
1806
1807
1808
1809        private void bundleAndSend(long start_time) {
1810            Map.Entry entry;
1811            IpAddress dst;
1812            Buffer buffer;
1813            InetAddress addr;
1814            int port;
1815            List l;
1816            long stop_time=System.currentTimeMillis();
1817
1818            synchronized(msgs) {
1819                if(msgs.size() == 0)
1820                    return;
1821                if(start_time == 0)
1822                    start_time=System.currentTimeMillis();
1823
1824                if(log.isTraceEnabled()) {
1825                    StringBuffer JavaDoc sb=new StringBuffer JavaDoc("sending ").append(numMsgs(msgs)).append(" msgs (");
1826                    sb.append(total_bytes).append(" bytes, ").append(stop_time-start_time).append("ms)");
1827                    sb.append(" to ").append(msgs.size()).append(" destination(s)");
1828                    if(msgs.size() > 1) sb.append(" (dests=").append(msgs.keySet()).append(")");
1829                    log.trace(sb.toString());
1830                }
1831                for(Iterator it=msgs.entrySet().iterator(); it.hasNext();) {
1832                    entry=(Map.Entry)it.next();
1833                    dst=(IpAddress)entry.getKey();
1834                    addr=dst.getIpAddress();
1835                    port=dst.getPort();
1836                    l=(List)entry.getValue();
1837                    try {
1838                        if(l.size() > 0) {
1839                            synchronized(out_stream) {
1840                                buffer=listToBuffer(l, dst);
1841                                doSend(buffer, addr, port);
1842                            }
1843                        }
1844                    }
1845                    catch(IOException e) {
1846                        if(log.isErrorEnabled()) log.error("exception sending msg (to dest=" + dst + "): " + e);
1847                    }
1848                }
1849                msgs.clear();
1850            }
1851        }
1852
1853        private int numMsgs(HashMap map) {
1854            Collection values=map.values();
1855            List l;
1856            int size=0;
1857            for(Iterator it=values.iterator(); it.hasNext();) {
1858                l=(List)it.next();
1859                size+=l.size();
1860            }
1861            return size;
1862        }
1863    }
1864
1865
1866    String JavaDoc dumpMessages(HashMap map) {
1867        StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
1868        Map.Entry entry;
1869        List l;
1870        Object JavaDoc key;
1871        if(map != null) {
1872            synchronized(map) {
1873                for(Iterator it=map.entrySet().iterator(); it.hasNext();) {
1874                    entry=(Map.Entry)it.next();
1875                    key=entry.getKey();
1876                    if(key == null)
1877                        key="null";
1878                    l=(List)entry.getValue();
1879                    sb.append(key).append(": ");
1880                    sb.append(l.size()).append(" msgs\n");
1881                }
1882            }
1883        }
1884        return sb.toString();
1885    }
1886
1887
1888}
1889
Popular Tags