KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > UDP


1 // $Id: UDP.java,v 1.76 2005/04/23 20:39:17 belaban Exp $
2

3 package org.jgroups.protocols;
4
5
6 import org.jgroups.*;
7 import org.jgroups.stack.IpAddress;
8 import org.jgroups.stack.Protocol;
9 import org.jgroups.util.List;
10 import org.jgroups.util.*;
11 import org.jgroups.util.Queue;
12
13 import java.io.*;
14 import java.net.*;
15 import java.util.*;
16
17
18
19
20 /**
21  * IP multicast transport based on UDP. Messages to the group (msg.dest == null) will
22  * be multicast (to all group members), whereas point-to-point messages
23  * (msg.dest != null) will be unicast to a single member. Uses a multicast and
24  * a unicast socket.<p>
25  * The following properties are being read by the UDP protocol<p>
26  * param mcast_addr - the multicast address to use default is 228.8.8.8<br>
27  * param mcast_port - (int) the port that the multicast is sent on default is 7600<br>
28  * param ip_mcast - (boolean) flag whether to use IP multicast - default is true<br>
29  * param ip_ttl - Set the default time-to-live for multicast packets sent out on this
30  * socket. default is 32<br>
31  * param use_packet_handler - If set, the mcast and ucast receiver threads just put
32  * the datagram's payload (a byte buffer) into a queue, from where a separate thread
33  * will dequeue and handle them (unmarshal and pass up). This frees the receiver
34  * threads from having to do message unmarshalling; this time can now be spent
35  * receiving packets. If you have lots of retransmissions because of network
36  * input buffer overflow, consider setting this property to true (default is false).
37  * @author Bela Ban
38  */

39 public class UDP extends Protocol implements Runnable JavaDoc {
40
41     /** Socket used for
42      * <ol>
43      * <li>sending unicast packets and
44      * <li>receiving unicast packets
45      * </ol>
46      * The address of this socket will be our local address (<tt>local_addr</tt>) */

47     DatagramSocket sock=null;
48
49     /**
50      * BoundedList<Integer> of the last 100 ports used. This is to avoid reusing a port for DatagramSocket
51      */

52     private static BoundedList last_ports_used=null;
53
54     /** Maintain a list of local ports opened by DatagramSocket. If this is 0, this option is turned off.
55      * If bind_port is null, then this options will be ignored */

56     int num_last_ports=100;
57
58     /** IP multicast socket for <em>receiving</em> multicast packets */
59     MulticastSocket mcast_recv_sock=null;
60
61     /** IP multicast socket for <em>sending</em> multicast packets */
62     MulticastSocket mcast_send_sock=null;
63
64     /** The address (host and port) of this member */
65     IpAddress local_addr=null;
66
67     /** The name of the group to which this member is connected */
68     String JavaDoc channel_name=null;
69
70     UdpHeader udp_hdr=null;
71
72     /** The multicast address (mcast address and port) this member uses */
73     IpAddress mcast_addr=null;
74
75     /** The interface (NIC) to which the unicast and multicast sockets bind */
76     InetAddress bind_addr=null;
77
78     /** Bind the receiver multicast socket to all available interfaces (requires JDK 1.4) */
79     boolean bind_to_all_interfaces=false;
80
81     /** The port to which the unicast receiver socket binds.
82      * 0 means to bind to any (ephemeral) port */

83     int bind_port=0;
84     int port_range=1; // 27-6-2003 bgooren, Only try one port by default
85

86     /** The multicast address used for sending and receiving packets */
87     String JavaDoc mcast_addr_name="228.8.8.8";
88
89     /** The multicast port used for sending and receiving packets */
90     int mcast_port=7600;
91
92     /** The multicast receiver thread */
93     Thread JavaDoc mcast_receiver=null;
94
95     /** The unicast receiver thread */
96     UcastReceiver ucast_receiver=null;
97
98     /** Whether to enable IP multicasting. If false, multiple unicast datagram
99      * packets are sent rather than one multicast packet */

100     boolean ip_mcast=true;
101
102     /** The time-to-live (TTL) for multicast datagram packets */
103     int ip_ttl=64;
104
105     /** The members of this group (updated when a member joins or leaves) */
106     final Vector members=new Vector(11);
107
108     /** Pre-allocated byte stream. Used for serializing datagram packets. Will grow as needed */
109     final ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(1024);
110
111     /** Send buffer size of the multicast datagram socket */
112     int mcast_send_buf_size=32000;
113
114     /** Receive buffer size of the multicast datagram socket */
115     int mcast_recv_buf_size=64000;
116
117     /** Send buffer size of the unicast datagram socket */
118     int ucast_send_buf_size=32000;
119
120     /** Receive buffer size of the unicast datagram socket */
121     int ucast_recv_buf_size=64000;
122
123     /** If true, messages sent to self are treated specially: unicast messages are
124      * looped back immediately, multicast messages get a local copy first and -
125      * when the real copy arrives - it will be discarded. Useful for Window
126      * media (non)sense */

127     boolean loopback=true;
128
129
130     /** Discard packets with a different version. Usually minor version differences are okay. Setting this property
131      * to true means that we expect the exact same version on all incoming packets */

132     boolean discard_incompatible_packets=false;
133
134     /** Sometimes receivers are overloaded (they have to handle de-serialization etc).
135      * Packet handler is a separate thread taking care of de-serialization, receiver
136      * thread(s) simply put packet in queue and return immediately. Setting this to
137      * true adds one more thread */

138     boolean use_incoming_packet_handler=false;
139
140     /** Used by packet handler to store incoming DatagramPackets */
141     Queue incoming_queue=null;
142
143     /** Dequeues DatagramPackets from packet_queue, unmarshalls them and
144      * calls <tt>handleIncomingUdpPacket()</tt> */

145     IncomingPacketHandler incoming_packet_handler=null;
146
147     /** Packets to be sent are stored in outgoing_queue and sent by a separate thread. Enabling this
148      * value uses an additional thread */

149     boolean use_outgoing_packet_handler=false;
150
151     /** Used by packet handler to store outgoing DatagramPackets */
152     Queue outgoing_queue=null;
153
154     OutgoingPacketHandler outgoing_packet_handler=null;
155
156     /** If set it will be added to <tt>local_addr</tt>. Used to implement
157      * for example transport independent addresses */

158     byte[] additional_data=null;
159
160     /** Maximum number of bytes for messages to be queued until they are sent. This value needs to be smaller
161         than the largest UDP datagram packet size */

162     int max_bundle_size=AUTOCONF.senseMaxFragSizeStatic();
163
164     /** Max number of milliseconds until queued messages are sent. Messages are sent when max_bundle_size or
165      * max_bundle_timeout has been exceeded (whichever occurs faster)
166      */

167     long max_bundle_timeout=20;
168
169     /** Enabled bundling of smaller messages into bigger ones */
170     boolean enable_bundling=false;
171
172     /** Used by BundlingOutgoingPacketHandler */
173     TimeScheduler timer=null;
174
175     /** HashMap<Address, Address>. Keys=senders, values=destinations. For each incoming message M with sender S, adds
176      * an entry with key=S and value= sender's IP address and port.
177      */

178     HashMap addr_translation_table=new HashMap();
179
180     boolean use_addr_translation=false;
181
182     /** The name of this protocol */
183     static final String JavaDoc name="UDP";
184
185     static final String JavaDoc IGNORE_BIND_ADDRESS_PROPERTY="ignore.bind.address";
186
187
188     final int VERSION_LENGTH=Version.getLength();
189
190
191
192
193     /**
194      * public constructor. creates the UDP protocol, and initializes the
195      * state variables, does however not start any sockets or threads
196      */

197     public UDP() {
198         ;
199     }
200
201     /**
202      * debug only
203      */

204     public String JavaDoc toString() {
205         return "Protocol UDP(local address: " + local_addr + ')';
206     }
207
208
209     BoundedList getLastPortsUsed() {
210         if(last_ports_used == null)
211             last_ports_used=new BoundedList(num_last_ports);
212         return last_ports_used;
213     }
214
215     /* ----------------------- Receiving of MCAST UDP packets ------------------------ */
216
217     public void run() {
218         DatagramPacket packet;
219         byte receive_buf[]=new byte[65535];
220         int len, sender_port;
221         byte[] tmp, data;
222         InetAddress sender_addr;
223
224         // moved out of loop to avoid excessive object creations (bela March 8 2001)
225
packet=new DatagramPacket(receive_buf, receive_buf.length);
226
227         while(mcast_receiver != null && mcast_recv_sock != null) {
228             try {
229                 packet.setData(receive_buf, 0, receive_buf.length);
230                 mcast_recv_sock.receive(packet);
231                 sender_addr=packet.getAddress();
232                 sender_port=packet.getPort();
233                 len=packet.getLength();
234                 data=packet.getData();
235
236                 if(len == 4) { // received a diagnostics probe
237
if(data[0] == 'd' && data[1] == 'i' && data[2] == 'a' && data[3] == 'g') {
238                         handleDiagnosticProbe(sender_addr, sender_port);
239                         continue;
240                     }
241                 }
242
243                 if(log.isTraceEnabled()){
244                     StringBuffer JavaDoc sb=new StringBuffer JavaDoc("received (mcast) ");
245                     sb.append(len).append(" bytes from ").append(sender_addr).append(':');
246                     sb.append(sender_port).append(" (size=").append(len).append(" bytes)");
247                     log.trace(sb.toString());
248                 }
249                 if(len > receive_buf.length) {
250                     if(log.isErrorEnabled()) log.error("size of the received packet (" + len + ") is bigger than " +
251                                              "allocated buffer (" + receive_buf.length + "): will not be able to handle packet. " +
252                                              "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length);
253                 }
254
255                 if(Version.compareTo(data) == false) {
256                     if(log.isWarnEnabled()) {
257                         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
258                         sb.append("packet from ").append(sender_addr).append(':').append(sender_port);
259                         sb.append(" has different version (").append(Version.printVersionId(data, Version.version_id.length));
260                         sb.append(") from ours (").append(Version.printVersionId(Version.version_id)).append("). ");
261                         if(discard_incompatible_packets)
262                             sb.append("Packet is discarded");
263                         else
264                             sb.append("This may cause problems");
265                         log.warn(sb.toString());
266                     }
267                     if(discard_incompatible_packets)
268                         continue;
269                 }
270
271                 if(use_incoming_packet_handler) {
272                     tmp=new byte[len];
273                     System.arraycopy(data, 0, tmp, 0, len);
274                     incoming_queue.add(new IncomingQueueEntry(mcast_addr, sender_addr, sender_port, tmp));
275                 }
276                 else
277                     handleIncomingUdpPacket(mcast_addr, sender_addr, sender_port, data);
278             }
279             catch(SocketException sock_ex) {
280                  if(log.isTraceEnabled()) log.trace("multicast socket is closed, exception=" + sock_ex);
281                 break;
282             }
283             catch(InterruptedIOException io_ex) { // thread was interrupted
284
; // go back to top of loop, where we will terminate loop
285
}
286             catch(Throwable JavaDoc ex) {
287                 if(log.isErrorEnabled())
288                     log.error("failure in multicast receive()", ex);
289                 Util.sleep(100); // so we don't get into 100% cpu spinning (should NEVER happen !)
290
}
291         }
292         if(log.isDebugEnabled()) log.debug("multicast thread terminated");
293     }
294
295 // private void printPacket(DatagramPacket packet) {
296
// StringBuffer sb=new StringBuffer();
297
// sb.append(packet.getAddress()).append(":").append(packet.getPort());
298
// System.out.println("packet: " + sb.toString());
299
// }
300

301     void handleDiagnosticProbe(InetAddress sender, int port) {
302         try {
303             byte[] diag_rsp=getDiagResponse().getBytes();
304             DatagramPacket rsp=new DatagramPacket(diag_rsp, 0, diag_rsp.length, sender, port);
305             if(log.isDebugEnabled()) log.debug("sending diag response to " + sender + ':' + port);
306             sock.send(rsp);
307         }
308         catch(Throwable JavaDoc t) {
309             if(log.isErrorEnabled()) log.error("failed sending diag rsp to " + sender + ':' + port +
310                                                        ", exception=" + t);
311         }
312     }
313
314     String JavaDoc getDiagResponse() {
315         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
316         sb.append(local_addr).append(" (").append(channel_name).append(')');
317         sb.append(" [").append(mcast_addr_name).append(':').append(mcast_port).append("]\n");
318         sb.append("Version=").append(Version.version).append(", cvs=\"").append(Version.cvs).append("\"\n");
319         sb.append("bound to ").append(bind_addr).append(':').append(bind_port).append('\n');
320         sb.append("members: ").append(members).append('\n');
321
322         return sb.toString();
323     }
324
325     /* ------------------------------------------------------------------------------- */
326
327
328
329     /*------------------------------ Protocol interface ------------------------------ */
330
331     public String JavaDoc getName() {
332         return name;
333     }
334
335
336     public void init() throws Exception JavaDoc {
337         if(use_incoming_packet_handler) {
338             incoming_queue=new Queue();
339             incoming_packet_handler=new IncomingPacketHandler();
340         }
341         if(use_outgoing_packet_handler) {
342             outgoing_queue=new Queue();
343             if(enable_bundling) {
344                 timer=stack != null? stack.timer : null;
345                 if(timer == null)
346                     throw new Exception JavaDoc("timer could not be retrieved");
347                 outgoing_packet_handler=new BundlingOutgoingPacketHandler();
348             }
349             else
350                 outgoing_packet_handler=new OutgoingPacketHandler();
351         }
352     }
353
354
355     /**
356      * Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads
357      */

358     public void start() throws Exception JavaDoc {
359         if(log.isDebugEnabled()) log.debug("creating sockets and starting threads");
360         createSockets();
361         passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
362         startThreads();
363     }
364
365
366     public void stop() {
367         if(log.isDebugEnabled()) log.debug("closing sockets and stopping threads");
368         stopThreads(); // will close sockets, closeSockets() is not really needed anymore, but...
369
closeSockets(); // ... we'll leave it in there for now (doesn't do anything if already closed)
370
}
371
372
373
374     /**
375      * Setup the Protocol instance acording to the configuration string
376      * The following properties are being read by the UDP protocol
377      * param mcast_addr - the multicast address to use default is 228.8.8.8
378      * param mcast_port - (int) the port that the multicast is sent on default is 7600
379      * param ip_mcast - (boolean) flag whether to use IP multicast - default is true
380      * param ip_ttl - Set the default time-to-live for multicast packets sent out on this socket. default is 32
381      * @return true if no other properties are left.
382      * false if the properties still have data in them, ie ,
383      * properties are left over and not handled by the protocol stack
384      *
385      */

386     public boolean setProperties(Properties props) {
387         String JavaDoc str;
388         String JavaDoc tmp = null;
389
390         super.setProperties(props);
391         
392         // PropertyPermission not granted if running in an untrusted environment with JNLP.
393
try {
394             tmp=System.getProperty("bind.address");
395             if(Boolean.getBoolean(IGNORE_BIND_ADDRESS_PROPERTY)) {
396                 tmp=null;
397             }
398         }
399         catch (SecurityException JavaDoc ex){
400         }
401         
402         if(tmp != null)
403             str=tmp;
404         else
405             str=props.getProperty("bind_addr");
406         if(str != null) {
407             try {
408                 bind_addr=InetAddress.getByName(str);
409             }
410             catch(UnknownHostException unknown) {
411                 if(log.isFatalEnabled()) log.fatal("(bind_addr): host " + str + " not known");
412                 return false;
413             }
414             props.remove("bind_addr");
415         }
416
417         str=props.getProperty("bind_to_all_interfaces");
418         if(str != null) {
419             bind_to_all_interfaces=new Boolean JavaDoc(str).booleanValue();
420             props.remove("bind_to_all_interfaces");
421         }
422
423         str=props.getProperty("bind_port");
424         if(str != null) {
425             bind_port=Integer.parseInt(str);
426             props.remove("bind_port");
427         }
428
429         str=props.getProperty("num_last_ports");
430         if(str != null) {
431             num_last_ports=Integer.parseInt(str);
432             props.remove("num_last_ports");
433         }
434
435         str=props.getProperty("start_port");
436         if(str != null) {
437             bind_port=Integer.parseInt(str);
438             props.remove("start_port");
439         }
440
441         str=props.getProperty("port_range");
442         if(str != null) {
443             port_range=Integer.parseInt(str);
444             props.remove("port_range");
445         }
446
447         str=props.getProperty("mcast_addr");
448         if(str != null) {
449             mcast_addr_name=str;
450             props.remove("mcast_addr");
451         }
452
453         str=props.getProperty("mcast_port");
454         if(str != null) {
455             mcast_port=Integer.parseInt(str);
456             props.remove("mcast_port");
457         }
458
459         str=props.getProperty("ip_mcast");
460         if(str != null) {
461             ip_mcast=Boolean.valueOf(str).booleanValue();
462             props.remove("ip_mcast");
463         }
464
465         str=props.getProperty("ip_ttl");
466         if(str != null) {
467             ip_ttl=Integer.parseInt(str);
468             props.remove("ip_ttl");
469         }
470
471         str=props.getProperty("mcast_send_buf_size");
472         if(str != null) {
473             mcast_send_buf_size=Integer.parseInt(str);
474             props.remove("mcast_send_buf_size");
475         }
476
477         str=props.getProperty("mcast_recv_buf_size");
478         if(str != null) {
479             mcast_recv_buf_size=Integer.parseInt(str);
480             props.remove("mcast_recv_buf_size");
481         }
482
483         str=props.getProperty("ucast_send_buf_size");
484         if(str != null) {
485             ucast_send_buf_size=Integer.parseInt(str);
486             props.remove("ucast_send_buf_size");
487         }
488
489         str=props.getProperty("ucast_recv_buf_size");
490         if(str != null) {
491             ucast_recv_buf_size=Integer.parseInt(str);
492             props.remove("ucast_recv_buf_size");
493         }
494
495         str=props.getProperty("loopback");
496         if(str != null) {
497             loopback=Boolean.valueOf(str).booleanValue();
498             props.remove("loopback");
499         }
500
501         str=props.getProperty("discard_incompatibe_packets");
502         if(str != null) {
503             discard_incompatible_packets=Boolean.valueOf(str).booleanValue();
504             props.remove("discard_incompatibe_packets");
505         }
506
507         // this is deprecated, just left for compatibility (use use_incoming_packet_handler)
508
str=props.getProperty("use_packet_handler");
509         if(str != null) {
510             use_incoming_packet_handler=Boolean.valueOf(str).booleanValue();
511             props.remove("use_packet_handler");
512             if(log.isWarnEnabled()) log.warn("'use_packet_handler' is deprecated; use 'use_incoming_packet_handler' instead");
513         }
514
515         str=props.getProperty("use_incoming_packet_handler");
516         if(str != null) {
517             use_incoming_packet_handler=Boolean.valueOf(str).booleanValue();
518             props.remove("use_incoming_packet_handler");
519         }
520
521         str=props.getProperty("use_outgoing_packet_handler");
522         if(str != null) {
523             use_outgoing_packet_handler=Boolean.valueOf(str).booleanValue();
524             props.remove("use_outgoing_packet_handler");
525         }
526
527         str=props.getProperty("max_bundle_size");
528         if(str != null) {
529             int bundle_size=Integer.parseInt(str);
530             if(bundle_size > max_bundle_size) {
531                 if(log.isErrorEnabled()) log.error("max_bundle_size (" + bundle_size +
532                         ") is greater than largest UDP fragmentation size (" + max_bundle_size + ')');
533                 return false;
534             }
535             if(bundle_size <= 0) {
536                 if(log.isErrorEnabled()) log.error("max_bundle_size (" + bundle_size + ") is <= 0");
537                 return false;
538             }
539             max_bundle_size=bundle_size;
540             props.remove("max_bundle_size");
541         }
542
543         str=props.getProperty("max_bundle_timeout");
544         if(str != null) {
545             max_bundle_timeout=Long.parseLong(str);
546             if(max_bundle_timeout <= 0) {
547                 if(log.isErrorEnabled()) log.error("max_bundle_timeout of " + max_bundle_timeout + " is invalid");
548                 return false;
549             }
550             props.remove("max_bundle_timeout");
551         }
552
553         str=props.getProperty("enable_bundling");
554         if(str != null) {
555             enable_bundling=Boolean.valueOf(str).booleanValue();
556             props.remove("enable_bundling");
557         }
558
559         str=props.getProperty("use_addr_translation");
560         if(str != null) {
561             use_addr_translation=Boolean.valueOf(str).booleanValue();
562             props.remove("use_addr_translation");
563         }
564
565         if(props.size() > 0) {
566             System.err.println("UDP.setProperties(): the following properties are not recognized:");
567             props.list(System.out);
568             return false;
569         }
570
571         if(enable_bundling) {
572             if(use_outgoing_packet_handler == false)
573                 if(log.isWarnEnabled()) log.warn("enable_bundling is true; setting use_outgoing_packet_handler=true");
574             use_outgoing_packet_handler=true;
575         }
576
577         return true;
578     }
579
580
581
582     /**
583      * DON'T REMOVE ! This prevents the up-handler thread to be created, which essentially is superfluous:
584      * messages are received from the network rather than from a layer below.
585      */

586     public void startUpHandler() {
587         ;
588     }
589
590     /**
591      * handle the UP event.
592      * @param evt - the event being send from the stack
593      */

594     public void up(Event evt) {
595
596         switch(evt.getType()) {
597
598             case Event.CONFIG:
599                 passUp(evt);
600                  if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
601                 handleConfigEvent((HashMap)evt.getArg());
602                 return;
603         }
604
605         passUp(evt);
606     }
607
608     /**
609      * Caller by the layer above this layer. Usually we just put this Message
610      * into the send queue and let one or more worker threads handle it. A worker thread
611      * then removes the Message from the send queue, performs a conversion and adds the
612      * modified Message to the send queue of the layer below it, by calling Down).
613      */

614     public void down(Event evt) {
615         Message msg;
616         Object JavaDoc dest_addr;
617
618         if(evt.getType() != Event.MSG) { // unless it is a message handle it and respond
619
handleDownEvent(evt);
620             return;
621         }
622
623         msg=(Message)evt.getArg();
624
625         if(channel_name != null) {
626             // added patch by Roland Kurmann (March 20 2003)
627
// msg.putHeader(name, new UdpHeader(channel_name));
628
msg.putHeader(name, udp_hdr);
629         }
630
631         dest_addr=msg.getDest();
632
633         // Because we don't call Protocol.passDown(), we notify the observer directly (e.g. PerfObserver).
634
// This way, we still have performance numbers for UDP
635
if(observer != null)
636             observer.passDown(evt);
637
638         if(dest_addr == null) { // 'null' means send to all group members
639
if(ip_mcast) {
640                 if(mcast_addr == null) {
641                     if(log.isErrorEnabled()) log.error("dest address of message is null, and " +
642                                               "sending to default address fails as mcast_addr is null, too !" +
643                                               " Discarding message " + Util.printEvent(evt));
644                     return;
645                 }
646                 // if we want to use IP multicast, then set the destination of the message
647
msg.setDest(mcast_addr);
648             }
649             else {
650                 //sends a separate UDP message to each address
651
sendMultipleUdpMessages(msg, members);
652                 return;
653             }
654         }
655
656         try {
657             sendUdpMessage(msg);
658         }
659         catch(Exception JavaDoc e) {
660             if(log.isErrorEnabled()) log.error("exception=" + e + ", msg=" + msg + ", mcast_addr=" + mcast_addr);
661         }
662     }
663
664
665
666
667
668
669     /*--------------------------- End of Protocol interface -------------------------- */
670
671
672     /* ------------------------------ Private Methods -------------------------------- */
673
674
675
676     /**
677      * If the sender is null, set our own address. We cannot just go ahead and set the address
678      * anyway, as we might be sending a message on behalf of someone else ! E.gin case of
679      * retransmission, when the original sender has crashed, or in a FLUSH protocol when we
680      * have to return all unstable messages with the FLUSH_OK response.
681      */

682     void setSourceAddress(Message msg) {
683         if(msg.getSrc() == null)
684             msg.setSrc(local_addr);
685     }
686
687
688
689
690     /**
691      * Processes a packet read from either the multicast or unicast socket. Needs to be synchronized because
692      * mcast or unicast socket reads can be concurrent.
693      * Correction (bela April 19 2005): we acces no instance variables, all vars are allocated on the stack, so
694      * this method should be reentrant: removed 'synchronized' keyword
695      */

696     void handleIncomingUdpPacket(IpAddress dest, InetAddress sender, int port, byte[] data) {
697         ByteArrayInputStream inp_stream=null;
698         DataInputStream inp=null;
699         Message msg=null;
700         List l; // used if bundling is enabled
701

702         try {
703             // skip the first n bytes (default: 4), this is the version info
704
inp_stream=new ByteArrayInputStream(data, VERSION_LENGTH, data.length - VERSION_LENGTH);
705             inp=new DataInputStream(inp_stream);
706
707             if(enable_bundling) {
708                 l=bufferToList(inp, dest, sender, port);
709                 for(Enumeration en=l.elements(); en.hasMoreElements();) {
710                     msg=(Message)en.nextElement();
711                     try {
712                         handleMessage(msg);
713                     }
714                     catch(Throwable JavaDoc t) {
715                         if(log.isErrorEnabled()) log.error("failure: " + t.toString());
716                     }
717                 }
718             }
719             else {
720                 msg=bufferToMessage(inp, dest, sender, port);
721                 handleMessage(msg);
722             }
723         }
724         catch(Throwable JavaDoc e) {
725             if(log.isErrorEnabled()) log.error("exception in processing incoming packet", e);
726         }
727         finally {
728             Util.closeInputStream(inp);
729             Util.closeInputStream(inp_stream);
730         }
731     }
732
733
734
735     void handleMessage(Message msg) {
736         Event evt;
737         UdpHeader hdr;
738         Address dst=msg.getDest();
739
740         if(dst == null)
741             dst=mcast_addr;
742
743         // discard my own multicast loopback copy
744
if(loopback) {
745             Address SRC=msg.getSrc();
746             if((dst == null || (dst != null && dst.isMulticastAddress())) && src != null && local_addr.equals(src)) {
747                 if(log.isTraceEnabled())
748                     log.trace("discarded own loopback multicast packet");
749                 return;
750             }
751         }
752
753         evt=new Event(Event.MSG, msg);
754         if(log.isTraceEnabled()) {
755             StringBuffer JavaDoc sb=new StringBuffer JavaDoc("message is ");
756             sb.append(msg).append(", headers are ").append(msg.getHeaders());
757             log.trace(sb.toString());
758         }
759
760         /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
761         * This allows e.g. PerfObserver to get the time of reception of a message */

762         if(observer != null)
763             observer.up(evt, up_queue.size());
764
765         hdr=(UdpHeader)msg.getHeader(name); // replaced removeHeader() with getHeader()
766
if(hdr != null) {
767
768             /* Discard all messages destined for a channel with a different name */
769             String JavaDoc ch_name=hdr.channel_name;
770
771             // Discard if message's group name is not the same as our group name unless the
772
// message is a diagnosis message (special group name DIAG_GROUP)
773
if(ch_name != null && channel_name != null && !channel_name.equals(ch_name) &&
774                     !ch_name.equals(Util.DIAG_GROUP)) {
775                 if(log.isWarnEnabled()) log.warn("discarded message from different group (" +
776                                                  ch_name + "). Sender was " + msg.getSrc());
777                 return;
778             }
779         }
780         else {
781             if(log.isErrorEnabled()) log.error("message does not have a UDP header");
782         }
783         passUp(evt);
784     }
785
786
787     void sendUdpMessage(Message msg) throws Exception JavaDoc {
788         sendUdpMessage(msg, false);
789     }
790
791     /** Send a message to the address specified in dest */
792     void sendUdpMessage(Message msg, boolean copyForOutgoingQueue) throws Exception JavaDoc {
793         IpAddress dest;
794         Message copy;
795         Event evt;
796
797         dest=(IpAddress)msg.getDest(); // guaranteed to be non-null
798
setSourceAddress(msg);
799
800         if(log.isTraceEnabled()) {
801             StringBuffer JavaDoc sb=new StringBuffer JavaDoc("sending msg to ");
802             sb.append(msg.getDest()).append(" (src=").append(msg.getSrc()).append("), headers are ").append(msg.getHeaders());
803             log.trace(sb.toString());
804         }
805
806         // Don't send if destination is local address. Instead, switch dst and src and put in up_queue.
807
// If multicast message, loopback a copy directly to us (but still multicast). Once we receive this,
808
// we will discard our own multicast message
809
if(loopback && (dest.equals(local_addr) || dest.isMulticastAddress())) {
810             copy=msg.copy();
811             // copy.removeHeader(name); // we don't remove the header
812
copy.setSrc(local_addr);
813             // copy.setDest(dest);
814
evt=new Event(Event.MSG, copy);
815
816             /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
817                This allows e.g. PerfObserver to get the time of reception of a message */

818             if(observer != null)
819                 observer.up(evt, up_queue.size());
820             if(log.isTraceEnabled()) log.trace("looped back local message " + copy);
821             passUp(evt);
822             if(dest != null && !dest.isMulticastAddress())
823                 return;
824         }
825
826         if(use_outgoing_packet_handler) {
827             if(copyForOutgoingQueue)
828                 outgoing_queue.add(msg.copy());
829             else
830                 outgoing_queue.add(msg);
831             return;
832         }
833
834         send(msg);
835     }
836
837
838     /** Internal method to serialize and send a message. This method is not reentrant */
839     void send(Message msg) throws Exception JavaDoc {
840         Buffer buf;
841         IpAddress dest=(IpAddress)msg.getDest(); // guaranteed to be non-null
842
IpAddress SRC=(IpAddress)msg.getSrc();
843
844         synchronized(out_stream) {
845             buf=messageToBuffer(msg, dest, src);
846             doSend(buf, dest.getIpAddress(), dest.getPort());
847         }
848     }
849
850
851
852     void doSend(Buffer buf, InetAddress dest, int port) throws IOException {
853         DatagramPacket packet;
854
855         // packet=new DatagramPacket(data, data.length, dest, port);
856
packet=new DatagramPacket(buf.getBuf(), buf.getOffset(), buf.getLength(), dest, port);
857         if(dest.isMulticastAddress() && mcast_send_sock != null) { // mcast_recv_sock might be null if ip_mcast is false
858
mcast_send_sock.send(packet);
859         }
860         else {
861             if(sock != null)
862                 sock.send(packet);
863         }
864     }
865
866
867
868     void sendMultipleUdpMessages(Message msg, Vector dests) {
869         Address dest;
870
871         for(int i=0