KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > UDP1_4


1 package org.jgroups.protocols;
2
3 import org.apache.commons.logging.Log;
4 import org.apache.commons.logging.LogFactory;
5 import org.jgroups.*;
6 import org.jgroups.stack.LogicalAddress1_4;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.util.Queue;
9 import org.jgroups.util.QueueClosedException;
10 import org.jgroups.util.Util;
11
12 import java.io.*;
13 import java.net.*;
14 import java.util.*;
15
16 /**
17  * Multicast transport. Similar to UDP, but binds to multiple (or all) interfaces for sending and receiving
18  * multicast and unicast traffic.<br/>
19  * The list of interfaces can be set via a property (comma-delimited list of IP addresses or "all" for all
20  * interfaces). Note that this class only works under JDK 1.4 and higher.<br/>
21  * For each of the interfaces listed we create a Listener, which listens on the group multicast address and creates
22  * a unicast datagram socket. The address of this member is determined at startup time, and is the host name plus
23  * a timestamp (LogicalAddress1_4). It does not change during the lifetime of the process. The LogicalAddress1_4 contains
24  * a list of all unicast socket addresses to which we can send back unicast messages. When we send a message, the
25  * Listener adds the sender's return address. When we receive a message, we add that address to our routing cache, which
25  * contains logical addresses and physical addresses. When we need to send a unicast message, we first check whether
27  * the logical address has a physical address associated with it in the cache. If so, we send a message to that address.
28  * If not, we send the unicast message to <em>all</em> physical addresses contained in the LogicalAddress1_4.<br/>
29  * UDP1_4 guarantees that - in scenarios with multiple subnets and multi-homed machines - members do see each other.
30  * There is some overhead in multicasting the same message on multiple interfaces, and potentially sending a unicast
31  * on multiple interfaces as well, but the advantage is that we don't need stuff like bind_addr any longer. Plus,
32  * the unicast routing caches should ensure that unicasts are only sent via 1 interface in almost all cases.
33  *
34  * @author Bela Ban Oct 2003
35  * @version $Id: UDP1_4.java,v 1.21 2005/04/11 09:00:26 belaban Exp $
36  * todo: sending of dummy packets
37  */

38 public class UDP1_4 extends Protocol implements Receiver {
39
40     static final String JavaDoc name="UDP1_4";
41
42     /** Maintains a list of Connectors, one for each interface we're listening on */
43     ConnectorTable ct=null;
44
45     /** A List<String> of bind addresses, we create 1 Connector for each interface */
46     List bind_addrs=null;
47
48     /** The name of the group to which this member is connected */
49     String JavaDoc group_name=null;
50
51     /** The multicast address (mcast address and port) this member uses (default: 230.1.2.3:7500) */
52     InetSocketAddress mcast_addr=null;
53
54     /** The address of this member. Valid for the lifetime of the JVM in which this member runs */
55     LogicalAddress1_4 local_addr=new LogicalAddress1_4(null, null);
56
57     /** Logical address without list of physical addresses */
58     LogicalAddress1_4 local_addr_canonical=local_addr.copy();
59
60     /** Pre-allocated byte stream. Used for serializing datagram packets */
61     ByteArrayOutputStream out_stream=new ByteArrayOutputStream(65535);
62
63     /**
64      * The port to which the unicast receiver socket binds.
65      * 0 means to bind to any (ephemeral) port
66      */

67     int local_bind_port=0;
68     int port_range=1; // 27-6-2003 bgooren, Only try one port by default
69

70
71     /**
72      * Whether to enable IP multicasting. If false, multiple unicast datagram
73      * packets are sent rather than one multicast packet
74      */

75     boolean ip_mcast=true;
76
77     /** The time-to-live (TTL) for multicast datagram packets */
78     int ip_ttl=32;
79
80     /** The members of this group (updated when a member joins or leaves) */
81     Vector members=new Vector();
82
83     /**
84      * Header to be added to all messages sent via this protocol. It is
85      * preallocated for efficiency
86      */

87     UdpHeader udp_hdr=null;
88
89     /** Send buffer size of the multicast datagram socket */
90     int mcast_send_buf_size=300000;
91
92     /** Receive buffer size of the multicast datagram socket */
93     int mcast_recv_buf_size=300000;
94
95     /** Send buffer size of the unicast datagram socket */
96     int ucast_send_buf_size=300000;
97
98     /** Receive buffer size of the unicast datagram socket */
99     int ucast_recv_buf_size=300000;
100
101     /**
102      * If true, messages sent to self are treated specially: unicast messages are
103      * looped back immediately, multicast messages get a local copy first and -
104      * when the real copy arrives - it will be discarded. Useful for Window
105      * media (non)sense
106      * @deprecated This is used by default now
107      */

108     boolean loopback=true; //todo: remove
109

110     /**
111      * Sometimes receivers are overloaded (they have to handle de-serialization etc).
112      * Packet handler is a separate thread taking care of de-serialization, receiver
113      * thread(s) simply put packet in queue and return immediately. Setting this to
114      * true adds one more thread
115      */

116     boolean use_packet_handler=false;
117
118     /** Used by packet handler to store incoming DatagramPackets */
119     Queue packet_queue=null;
120
121     /**
122      * If set it will be added to <tt>local_addr</tt>. Used to implement
123      * for example transport independent addresses
124      */

125     byte[] additional_data=null;
126
127     /**
128      * Dequeues DatagramPackets from packet_queue, unmarshalls them and
129      * calls <tt>handleIncomingUdpPacket()</tt>
130      */

131     PacketHandler packet_handler=null;
132
133     protected static Log mylog=LogFactory.getLog(UDP1_4.class);
134
135
136     final int VERSION_LENGTH=Version.getLength();
137
138     /** Number of bytes to allocate to receive a packet. Needs to be set to be higher than frag_size
139      * (handle CONFIG event)
140      */

141     static final int DEFAULT_RECEIVE_BUFFER_SIZE=120000; // todo: make settable and/or use CONFIG event
142

143
144
145
146     /**
147      * Public constructor. creates the UDP protocol, and initializes the
148      * state variables, does however not start any sockets or threads
149      */

150     public UDP1_4() {
151     }
152
153     /**
154      * debug only
155      */

156     public String JavaDoc toString() {
157         return "Protocol UDP(local address: " + local_addr + ')';
158     }
159
160
161     public void receive(DatagramPacket packet) {
162         int len=packet.getLength();
163         byte[] data=packet.getData();
164         SocketAddress sender=packet.getSocketAddress();
165
166         if(len == 4) { // received a diagnostics probe
167
if(data[0] == 'd' && data[1] == 'i' && data[2] == 'a' && data[3] == 'g') {
168                 handleDiagnosticProbe(sender);
169                 return;
170             }
171         }
172
173         if(mylog.isTraceEnabled())
174             mylog.trace("received " + len + " bytes from " + sender);
175
176         if(Version.compareTo(packet.getData()) == false) {
177             if(mylog.isWarnEnabled()) mylog.warn("packet from " + sender + " has different version (" +
178                     Version.printVersionId(data, Version.version_id.length) +
179                     ") from ours (" + Version.printVersionId(Version.version_id) +
180                     "). This may cause problems");
181         }
182
183         if(use_packet_handler && packet_queue != null) {
184             byte[] tmp=new byte[len];
185             System.arraycopy(data, 0, tmp, 0, len);
186             try {
187                 Object JavaDoc[] arr=new Object JavaDoc[]{tmp, sender};
188                 packet_queue.add(arr);
189                 return;
190             }
191             catch(QueueClosedException e) {
192                 if(mylog.isWarnEnabled()) mylog.warn("packet queue for packet handler thread is closed");
193                 // pass through to handleIncomingPacket()
194
}
195         }
196
197         handleIncomingUdpPacket(data, sender);
198     }
199
200
201     /* ----------------------- Receiving of MCAST UDP packets ------------------------ */
202
203 // public void run() {
204
// DatagramPacket packet;
205
// byte receive_buf[]=new byte[65000];
206
// int len;
207
// byte[] tmp1, tmp2;
208
//
209
// // moved out of loop to avoid excessive object creations (bela March 8 2001)
210
// packet=new DatagramPacket(receive_buf, receive_buf.length);
211
//
212
// while(mcast_receiver != null && mcast_sock != null) {
213
// try {
214
// packet.setData(receive_buf, 0, receive_buf.length);
215
// mcast_sock.receive(packet);
216
// len=packet.getLength();
217
// if(len == 1 && packet.getData()[0] == 0) {
218
// if(log.isTraceEnabled()) if(log.isInfoEnabled()) log.info("UDP1_4.run()", "received dummy packet");
219
// continue;
220
// }
221
//
222
// if(len == 4) { // received a diagnostics probe
223
// byte[] tmp=packet.getData();
224
// if(tmp[0] == 'd' && tmp[1] == 'i' && tmp[2] == 'a' && tmp[3] == 'g') {
225
// handleDiagnosticProbe(null, null);
226
// continue;
227
// }
228
// }
229
//
230
// if(log.isTraceEnabled())
231
// if(log.isInfoEnabled()) log.info("UDP1_4.receive()", "received (mcast) " + packet.getLength() + " bytes from " +
232
// packet.getAddress() + ":" + packet.getPort() + " (size=" + len + " bytes)");
233
// if(len > receive_buf.length) {
234
// if(log.isErrorEnabled()) log.error("UDP1_4.run()", "size of the received packet (" + len + ") is bigger than " +
235
// "allocated buffer (" + receive_buf.length + "): will not be able to handle packet. " +
236
// "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length);
237
// }
238
//
239
// if(Version.compareTo(packet.getData()) == false) {
240
// if(log.isWarnEnabled()) log.warn("UDP1_4.run()",
241
// "packet from " + packet.getAddress() + ":" + packet.getPort() +
242
// " has different version (" +
243
// Version.printVersionId(packet.getData(), Version.version_id.length) +
244
// ") from ours (" + Version.printVersionId(Version.version_id) +
245
// "). This may cause problems");
246
// }
247
//
248
// if(use_packet_handler) {
249
// tmp1=packet.getData();
250
// tmp2=new byte[len];
251
// System.arraycopy(tmp1, 0, tmp2, 0, len);
252
// packet_queue.add(tmp2);
253
// } else
254
// handleIncomingUdpPacket(packet.getData());
255
// } catch(SocketException sock_ex) {
256
// if(log.isInfoEnabled()) log.info("UDP1_4.run()", "multicast socket is closed, exception=" + sock_ex);
257
// break;
258
// } catch(InterruptedIOException io_ex) { // thread was interrupted
259
// ; // go back to top of loop, where we will terminate loop
260
// } catch(Throwable ex) {
261
// if(log.isErrorEnabled()) log.error("UDP1_4.run()", "exception=" + ex + ", stack trace=" + Util.printStackTrace(ex));
262
// Util.sleep(1000); // so we don't get into 100% cpu spinning (should NEVER happen !)
263
// }
264
// }
265
// if(log.isInfoEnabled()) log.info("UDP1_4.run()", "multicast thread terminated");
266
// }
267

268     void handleDiagnosticProbe(SocketAddress sender) {
269         try {
270             byte[] diag_rsp=getDiagResponse().getBytes();
271             DatagramPacket rsp=new DatagramPacket(diag_rsp, 0, diag_rsp.length, sender);
272
273                 if(mylog.isInfoEnabled()) mylog.info("sending diag response to " + sender);
274             ct.send(rsp);
275         } catch(Throwable JavaDoc t) {
276             if(mylog.isErrorEnabled()) mylog.error("failed sending diag rsp to " + sender + ", exception=" + t);
277         }
278     }
279
280     String JavaDoc getDiagResponse() {
281         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
282         sb.append(local_addr).append(" (").append(group_name).append(')');
283         sb.append(" [").append(mcast_addr).append("]\n");
284         sb.append("Version=").append(Version.version).append(", cvs=\"").append(Version.cvs).append("\"\n");
285         sb.append("physical addresses: ").append(local_addr.getPhysicalAddresses()).append('\n');
286         sb.append("members: ").append(members).append('\n');
287
288         return sb.toString();
289     }
290
291     /* ------------------------------------------------------------------------------- */
292
293
294
295     /*------------------------------ Protocol interface ------------------------------ */
296
297     public String JavaDoc getName() {
298         return name;
299     }
300
301
302     public void init() throws Exception JavaDoc {
303         if(use_packet_handler) {
304             packet_queue=new Queue();
305             packet_handler=new PacketHandler();
306         }
307     }
308
309
310     /**
311      * Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads
312      */

313     public void start() throws Exception JavaDoc {
314          if(mylog.isInfoEnabled()) mylog.info("creating sockets and starting threads");
315         if(ct == null) {
316             ct=new ConnectorTable(mcast_addr, DEFAULT_RECEIVE_BUFFER_SIZE, mcast_recv_buf_size, ip_mcast, this);
317
318             for(Iterator it=bind_addrs.iterator(); it.hasNext();) {
319                 String JavaDoc bind_addr=(String JavaDoc)it.next();
320                 ct.listenOn(bind_addr, local_bind_port, port_range, DEFAULT_RECEIVE_BUFFER_SIZE, ucast_recv_buf_size,
321                         ucast_send_buf_size, ip_ttl, this);
322             }
323
324             // add physical addresses to local_addr
325
List physical_addrs=ct.getConnectorAddresses(); // must be non-null and size() >= 1
326
for(Iterator it=physical_addrs.iterator(); it.hasNext();) {
327                 SocketAddress address=(SocketAddress)it.next();
328                 local_addr.addPhysicalAddress(address);
329             }
330
331             if(additional_data != null)
332                 local_addr.setAdditionalData(additional_data);
333
334             ct.start();
335
336             passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
337             if(use_packet_handler)
338             packet_handler.start();
339         }
340     }
341
342
343     public void stop() {
344          if(mylog.isInfoEnabled()) mylog.info("closing sockets and stopping threads");
345         if(packet_handler != null)
346             packet_handler.stop();
347         if(ct != null) {
348             ct.stop();
349             ct=null;
350         }
351         local_addr.removeAllPhysicalAddresses();
352     }
353
354
355     /**
356      * Setup the Protocol instance acording to the configuration string
357      * The following properties are being read by the UDP protocol
358      * param mcast_addr - the multicast address to use default is 224.0.0.200
359      * param mcast_port - (int) the port that the multicast is sent on default is 7500
360      * param ip_mcast - (boolean) flag whether to use IP multicast - default is true
361      * param ip_ttl - Set the default time-to-live for multicast packets sent out on this socket. default is 32
362      *
363      * @return true if no other properties are left.
364      * false if the properties still have data in them, ie ,
365      * properties are left over and not handled by the protocol stack
366      */

367     public boolean setProperties(Properties props) {
368         String JavaDoc str;
369         List exclude_list=null;
370         String JavaDoc mcast_addr_name="230.8.8.8";
371         int mcast_port=7500;
372
373         super.setProperties(props);
374         str=props.getProperty("bind_addrs");
375         if(str != null) {
376             str=str.trim();
377             if("all".equals(str.toLowerCase())) {
378                 try {
379                     bind_addrs=determineAllBindInterfaces();
380                 }
381                 catch(SocketException e) {
382                     e.printStackTrace();
383                     bind_addrs=null;
384                 }
385             }
386             else {
387                 bind_addrs=Util.parseCommaDelimitedStrings(str);
388             }
389             props.remove("bind_addrs");
390         }
391
392         str=props.getProperty("bind_addrs_exclude");
393         if(str != null) {
394             str=str.trim();
395             exclude_list=Util.parseCommaDelimitedStrings(str);
396             props.remove("bind_addrs_exclude");
397         }
398
399         str=props.getProperty("bind_port");
400         if(str != null) {
401             local_bind_port=Integer.parseInt(str);
402             props.remove("bind_port");
403         }
404
405         str=props.getProperty("start_port");
406         if(str != null) {
407             local_bind_port=Integer.parseInt(str);
408             props.remove("start_port");
409         }
410
411         str=props.getProperty("port_range");
412         if(str != null) {
413             port_range=Integer.parseInt(str);
414             props.remove("port_range");
415         }
416
417         str=props.getProperty("mcast_addr");
418         if(str != null) {
419             mcast_addr_name=str;
420             props.remove("mcast_addr");
421         }
422
423         str=props.getProperty("mcast_port");
424         if(str != null) {
425             mcast_port=Integer.parseInt(str);
426             props.remove("mcast_port");
427         }
428
429         str=props.getProperty("ip_mcast");
430         if(str != null) {
431             ip_mcast=Boolean.valueOf(str).booleanValue();
432             props.remove("ip_mcast");
433         }
434
435         str=props.getProperty("ip_ttl");
436         if(str != null) {
437             ip_ttl=Integer.parseInt(str);
438             props.remove("ip_ttl");
439         }
440
441         str=props.getProperty("mcast_send_buf_size");
442         if(str != null) {
443             mcast_send_buf_size=Integer.parseInt(str);
444             props.remove("mcast_send_buf_size");
445         }
446
447         str=props.getProperty("mcast_recv_buf_size");
448         if(str != null) {
449             mcast_recv_buf_size=Integer.parseInt(str);
450             props.remove("mcast_recv_buf_size");
451         }
452
453         str=props.getProperty("ucast_send_buf_size");
454         if(str != null) {
455             ucast_send_buf_size=Integer.parseInt(str);
456             props.remove("ucast_send_buf_size");
457         }
458
459         str=props.getProperty("ucast_recv_buf_size");
460         if(str != null) {
461             ucast_recv_buf_size=Integer.parseInt(str);
462             props.remove("ucast_recv_buf_size");
463         }
464
465         str=props.getProperty("use_packet_handler");
466         if(str != null) {
467             use_packet_handler=Boolean.valueOf(str).booleanValue();
468             props.remove("use_packet_handler");
469         }
470
471
472         // determine mcast_addr
473
mcast_addr=new InetSocketAddress(mcast_addr_name, mcast_port);
474
475         // handling of bind_addrs
476
if(bind_addrs == null)
477             bind_addrs=new ArrayList();
478         if(bind_addrs.size() == 0) {
479             try {
480                 String JavaDoc default_bind_addr=determineDefaultBindInterface();
481                 bind_addrs.add(default_bind_addr);
482             }
483             catch(SocketException ex) {
484                 if(mylog.isErrorEnabled()) mylog.error("failed determining the default bind interface: " + ex);
485             }
486         }
487         if(exclude_list != null) {
488             bind_addrs.removeAll(exclude_list);
489         }
490         if(bind_addrs.size() == 0) {
491             if(mylog.isErrorEnabled()) mylog.error("no valid bind interface found, unable to listen for network traffic");
492             return false;
493         }
494         else {
495
496                 if(mylog.isInfoEnabled()) mylog.info("bind interfaces are " + bind_addrs);
497         }
498
499         if(props.size() > 0) {
500             System.err.println("UDP1_4.setProperties(): the following properties are not recognized:");
501             props.list(System.out);
502             return false;
503         }
504         return true;
505     }
506
507
508     /**
509      * DON'T REMOVE ! This prevents the up-handler thread to be created, which essentially is superfluous:
510      * messages are received from the network rather than from a layer below.
511      */

512     public void startUpHandler() {
513         ;
514     }
515
516     /**
517      * handle the UP event.
518      *
519      * @param evt - the event being send from the stack
520      */

521     public void up(Event evt) {
522         passUp(evt);
523
524         switch(evt.getType()) {
525
526             case Event.CONFIG:
527                 passUp(evt);
528                  if(mylog.isInfoEnabled()) mylog.info("received CONFIG event: " + evt.getArg());
529                 handleConfigEvent((HashMap)evt.getArg());
530                 return;
531         }
532
533         passUp(evt);
534     }
535
536     /**
537      * Caller by the layer above this layer. Usually we just put this Message
538      * into the send queue and let one or more worker threads handle it. A worker thread
539      * then removes the Message from the send queue, performs a conversion and adds the
540      * modified Message to the send queue of the layer below it, by calling Down).
541      */

542     public void down(Event evt) {
543         Message msg;
544         Object JavaDoc dest_addr;
545
546         if(evt.getType() != Event.MSG) { // unless it is a message handle it and respond
547
handleDownEvent(evt);
548             return;
549         }
550
551         msg=(Message)evt.getArg();
552
553         if(udp_hdr != null && udp_hdr.channel_name != null) {
554             // added patch by Roland Kurmann (March 20 2003)
555
msg.putHeader(name, udp_hdr);
556         }
557
558         dest_addr=msg.getDest();
559
560         // Because we don't call Protocol.passDown(), we notify the observer directly (e.g. PerfObserver).
561
// This way, we still have performance numbers for UDP
562
if(observer != null)
563             observer.passDown(evt);
564
565         if(dest_addr == null) { // 'null' means send to all group members
566
if(ip_mcast == false) {
567                 //sends a separate UDP message to each address
568
sendMultipleUdpMessages(msg, members);
569                 return;
570             }
571         }
572
573         try {
574             sendUdpMessage(msg); // either unicast (dest != null) or multicast (dest == null)
575
}
576         catch(Exception JavaDoc e) {
577             if(mylog.isErrorEnabled()) mylog.error("exception=" + e + ", msg=" + msg + ", mcast_addr=" + mcast_addr);
578         }
579     }
580
581
582
583
584
585
586     /*--------------------------- End of Protocol interface -------------------------- */
587
588
589     /* ------------------------------ Private Methods -------------------------------- */
590
591
592     void handleMessage(Message msg) {
593
594     }
595
596
597     /**
598      * Processes a packet read from either the multicast or unicast socket. Needs to be synchronized because
599      * mcast or unicast socket reads can be concurrent
600      */

601     void handleIncomingUdpPacket(byte[] data, SocketAddress sender) {
602         ByteArrayInputStream inp_stream;
603         ObjectInputStream inp;
604         Message msg=null;
605         UdpHeader hdr=null;
606         Event evt;
607         Address dst, src;
608
609         try {
610             // skip the first n bytes (default: 4), this is the version info
611
inp_stream=new ByteArrayInputStream(data, VERSION_LENGTH, data.length - VERSION_LENGTH);
612             inp=new ObjectInputStream(inp_stream);
613             msg=new Message();
614             msg.readExternal(inp);
615             dst=msg.getDest();
616             src=msg.getSrc();
617             if(src == null) {
618                 if(mylog.isErrorEnabled()) mylog.error("sender's address is null");
619             }
620             else {
621                 ((LogicalAddress1_4)src).setPrimaryPhysicalAddress(sender);
622             }
623
624             // discard my own multicast loopback copy
625
if((dst == null || dst.isMulticastAddress()) && src != null && local_addr.equals(src)) {
626                 if(mylog.isTraceEnabled())
627                     mylog.trace("discarded own loopback multicast packet");
628
629                 // System.out.println("-- discarded " + msg.getObject());
630

631                 return;
632             }
633
634             evt=new Event(Event.MSG, msg);
635             if(mylog.isTraceEnabled())
636                 mylog.trace("Message is " + msg + ", headers are " + msg.getHeaders());
637
638             /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
639              * This allows e.g. PerfObserver to get the time of reception of a message */

640             if(observer != null)
641                 observer.up(evt, up_queue.size());
642
643             hdr=(UdpHeader)msg.removeHeader(name);
644         } catch(Throwable JavaDoc e) {
645             if(mylog.isErrorEnabled()) mylog.error("exception=" + Util.getStackTrace(e));
646             return;
647         }
648
649         if(hdr != null) {
650
651             /* Discard all messages destined for a channel with a different name */
652             String JavaDoc ch_name=null;
653
654             if(hdr.channel_name != null)
655                 ch_name=hdr.channel_name;
656
657             // Discard if message's group name is not the same as our group name unless the
658
// message is a diagnosis message (special group name DIAG_GROUP)
659
if(ch_name != null && group_name != null && !group_name.equals(ch_name) &&
660                     !ch_name.equals(Util.DIAG_GROUP)) {
661
662                     if(mylog.isWarnEnabled()) mylog.warn("discarded message from different group (" +
663                             ch_name + "). Sender was " + msg.getSrc());
664                 return;
665             }
666         }
667
668         passUp(evt);
669     }
670
671
672     /**
673      * Send a message to the address specified in dest
674      */

675     void sendUdpMessage(Message msg) throws Exception JavaDoc {
676         Address dest, src;
677         ObjectOutputStream out;
678         byte buf[];
679         DatagramPacket packet;
680         Message copy;
681         Event evt; // for loopback messages
682

683         dest=msg.getDest(); // if non-null: unicast, else multicast
684
SRC=msg.getSrc();
685         if(src == null) {
686             src=local_addr_canonical; // no physical addresses present
687
msg.setSrc(src);
688         }
689
690         if(mylog.isTraceEnabled())
691             mylog.trace("sending message to " + msg.getDest() +
692                     " (src=" + msg.getSrc() + "), headers are " + msg.getHeaders());
693
694         // Don't send if destination is local address. Instead, switch dst and src and put in up_queue.
695
// If multicast message, loopback a copy directly to us (but still multicast). Once we receive this,
696
// we will discard our own multicast message
697
if(dest == null || dest.isMulticastAddress() || dest.equals(local_addr)) {
698             copy=msg.copy();
699             copy.removeHeader(name);
700             evt=new Event(Event.MSG, copy);
701
702             /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
703                This allows e.g. PerfObserver to get the time of reception of a message */

704             if(observer != null)
705                 observer.up(evt, up_queue.size());
706             if(mylog.isTraceEnabled()) mylog.trace("looped back local message " + copy);
707
708             // System.out.println("\n-- passing up packet id=" + copy.getObject());
709
passUp(evt);
710             // System.out.println("-- passed up packet id=" + copy.getObject());
711

712             if(dest != null && !dest.isMulticastAddress())
713                 return; // it is a unicast message to myself, no need to put on the network
714
}
715
716         out_stream.reset();
717         out_stream.write(Version.version_id, 0, Version.version_id.length); // write the version
718
out=new ObjectOutputStream(out_stream);
719         msg.writeExternal(out);
720         out.flush(); // needed if out buffers its output to out_stream
721
buf=out_stream.toByteArray();
722         packet=new DatagramPacket(buf, buf.length, mcast_addr);
723
724         //System.out.println("-- sleeping 4 secs");
725
// Thread.sleep(4000);
726

727
728         // System.out.println("\n-- sending packet " + msg.getObject());
729
ct.send(packet);
730         // System.out.println("-- sent " + msg.getObject());
731
}
732
733
734     void sendMultipleUdpMessages(Message msg, Vector dests) {
735         Address dest;
736
737         for(int i=0; i < dests.size(); i++) {
738             dest=(Address)dests.elementAt(i);
739             msg.setDest(dest);
740
741             try {
742                 sendUdpMessage(msg);
743             }
744             catch(Exception JavaDoc e) {
745                 if(mylog.isDebugEnabled()) mylog.debug("exception=" + e);
746             }
747         }
748     }
749
750
751
752
753
754 //
755
// /**
756
// * Workaround for the problem encountered in certains JDKs that a thread listening on a socket
757
// * cannot be interrupted. Therefore we just send a dummy datagram packet so that the thread 'wakes up'
758
// * and realizes it has to terminate. Should be removed when all JDKs support Thread.interrupt() on
759
// * reads. Uses send_sock t send dummy packet, so this socket has to be still alive.
760
// *
761
// * @param dest The destination host. Will be local host if null
762
// * @param port The destination port
763
// */
764
// void sendDummyPacket(InetAddress dest, int port) {
765
// DatagramPacket packet;
766
// byte[] buf={0};
767
//
768
// if(dest == null) {
769
// try {
770
// dest=InetAddress.getLocalHost();
771
// } catch(Exception e) {
772
// }
773
// }
774
//
775
// if(log.isTraceEnabled()) if(log.isInfoEnabled()) log.info("UDP1_4.sendDummyPacket()", "sending packet to " + dest + ":" + port);
776
//
777
// if(ucast_sock == null || dest == null) {
778
// if(log.isWarnEnabled()) log.warn("UDP1_4.sendDummyPacket()", "send_sock was null or dest was null, cannot send dummy packet");
779
// return;
780
// }
781
// packet=new DatagramPacket(buf, buf.length, dest, port);
782
// try {
783
// ucast_sock.send(packet);
784
// } catch(Throwable e) {
785
// if(log.isErrorEnabled()) log.error("UDP1_4.sendDummyPacket()", "exception sending dummy packet to " +
786
// dest + ":" + port + ": " + e);
787
// }
788
// }
789

790
791
792
793
794     void handleDownEvent(Event evt) {
795         switch(evt.getType()) {
796
797             case Event.TMP_VIEW:
798             case Event.VIEW_CHANGE:
799                 synchronized(members) {
800                     members.removeAllElements();
801                     Vector tmpvec=((View)evt.getArg()).getMembers();
802                     for(int i=0; i < tmpvec.size(); i++)
803                         members.addElement(tmpvec.elementAt(i));
804                 }
805                 break;
806
807             case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local)
808
passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
809                 break;
810
811             case Event.CONNECT:
812                 group_name=(String JavaDoc)evt.getArg();
813                 udp_hdr=new UdpHeader(group_name);
814
815                 // removed March 18 2003 (bela), not needed (handled by GMS)
816
// changed July 2 2003 (bela): we discard CONNECT_OK at the GMS level anyway, this might
817
// be needed if we run without GMS though
818
passUp(new Event(Event.CONNECT_OK));
819                 break;
820
821             case Event.DISCONNECT:
822                 passUp(new Event(Event.DISCONNECT_OK));
823                 break;
824
825             case Event.CONFIG:
826                  if(mylog.isInfoEnabled()) mylog.info("received CONFIG event: " + evt.getArg());
827                 handleConfigEvent((HashMap)evt.getArg());
828                 break;
829         }
830     }
831
832
833     void handleConfigEvent(HashMap map) {
834         if(map == null) return;
835         if(map.containsKey("additional_data"))
836             additional_data=(byte[])map.get("additional_data");
837         if(map.containsKey("send_buf_size")) {
838             mcast_send_buf_size=((Integer JavaDoc)map.get("send_buf_size")).intValue();
839             ucast_send_buf_size=mcast_send_buf_size;
840         }
841         if(map.containsKey("recv_buf_size")) {
842             mcast_recv_buf_size=((Integer JavaDoc)map.get("recv_buf_size")).intValue();
843             ucast_recv_buf_size=mcast_recv_buf_size;
844         }
845     }
846
847
848     /** Return the first non-loopback interface */
849     public String JavaDoc determineDefaultBindInterface() throws SocketException {
850         for(Enumeration en=NetworkInterface.getNetworkInterfaces(); en.hasMoreElements();) {
851             NetworkInterface ni=(NetworkInterface)en.nextElement();
852             for(Enumeration en2=ni.getInetAddresses(); en2.hasMoreElements();) {
853                 InetAddress bind_addr=(InetAddress)en2.nextElement();
854                 if(!bind_addr.isLoopbackAddress()) {
855                     return bind_addr.getHostAddress();
856                 }
857             }
858         }
859         return null;
860     }
861
862     public List determineAllBindInterfaces() throws SocketException {
863         List ret=new ArrayList();
864         for(Enumeration en=NetworkInterface.getNetworkInterfaces(); en.hasMoreElements();) {
865             NetworkInterface ni=(NetworkInterface)en.nextElement();
866             for(Enumeration en2=ni.getInetAddresses(); en2.hasMoreElements();) {
867                 InetAddress bind_addr=(InetAddress)en2.nextElement();
868                 ret.add(bind_addr.getHostAddress());
869             }
870         }
871
872         return ret;
873     }
874
875     /* ----------------------------- End of Private Methods ---------------------------------------- */
876
877
878
879     /* ----------------------------- Inner Classes ---------------------------------------- */
880
881
882     /**
883      * This thread fetches byte buffers from the packet_queue, converts them into messages and passes them up
884      * to the higher layer (done in handleIncomingUdpPacket()).
885      */

886     class PacketHandler implements Runnable JavaDoc {
887         Thread JavaDoc t=null;
888
889         public void run() {
890             byte[] data;
891             SocketAddress sender;
892
893             while(packet_queue != null && packet_handler != null) {
894                 try {
895                     Object JavaDoc[] arr=(Object JavaDoc[])packet_queue.remove();
896                     data=(byte[])arr[0];
897                     sender=(SocketAddress)arr[1];
898                 } catch(QueueClosedException closed_ex) {
899                      if(mylog.isInfoEnabled()) mylog.info("packet_handler thread terminating");
900                     break;
901                 }
902                 handleIncomingUdpPacket(data, sender);
903                 data=null; // let's give the poor garbage collector a hand...
904
}
905         }
906
907         void start() {
908             if(t == null) {
909                 t=new Thread JavaDoc(this, "UDP1_4.PacketHandler thread");
910                 t.setDaemon(true);
911                 t.start();
912