KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > UDP1_4


1 package org.jgroups.protocols;
2
3 import org.apache.commons.logging.Log;
4 import org.apache.commons.logging.LogFactory;
5 import org.jgroups.*;
6 import org.jgroups.stack.LogicalAddress1_4;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.util.Queue;
9 import org.jgroups.util.QueueClosedException;
10 import org.jgroups.util.Util;
11
12 import java.io.*;
13 import java.net.*;
14 import java.util.*;
15
16 /**
17  * Multicast transport. Similar to UDP, but binds to multiple (or all) interfaces for sending and receiving
18  * multicast and unicast traffic.<br/>
19  * The list of interfaces can be set via a property (comma-delimited list of IP addresses or "all" for all
20  * interfaces). Note that this class only works under JDK 1.4 and higher.<br/>
21  * For each of the interfaces listed we create a Listener, which listens on the group multicast address and creates
22  * a unicast datagram socket. The address of this member is determined at startup time, and is the host name plus
23  * a timestamp (LogicalAddress1_4). It does not change during the lifetime of the process. The LogicalAddress1_4 contains
24  * a list of all unicast socket addresses to which we can send back unicast messages. When we send a message, the
25  * Listener adds the sender's return address. When we receive a message, we add that address to our routing cache, which
26  * contains logical addresses and physical addresses. When we need to send a unicast message, we first check whether
27  * the logical address has a physical address associated with it in the cache. If so, we send a message to that address.
28  * If not, we send the unicast message to <em>all</em> physical addresses contained in the LogicalAddress1_4.<br/>
29  * UDP1_4 guarantees that - in scenarios with multiple subnets and multi-homed machines - members do see each other.
30  * There is some overhead in multicasting the same message on multiple interfaces, and potentially sending a unicast
31  * on multiple interfaces as well, but the advantage is that we don't need stuff like bind_addr any longer. Plus,
32  * the unicast routing caches should ensure that unicasts are only sent via 1 interface in almost all cases.
33  *
34  * @author Bela Ban Oct 2003
35  * @version $Id: UDP1_4.java,v 1.21 2005/04/11 09:00:26 belaban Exp $
36  * todo: sending of dummy packets
37  */

38 public class UDP1_4 extends Protocol implements Receiver {
39
40     static final String JavaDoc name="UDP1_4";
41
42     /** Maintains a list of Connectors, one for each interface we're listening on */
43     ConnectorTable ct=null;
44
45     /** A List<String> of bind addresses, we create 1 Connector for each interface */
46     List bind_addrs=null;
47
48     /** The name of the group to which this member is connected */
49     String JavaDoc group_name=null;
50
51     /** The multicast address (mcast address and port) this member uses (default: 230.8.8.8:7500) */
52     InetSocketAddress mcast_addr=null;
53
54     /** The address of this member. Valid for the lifetime of the JVM in which this member runs */
55     LogicalAddress1_4 local_addr=new LogicalAddress1_4(null, null);
56
57     /** Logical address without list of physical addresses */
58     LogicalAddress1_4 local_addr_canonical=local_addr.copy();
59
60     /** Pre-allocated byte stream. Used for serializing datagram packets */
61     ByteArrayOutputStream out_stream=new ByteArrayOutputStream(65535);
62
63     /**
64      * The port to which the unicast receiver socket binds.
65      * 0 means to bind to any (ephemeral) port
66      */

67     int local_bind_port=0;
68     int port_range=1; // 27-6-2003 bgooren, Only try one port by default
69

70
71     /**
72      * Whether to enable IP multicasting. If false, multiple unicast datagram
73      * packets are sent rather than one multicast packet
74      */

75     boolean ip_mcast=true;
76
77     /** The time-to-live (TTL) for multicast datagram packets */
78     int ip_ttl=32;
79
80     /** The members of this group (updated when a member joins or leaves) */
81     Vector members=new Vector();
82
83     /**
84      * Header to be added to all messages sent via this protocol. It is
85      * preallocated for efficiency
86      */

87     UdpHeader udp_hdr=null;
88
89     /** Send buffer size of the multicast datagram socket */
90     int mcast_send_buf_size=300000;
91
92     /** Receive buffer size of the multicast datagram socket */
93     int mcast_recv_buf_size=300000;
94
95     /** Send buffer size of the unicast datagram socket */
96     int ucast_send_buf_size=300000;
97
98     /** Receive buffer size of the unicast datagram socket */
99     int ucast_recv_buf_size=300000;
100
101     /**
102      * If true, messages sent to self are treated specially: unicast messages are
103      * looped back immediately, multicast messages get a local copy first and -
104      * when the real copy arrives - it will be discarded. Useful for Window
105      * media (non)sense
106      * @deprecated This is used by default now
107      */

108     boolean loopback=true; //todo: remove
109

110     /**
111      * Sometimes receivers are overloaded (they have to handle de-serialization etc).
112      * Packet handler is a separate thread taking care of de-serialization, receiver
113      * thread(s) simply put packet in queue and return immediately. Setting this to
114      * true adds one more thread
115      */

116     boolean use_packet_handler=false;
117
118     /** Used by packet handler to store incoming DatagramPackets */
119     Queue packet_queue=null;
120
121     /**
122      * If set it will be added to <tt>local_addr</tt>. Used to implement
123      * for example transport independent addresses
124      */

125     byte[] additional_data=null;
126
127     /**
128      * Dequeues DatagramPackets from packet_queue, unmarshalls them and
129      * calls <tt>handleIncomingUdpPacket()</tt>
130      */

131     PacketHandler packet_handler=null;
132
133     protected static Log mylog=LogFactory.getLog(UDP1_4.class);
134
135
136     final int VERSION_LENGTH=Version.getLength();
137
138     /** Number of bytes to allocate to receive a packet. Needs to be set to be higher than frag_size
139      * (handle CONFIG event)
140      */

141     static final int DEFAULT_RECEIVE_BUFFER_SIZE=120000; // todo: make settable and/or use CONFIG event
142

143
144
145
146     /**
147      * Public constructor. creates the UDP protocol, and initializes the
148      * state variables, does however not start any sockets or threads
149      */

150     public UDP1_4() {
151     }
152
153     /**
154      * debug only
155      */

156     public String JavaDoc toString() {
157         return "Protocol UDP(local address: " + local_addr + ')';
158     }
159
160
161     public void receive(DatagramPacket packet) {
162         int len=packet.getLength();
163         byte[] data=packet.getData();
164         SocketAddress sender=packet.getSocketAddress();
165
166         if(len == 4) { // received a diagnostics probe
167
if(data[0] == 'd' && data[1] == 'i' && data[2] == 'a' && data[3] == 'g') {
168                 handleDiagnosticProbe(sender);
169                 return;
170             }
171         }
172
173         if(mylog.isTraceEnabled())
174             mylog.trace("received " + len + " bytes from " + sender);
175
176         if(Version.compareTo(packet.getData()) == false) {
177             if(mylog.isWarnEnabled()) mylog.warn("packet from " + sender + " has different version (" +
178                     Version.printVersionId(data, Version.version_id.length) +
179                     ") from ours (" + Version.printVersionId(Version.version_id) +
180                     "). This may cause problems");
181         }
182
183         if(use_packet_handler && packet_queue != null) {
184             byte[] tmp=new byte[len];
185             System.arraycopy(data, 0, tmp, 0, len);
186             try {
187                 Object JavaDoc[] arr=new Object JavaDoc[]{tmp, sender};
188                 packet_queue.add(arr);
189                 return;
190             }
191             catch(QueueClosedException e) {
192                 if(mylog.isWarnEnabled()) mylog.warn("packet queue for packet handler thread is closed");
193                 // pass through to handleIncomingPacket()
194
}
195         }
196
197         handleIncomingUdpPacket(data, sender);
198     }
199
200
201     /* ----------------------- Receiving of MCAST UDP packets ------------------------ */
202
203 // public void run() {
204
// DatagramPacket packet;
205
// byte receive_buf[]=new byte[65000];
206
// int len;
207
// byte[] tmp1, tmp2;
208
//
209
// // moved out of loop to avoid excessive object creations (bela March 8 2001)
210
// packet=new DatagramPacket(receive_buf, receive_buf.length);
211
//
212
// while(mcast_receiver != null && mcast_sock != null) {
213
// try {
214
// packet.setData(receive_buf, 0, receive_buf.length);
215
// mcast_sock.receive(packet);
216
// len=packet.getLength();
217
// if(len == 1 && packet.getData()[0] == 0) {
218
// if(log.isTraceEnabled()) if(log.isInfoEnabled()) log.info("UDP1_4.run()", "received dummy packet");
219
// continue;
220
// }
221
//
222
// if(len == 4) { // received a diagnostics probe
223
// byte[] tmp=packet.getData();
224
// if(tmp[0] == 'd' && tmp[1] == 'i' && tmp[2] == 'a' && tmp[3] == 'g') {
225
// handleDiagnosticProbe(null, null);
226
// continue;
227
// }
228
// }
229
//
230
// if(log.isTraceEnabled())
231
// if(log.isInfoEnabled()) log.info("UDP1_4.receive()", "received (mcast) " + packet.getLength() + " bytes from " +
232
// packet.getAddress() + ":" + packet.getPort() + " (size=" + len + " bytes)");
233
// if(len > receive_buf.length) {
234
// if(log.isErrorEnabled()) log.error("UDP1_4.run()", "size of the received packet (" + len + ") is bigger than " +
235
// "allocated buffer (" + receive_buf.length + "): will not be able to handle packet. " +
236
// "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length);
237
// }
238
//
239
// if(Version.compareTo(packet.getData()) == false) {
240
// if(log.isWarnEnabled()) log.warn("UDP1_4.run()",
241
// "packet from " + packet.getAddress() + ":" + packet.getPort() +
242
// " has different version (" +
243
// Version.printVersionId(packet.getData(), Version.version_id.length) +
244
// ") from ours (" + Version.printVersionId(Version.version_id) +
245
// "). This may cause problems");
246
// }
247
//
248
// if(use_packet_handler) {
249
// tmp1=packet.getData();
250
// tmp2=new byte[len];
251
// System.arraycopy(tmp1, 0, tmp2, 0, len);
252
// packet_queue.add(tmp2);
253
// } else
254
// handleIncomingUdpPacket(packet.getData());
255
// } catch(SocketException sock_ex) {
256
// if(log.isInfoEnabled()) log.info("UDP1_4.run()", "multicast socket is closed, exception=" + sock_ex);
257
// break;
258
// } catch(InterruptedIOException io_ex) { // thread was interrupted
259
// ; // go back to top of loop, where we will terminate loop
260
// } catch(Throwable ex) {
261
// if(log.isErrorEnabled()) log.error("UDP1_4.run()", "exception=" + ex + ", stack trace=" + Util.printStackTrace(ex));
262
// Util.sleep(1000); // so we don't get into 100% cpu spinning (should NEVER happen !)
263
// }
264
// }
265
// if(log.isInfoEnabled()) log.info("UDP1_4.run()", "multicast thread terminated");
266
// }
267

268     void handleDiagnosticProbe(SocketAddress sender) {
269         try {
270             byte[] diag_rsp=getDiagResponse().getBytes();
271             DatagramPacket rsp=new DatagramPacket(diag_rsp, 0, diag_rsp.length, sender);
272
273                 if(mylog.isInfoEnabled()) mylog.info("sending diag response to " + sender);
274             ct.send(rsp);
275         } catch(Throwable JavaDoc t) {
276             if(mylog.isErrorEnabled()) mylog.error("failed sending diag rsp to " + sender + ", exception=" + t);
277         }
278     }
279
280     String JavaDoc getDiagResponse() {
281         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
282         sb.append(local_addr).append(" (").append(group_name).append(')');
283         sb.append(" [").append(mcast_addr).append("]\n");
284         sb.append("Version=").append(Version.version).append(", cvs=\"").append(Version.cvs).append("\"\n");
285         sb.append("physical addresses: ").append(local_addr.getPhysicalAddresses()).append('\n');
286         sb.append("members: ").append(members).append('\n');
287
288         return sb.toString();
289     }
290
291     /* ------------------------------------------------------------------------------- */
292
293
294
295     /*------------------------------ Protocol interface ------------------------------ */
296
297     public String JavaDoc getName() {
298         return name;
299     }
300
301
302     public void init() throws Exception JavaDoc {
303         if(use_packet_handler) {
304             packet_queue=new Queue();
305             packet_handler=new PacketHandler();
306         }
307     }
308
309
310     /**
311      * Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads
312      */

313     public void start() throws Exception JavaDoc {
314          if(mylog.isInfoEnabled()) mylog.info("creating sockets and starting threads");
315         if(ct == null) {
316             ct=new ConnectorTable(mcast_addr, DEFAULT_RECEIVE_BUFFER_SIZE, mcast_recv_buf_size, ip_mcast, this);
317
318             for(Iterator it=bind_addrs.iterator(); it.hasNext();) {
319                 String JavaDoc bind_addr=(String JavaDoc)it.next();
320                 ct.listenOn(bind_addr, local_bind_port, port_range, DEFAULT_RECEIVE_BUFFER_SIZE, ucast_recv_buf_size,
321                         ucast_send_buf_size, ip_ttl, this);
322             }
323
324             // add physical addresses to local_addr
325
List physical_addrs=ct.getConnectorAddresses(); // must be non-null and size() >= 1
326
for(Iterator it=physical_addrs.iterator(); it.hasNext();) {
327                 SocketAddress address=(SocketAddress)it.next();
328                 local_addr.addPhysicalAddress(address);
329             }
330
331             if(additional_data != null)
332                 local_addr.setAdditionalData(additional_data);
333
334             ct.start();
335
336             passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
337             if(use_packet_handler)
338             packet_handler.start();
339         }
340     }
341
342
343     public void stop() {
344          if(mylog.isInfoEnabled()) mylog.info("closing sockets and stopping threads");
345         if(packet_handler != null)
346             packet_handler.stop();
347         if(ct != null) {
348             ct.stop();
349             ct=null;
350         }
351         local_addr.removeAllPhysicalAddresses();
352     }
353
354
355     /**
356      * Setup the Protocol instance acording to the configuration string
357      * The following properties are being read by the UDP protocol
358      * param mcast_addr - the multicast address to use default is 224.0.0.200
359      * param mcast_port - (int) the port that the multicast is sent on default is 7500
360      * param ip_mcast - (boolean) flag whether to use IP multicast - default is true
361      * param ip_ttl - Set the default time-to-live for multicast packets sent out on this socket. default is 32
362      *
363      * @return true if no other properties are left.
364      * false if the properties still have data in them, ie ,
365      * properties are left over and not handled by the protocol stack
366      */

367     public boolean setProperties(Properties props) {
368         String JavaDoc str;
369         List exclude_list=null;
370         String JavaDoc mcast_addr_name="230.8.8.8";
371         int mcast_port=7500;
372
373         super.setProperties(props);
374         str=props.getProperty("bind_addrs");
375         if(str != null) {
376             str=str.trim();
377             if("all".equals(str.toLowerCase())) {
378                 try {
379                     bind_addrs=determineAllBindInterfaces();
380                 }
381                 catch(SocketException e) {
382                     e.printStackTrace();
383                     bind_addrs=null;
384                 }
385             }
386             else {
387                 bind_addrs=Util.parseCommaDelimitedStrings(str);
388             }
389             props.remove("bind_addrs");
390         }
391
392         str=props.getProperty("bind_addrs_exclude");
393         if(str != null) {
394             str=str.trim();
395             exclude_list=Util.parseCommaDelimitedStrings(str);
396             props.remove("bind_addrs_exclude");
397         }
398
399         str=props.getProperty("bind_port");
400         if(str != null) {
401             local_bind_port=Integer.parseInt(str);
402             props.remove("bind_port");
403         }
404
405         str=props.getProperty("start_port");
406         if(str != null) {
407             local_bind_port=Integer.parseInt(str);
408             props.remove("start_port");
409         }
410
411         str=props.getProperty("port_range");
412         if(str != null) {
413             port_range=Integer.parseInt(str);
414             props.remove("port_range");
415         }
416
417         str=props.getProperty("mcast_addr");
418         if(str != null) {
419             mcast_addr_name=str;
420             props.remove("mcast_addr");
421         }
422
423         str=props.getProperty("mcast_port");
424         if(str != null) {
425             mcast_port=Integer.parseInt(str);
426             props.remove("mcast_port");
427         }
428
429         str=props.getProperty("ip_mcast");
430         if(str != null) {
431             ip_mcast=Boolean.valueOf(str).booleanValue();
432             props.remove("ip_mcast");
433         }
434
435         str=props.getProperty("ip_ttl");
436         if(str != null) {
437             ip_ttl=Integer.parseInt(str);
438             props.remove("ip_ttl");
439         }
440
441         str=props.getProperty("mcast_send_buf_size");
442         if(str != null) {
443             mcast_send_buf_size=Integer.parseInt(str);
444             props.remove("mcast_send_buf_size");
445         }
446
447         str=props.getProperty("mcast_recv_buf_size");
448         if(str != null) {
449             mcast_recv_buf_size=Integer.parseInt(str);
450             props.remove("mcast_recv_buf_size");
451         }
452
453         str=props.getProperty("ucast_send_buf_size");
454         if(str != null) {
455             ucast_send_buf_size=Integer.parseInt(str);
456             props.remove("ucast_send_buf_size");
457         }
458
459         str=props.getProperty("ucast_recv_buf_size");
460         if(str != null) {
461             ucast_recv_buf_size=Integer.parseInt(str);
462             props.remove("ucast_recv_buf_size");
463         }
464
465         str=props.getProperty("use_packet_handler");
466         if(str != null) {
467             use_packet_handler=Boolean.valueOf(str).booleanValue();
468             props.remove("use_packet_handler");
469         }
470
471
472         // determine mcast_addr
473
mcast_addr=new InetSocketAddress(mcast_addr_name, mcast_port);
474
475         // handling of bind_addrs
476
if(bind_addrs == null)
477             bind_addrs=new ArrayList();
478         if(bind_addrs.size() == 0) {
479             try {
480                 String JavaDoc default_bind_addr=determineDefaultBindInterface();
481                 bind_addrs.add(default_bind_addr);
482             }
483             catch(SocketException ex) {
484                 if(mylog.isErrorEnabled()) mylog.error("failed determining the default bind interface: " + ex);
485             }
486         }
487         if(exclude_list != null) {
488             bind_addrs.removeAll(exclude_list);
489         }
490         if(bind_addrs.size() == 0) {
491             if(mylog.isErrorEnabled()) mylog.error("no valid bind interface found, unable to listen for network traffic");
492             return false;
493         }
494         else {
495
496                 if(mylog.isInfoEnabled()) mylog.info("bind interfaces are " + bind_addrs);
497         }
498
499         if(props.size() > 0) {
500             System.err.println("UDP1_4.setProperties(): the following properties are not recognized:");
501             props.list(System.out);
502             return false;
503         }
504         return true;
505     }
506
507
508     /**
509      * DON'T REMOVE ! This prevents the up-handler thread to be created, which essentially is superfluous:
510      * messages are received from the network rather than from a layer below.
511      */

512     public void startUpHandler() {
513         ;
514     }
515
516     /**
517      * handle the UP event.
518      *
519      * @param evt - the event being send from the stack
520      */

521     public void up(Event evt) {
522         passUp(evt);
523
524         switch(evt.getType()) {
525
526             case Event.CONFIG:
527                 passUp(evt);
528                  if(mylog.isInfoEnabled()) mylog.info("received CONFIG event: " + evt.getArg());
529                 handleConfigEvent((HashMap)evt.getArg());
530                 return;
531         }
532
533         passUp(evt);
534     }
535
536     /**
537      * Caller by the layer above this layer. Usually we just put this Message
538      * into the send queue and let one or more worker threads handle it. A worker thread
539      * then removes the Message from the send queue, performs a conversion and adds the
540      * modified Message to the send queue of the layer below it, by calling Down).
541      */

542     public void down(Event evt) {
543         Message msg;
544         Object JavaDoc dest_addr;
545
546         if(evt.getType() != Event.MSG) { // unless it is a message handle it and respond
547
handleDownEvent(evt);
548             return;
549         }
550
551         msg=(Message)evt.getArg();
552
553         if(udp_hdr != null && udp_hdr.channel_name != null) {
554             // added patch by Roland Kurmann (March 20 2003)
555
msg.putHeader(name, udp_hdr);
556         }
557
558         dest_addr=msg.getDest();
559
560         // Because we don't call Protocol.passDown(), we notify the observer directly (e.g. PerfObserver).
561
// This way, we still have performance numbers for UDP
562
if(observer != null)
563             observer.passDown(evt);
564
565         if(dest_addr == null) { // 'null' means send to all group members
566
if(ip_mcast == false) {
567                 //sends a separate UDP message to each address
568
sendMultipleUdpMessages(msg, members);
569                 return;
570             }
571         }
572
573         try {
574             sendUdpMessage(msg); // either unicast (dest != null) or multicast (dest == null)
575
}
576         catch(Exception JavaDoc e) {
577             if(mylog.isErrorEnabled()) mylog.error("exception=" + e + ", msg=" + msg + ", mcast_addr=" + mcast_addr);
578         }
579     }
580
581
582
583
584
585
586     /*--------------------------- End of Protocol interface -------------------------- */
587
588
589     /* ------------------------------ Private Methods -------------------------------- */
590
591
592     void handleMessage(Message msg) {
593
594     }
595
596
597     /**
598      * Processes a packet read from either the multicast or unicast socket. Needs to be synchronized because
599      * mcast or unicast socket reads can be concurrent
600      */

601     void handleIncomingUdpPacket(byte[] data, SocketAddress sender) {
602         ByteArrayInputStream inp_stream;
603         ObjectInputStream inp;
604         Message msg=null;
605         UdpHeader hdr=null;
606         Event evt;
607         Address dst, src;
608
609         try {
610             // skip the first n bytes (default: 4), this is the version info
611
inp_stream=new ByteArrayInputStream(data, VERSION_LENGTH, data.length - VERSION_LENGTH);
612             inp=new ObjectInputStream(inp_stream);
613             msg=new Message();
614             msg.readExternal(inp);
615             dst=msg.getDest();
616             src=msg.getSrc();
617             if(src == null) {
618                 if(mylog.isErrorEnabled()) mylog.error("sender's address is null");
619             }
620             else {
621                 ((LogicalAddress1_4)src).setPrimaryPhysicalAddress(sender);
622             }
623
624             // discard my own multicast loopback copy
625
if((dst == null || dst.isMulticastAddress()) && src != null && local_addr.equals(src)) {
626                 if(mylog.isTraceEnabled())
627                     mylog.trace("discarded own loopback multicast packet");
628
629                 // System.out.println("-- discarded " + msg.getObject());
630

631                 return;
632             }
633
634             evt=new Event(Event.MSG, msg);
635             if(mylog.isTraceEnabled())
636                 mylog.trace("Message is " + msg + ", headers are " + msg.getHeaders());
637
638             /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
639              * This allows e.g. PerfObserver to get the time of reception of a message */

640             if(observer != null)
641                 observer.up(evt, up_queue.size());
642
643             hdr=(UdpHeader)msg.removeHeader(name);
644         } catch(Throwable JavaDoc e) {
645             if(mylog.isErrorEnabled()) mylog.error("exception=" + Util.getStackTrace(e));
646             return;
647         }
648
649         if(hdr != null) {
650
651             /* Discard all messages destined for a channel with a different name */
652             String JavaDoc ch_name=null;
653
654             if(hdr.channel_name != null)
655                 ch_name=hdr.channel_name;
656
657             // Discard if message's group name is not the same as our group name unless the
658
// message is a diagnosis message (special group name DIAG_GROUP)
659
if(ch_name != null && group_name != null && !group_name.equals(ch_name) &&
660                     !ch_name.equals(Util.DIAG_GROUP)) {
661
662                     if(mylog.isWarnEnabled()) mylog.warn("discarded message from different group (" +
663                             ch_name + "). Sender was " + msg.getSrc());
664                 return;
665             }
666         }
667
668         passUp(evt);
669     }
670
671
672     /**
673      * Send a message to the address specified in dest
674      */

675     void sendUdpMessage(Message msg) throws Exception JavaDoc {
676         Address dest, src;
677         ObjectOutputStream out;
678         byte buf[];
679         DatagramPacket packet;
680         Message copy;
681         Event evt; // for loopback messages
682

683         dest=msg.getDest(); // if non-null: unicast, else multicast
684
SRC=msg.getSrc();
685         if(src == null) {
686             src=local_addr_canonical; // no physical addresses present
687
msg.setSrc(src);
688         }
689
690         if(mylog.isTraceEnabled())
691             mylog.trace("sending message to " + msg.getDest() +
692                     " (src=" + msg.getSrc() + "), headers are " + msg.getHeaders());
693
694         // Don't send if destination is local address. Instead, switch dst and src and put in up_queue.
695
// If multicast message, loopback a copy directly to us (but still multicast). Once we receive this,
696
// we will discard our own multicast message
697
if(dest == null || dest.isMulticastAddress() || dest.equals(local_addr)) {
698             copy=msg.copy();
699             copy.removeHeader(name);
700             evt=new Event(Event.MSG, copy);
701
702             /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
703                This allows e.g. PerfObserver to get the time of reception of a message */

704             if(observer != null)
705                 observer.up(evt, up_queue.size());
706             if(mylog.isTraceEnabled()) mylog.trace("looped back local message " + copy);
707
708             // System.out.println("\n-- passing up packet id=" + copy.getObject());
709
passUp(evt);
710             // System.out.println("-- passed up packet id=" + copy.getObject());
711

712             if(dest != null && !dest.isMulticastAddress())
713                 return; // it is a unicast message to myself, no need to put on the network
714
}
715
716         out_stream.reset();
717         out_stream.write(Version.version_id, 0, Version.version_id.length); // write the version
718
out=new ObjectOutputStream(out_stream);
719         msg.writeExternal(out);
720         out.flush(); // needed if out buffers its output to out_stream
721
buf=out_stream.toByteArray();
722         packet=new DatagramPacket(buf, buf.length, mcast_addr);
723
724         //System.out.println("-- sleeping 4 secs");
725
// Thread.sleep(4000);
726

727
728         // System.out.println("\n-- sending packet " + msg.getObject());
729
ct.send(packet);
730         // System.out.println("-- sent " + msg.getObject());
731
}
732
733
734     void sendMultipleUdpMessages(Message msg, Vector dests) {
735         Address dest;
736
737         for(int i=0; i < dests.size(); i++) {
738             dest=(Address)dests.elementAt(i);
739             msg.setDest(dest);
740
741             try {
742                 sendUdpMessage(msg);
743             }
744             catch(Exception JavaDoc e) {
745                 if(mylog.isDebugEnabled()) mylog.debug("exception=" + e);
746             }
747         }
748     }
749
750
751
752
753
754 //
755
// /**
756
// * Workaround for the problem encountered in certains JDKs that a thread listening on a socket
757
// * cannot be interrupted. Therefore we just send a dummy datagram packet so that the thread 'wakes up'
758
// * and realizes it has to terminate. Should be removed when all JDKs support Thread.interrupt() on
759
// * reads. Uses send_sock t send dummy packet, so this socket has to be still alive.
760
// *
761
// * @param dest The destination host. Will be local host if null
762
// * @param port The destination port
763
// */
764
// void sendDummyPacket(InetAddress dest, int port) {
765
// DatagramPacket packet;
766
// byte[] buf={0};
767
//
768
// if(dest == null) {
769
// try {
770
// dest=InetAddress.getLocalHost();
771
// } catch(Exception e) {
772
// }
773
// }
774
//
775
// if(log.isTraceEnabled()) if(log.isInfoEnabled()) log.info("UDP1_4.sendDummyPacket()", "sending packet to " + dest + ":" + port);
776
//
777
// if(ucast_sock == null || dest == null) {
778
// if(log.isWarnEnabled()) log.warn("UDP1_4.sendDummyPacket()", "send_sock was null or dest was null, cannot send dummy packet");
779
// return;
780
// }
781
// packet=new DatagramPacket(buf, buf.length, dest, port);
782
// try {
783
// ucast_sock.send(packet);
784
// } catch(Throwable e) {
785
// if(log.isErrorEnabled()) log.error("UDP1_4.sendDummyPacket()", "exception sending dummy packet to " +
786
// dest + ":" + port + ": " + e);
787
// }
788
// }
789

790
791
792
793
794     void handleDownEvent(Event evt) {
795         switch(evt.getType()) {
796
797             case Event.TMP_VIEW:
798             case Event.VIEW_CHANGE:
799                 synchronized(members) {
800                     members.removeAllElements();
801                     Vector tmpvec=((View)evt.getArg()).getMembers();
802                     for(int i=0; i < tmpvec.size(); i++)
803                         members.addElement(tmpvec.elementAt(i));
804                 }
805                 break;
806
807             case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local)
808
passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
809                 break;
810
811             case Event.CONNECT:
812                 group_name=(String JavaDoc)evt.getArg();
813                 udp_hdr=new UdpHeader(group_name);
814
815                 // removed March 18 2003 (bela), not needed (handled by GMS)
816
// changed July 2 2003 (bela): we discard CONNECT_OK at the GMS level anyway, this might
817
// be needed if we run without GMS though
818
passUp(new Event(Event.CONNECT_OK));
819                 break;
820
821             case Event.DISCONNECT:
822                 passUp(new Event(Event.DISCONNECT_OK));
823                 break;
824
825             case Event.CONFIG:
826                  if(mylog.isInfoEnabled()) mylog.info("received CONFIG event: " + evt.getArg());
827                 handleConfigEvent((HashMap)evt.getArg());
828                 break;
829         }
830     }
831
832
833     void handleConfigEvent(HashMap map) {
834         if(map == null) return;
835         if(map.containsKey("additional_data"))
836             additional_data=(byte[])map.get("additional_data");
837         if(map.containsKey("send_buf_size")) {
838             mcast_send_buf_size=((Integer JavaDoc)map.get("send_buf_size")).intValue();
839             ucast_send_buf_size=mcast_send_buf_size;
840         }
841         if(map.containsKey("recv_buf_size")) {
842             mcast_recv_buf_size=((Integer JavaDoc)map.get("recv_buf_size")).intValue();
843             ucast_recv_buf_size=mcast_recv_buf_size;
844         }
845     }
846
847
848     /** Return the first non-loopback interface */
849     public String JavaDoc determineDefaultBindInterface() throws SocketException {
850         for(Enumeration en=NetworkInterface.getNetworkInterfaces(); en.hasMoreElements();) {
851             NetworkInterface ni=(NetworkInterface)en.nextElement();
852             for(Enumeration en2=ni.getInetAddresses(); en2.hasMoreElements();) {
853                 InetAddress bind_addr=(InetAddress)en2.nextElement();
854                 if(!bind_addr.isLoopbackAddress()) {
855                     return bind_addr.getHostAddress();
856                 }
857             }
858         }
859         return null;
860     }
861
862     public List determineAllBindInterfaces() throws SocketException {
863         List ret=new ArrayList();
864         for(Enumeration en=NetworkInterface.getNetworkInterfaces(); en.hasMoreElements();) {
865             NetworkInterface ni=(NetworkInterface)en.nextElement();
866             for(Enumeration en2=ni.getInetAddresses(); en2.hasMoreElements();) {
867                 InetAddress bind_addr=(InetAddress)en2.nextElement();
868                 ret.add(bind_addr.getHostAddress());
869             }
870         }
871
872         return ret;
873     }
874
875     /* ----------------------------- End of Private Methods ---------------------------------------- */
876
877
878
879     /* ----------------------------- Inner Classes ---------------------------------------- */
880
881
882     /**
883      * This thread fetches byte buffers from the packet_queue, converts them into messages and passes them up
884      * to the higher layer (done in handleIncomingUdpPacket()).
885      */

886     class PacketHandler implements Runnable JavaDoc {
887         Thread JavaDoc t=null;
888
889         public void run() {
890             byte[] data;
891             SocketAddress sender;
892
893             while(packet_queue != null && packet_handler != null) {
894                 try {
895                     Object JavaDoc[] arr=(Object JavaDoc[])packet_queue.remove();
896                     data=(byte[])arr[0];
897                     sender=(SocketAddress)arr[1];
898                 } catch(QueueClosedException closed_ex) {
899                      if(mylog.isInfoEnabled()) mylog.info("packet_handler thread terminating");
900                     break;
901                 }
902                 handleIncomingUdpPacket(data, sender);
903                 data=null; // let's give the poor garbage collector a hand...
904
}
905         }
906
907         void start() {
908             if(t == null) {
909                 t=new Thread JavaDoc(this, "UDP1_4.PacketHandler thread");
910                 t.setDaemon(true);
911                 t.start();
912             }
913         }
914
915         void stop() {
916             if(packet_queue != null)
917                 packet_queue.close(false); // should terminate the packet_handler thread too
918
t=null;
919             packet_queue=null;
920         }
921     }
922
923
924
925
926     /**
927      * Manages a multicast and unicast socket on a given interface (NIC). The multicast socket is used
928      * to listen for incoming multicast packets, the unicast socket is used to (1) listen for incoming
929      * unicast packets, (2) to send unicast packets and (3) to send multicast packets
930      */

931     public static class Connector implements Runnable JavaDoc {
932
933         protected Thread JavaDoc t=null;
934
935         protected SenderThread sender_thread=null;
936
937         /** Interface on which ucast_sock and mcast_sender_sock are created */
938         NetworkInterface bind_interface;
939
940
941         /** Used for sending/receiving unicast/multicast packets. The reason we have to use a MulticastSocket versus a
942          * DatagramSocket is that only MulticastSockets allow to set the interface over which a multicast
943          * is sent: DatagramSockets consult the routing table to find the interface
944          */

945         MulticastSocket mcast_sock=null;
946
947         /** Local port of the mcast_sock */
948         SocketAddress local_addr=null;
949
950         /** The receiver which handles incoming packets */
951         Receiver receiver=null;
952
953         /** Buffer for incoming unicast packets */
954         protected byte[] receive_buffer=null;
955
956
957         Queue send_queue=new Queue();
958
959
960         class SenderThread extends Thread JavaDoc {
961
962
963             public void run() {
964                 Object JavaDoc[] arr;
965                 byte[] buf;
966                 SocketAddress dest;
967
968                 while(send_queue != null) {
969                     try {
970                         arr=(Object JavaDoc[])send_queue.remove();
971                         buf=(byte[])arr[0];
972                         dest=(SocketAddress)arr[1];
973                         mcast_sock.send(new DatagramPacket(buf, buf.length, dest));
974                     }
975                     catch(QueueClosedException e) {
976                         break;
977                     }
978                     catch(SocketException e) {
979                         e.printStackTrace();
980                     }
981                     catch(IOException e) {
982                         e.printStackTrace();
983                     }
984
985                 }
986             }
987         }
988
989
990
991         public Connector(NetworkInterface bind_interface, int local_bind_port,
992                          int port_range, int receive_buffer_size,
993                          int receive_sock_buf_size, int send_sock_buf_size,
994                          int ip_ttl, Receiver receiver) throws IOException {
995             this.bind_interface=bind_interface;
996             this.receiver=receiver;
997             this.receive_buffer=new byte[receive_buffer_size];
998
999             mcast_sock=createMulticastSocket(local_bind_port, port_range);
1000
1001            // changed Bela Dec 31 2003: if loopback is disabled other members on the same machine won't be able
1002
// to receive our multicasts
1003
// mcast_sock.setLoopbackMode(true); // we don't want to receive our own multicasts
1004
mcast_sock.setReceiveBufferSize(receive_sock_buf_size);
1005            mcast_sock.setSendBufferSize(send_sock_buf_size);
1006            mcast_sock.setTimeToLive(ip_ttl);
1007            System.out.println("ttl=" + mcast_sock.getTimeToLive());
1008            mcast_sock.setNetworkInterface(this.bind_interface); // for outgoing multicasts
1009
local_addr=mcast_sock.getLocalSocketAddress();
1010            System.out.println("-- local_addr=" + local_addr);
1011            System.out.println("-- mcast_sock: send_bufsize=" + mcast_sock.getSendBufferSize() +
1012                    ", recv_bufsize=" + mcast_sock.getReceiveBufferSize());
1013        }
1014
1015
1016        public SocketAddress getLocalAddress() {
1017            return local_addr;
1018        }
1019
1020        public NetworkInterface getBindInterface() {
1021            return bind_interface;
1022        }
1023
1024        public void start() throws Exception JavaDoc {
1025            if(mcast_sock == null)
1026                throw new Exception JavaDoc("UDP1_4.Connector.start(): connector has been stopped (start() cannot be called)");
1027
1028            if(t != null && t.isAlive()) {
1029                if(mylog.isWarnEnabled()) mylog.warn("connector thread is already running");
1030                return;
1031            }
1032            t=new Thread JavaDoc(this, "ConnectorThread for " + local_addr);
1033            t.setDaemon(true);
1034            t.start();
1035
1036            sender_thread=new SenderThread();
1037            sender_thread.start();
1038        }
1039
1040        /** Stops the connector. After this call, start() cannot be called, but a new connector has to
1041         * be created
1042         */

1043        public void stop() {
1044            if(mcast_sock != null)
1045                mcast_sock.close(); // terminates the thread if running
1046
t=null;
1047            mcast_sock=null;
1048        }
1049
1050
1051
1052        /** Sends a message using mcast_sock */
1053        public void send(DatagramPacket packet) throws Exception JavaDoc {
1054            //mcast_sock.send(packet);
1055

1056            byte[] buf=(byte[])packet.getData().clone();
1057            Object JavaDoc[] arr=new Object JavaDoc[]{buf, packet.getSocketAddress()};
1058            send_queue.add(arr);
1059        }
1060
1061        public void run() {
1062            DatagramPacket packet=new DatagramPacket(receive_buffer, receive_buffer.length);
1063            while(t != null) {
1064                try {
1065                    packet.setData(receive_buffer, 0, receive_buffer.length);
1066                    ConnectorTable.receivePacket(packet, mcast_sock, receiver);
1067                }
1068                catch(Throwable JavaDoc t) {
1069                    if(t == null || mcast_sock == null || mcast_sock.isClosed())
1070                        break;
1071                    if(mylog.isErrorEnabled()) mylog.error("[" + local_addr + "] exception=" + t);
1072                    Util.sleep(300); // so we don't get into 100% cpu spinning (should NEVER happen !)
1073
}
1074            }
1075            t=null;
1076        }
1077
1078
1079
1080
1081        public String JavaDoc toString() {
1082            StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
1083            sb.append("local_addr=").append(local_addr).append(", mcast_group=");
1084            return sb.toString();
1085        }
1086
1087
1088
1089
1090
1091        // 27-6-2003 bgooren, find available port in range (start_port, start_port+port_range)
1092
private MulticastSocket createMulticastSocket(int local_bind_port, int port_range) throws IOException {
1093            MulticastSocket sock=null;
1094            int tmp_port=local_bind_port;
1095
1096            int max_port=tmp_port + port_range;
1097            while(tmp_port <= max_port) {
1098                try {
1099                    sock=new MulticastSocket(tmp_port);
1100                    break;
1101                }
1102                catch(Exception JavaDoc bind_ex) {
1103                    tmp_port++;
1104                }
1105            }
1106            if(sock == null)
1107                throw new IOException("could not create a MulticastSocket (port range: " + local_bind_port +
1108                        " - " + (local_bind_port+port_range));
1109            return sock;
1110        }
1111    }
1112
1113
1114
1115
1116
1117    /** Manages a bunch of Connectors */
1118    public static class ConnectorTable implements Receiver, Runnable JavaDoc {
1119
1120        Thread JavaDoc t=null;
1121
1122        /** Socket to receive multicast packets. Will be bound to n interfaces */
1123        MulticastSocket mcast_sock=null;
1124
1125        /** The multicast address which mcast_sock will join (e.g. 230.1.2.3:7500) */
1126        InetSocketAddress mcast_addr=null;
1127
1128        Receiver receiver=null;
1129
1130        /** Buffer for incoming packets */
1131        byte[] receive_buffer=null;
1132
1133        /** Vector<Connector>. A list of Connectors, one for each interface we listen on */
1134        Vector connectors=new Vector();
1135
1136        boolean running=false;
1137
1138
1139
1140
1141
1142        public ConnectorTable(InetSocketAddress mcast_addr,
1143                              int receive_buffer_size, int receive_sock_buf_size,
1144                              boolean ip_mcast, Receiver receiver) throws IOException {
1145            this.receiver=receiver;
1146            this.mcast_addr=mcast_addr;
1147            this.receive_buffer=new byte[receive_buffer_size];
1148
1149            if(ip_mcast) {
1150                mcast_sock=new MulticastSocket(mcast_addr.getPort());
1151                // changed Bela Dec 31 2003: if loopback is disabled other members on the same machine won't be able
1152
// to receive our multicasts
1153
// mcast_sock.setLoopbackMode(true); // do not get own multicasts
1154
mcast_sock.setReceiveBufferSize(receive_sock_buf_size);
1155            }
1156        }
1157
1158
1159        public Receiver getReceiver() {
1160            return receiver;
1161        }
1162
1163        public void setReceiver(Receiver receiver) {
1164            this.receiver=receiver;
1165        }
1166
1167
1168        /** Get all interfaces, create one Connector per interface and call start() on it */
1169        public void start() throws Exception JavaDoc {
1170            Connector tmp;
1171            if(running)
1172                return;
1173
1174            if(mcast_sock != null) {
1175                // Start the thread servicing the incoming multicasts
1176
t=new Thread JavaDoc(this, "ConnectorTable thread");
1177                t.setDaemon(true);
1178                t.start();
1179            }
1180
1181
1182            // Start all Connectors
1183
for(Iterator it=connectors.iterator(); it.hasNext();) {
1184                tmp=(Connector)it.next();
1185                tmp.start();
1186            }
1187
1188            running=true;
1189        }
1190
1191
1192        public void stop() {
1193            Connector tmp;
1194            for(Iterator it=connectors.iterator(); it.hasNext();) {
1195                tmp=(Connector)it.next();
1196                tmp.stop();
1197            }
1198            connectors.clear();
1199            t=null;
1200            if(mcast_sock != null) {
1201                mcast_sock.close();
1202                mcast_sock=null;
1203            }
1204            running=false;
1205        }
1206
1207
1208        public void run() {
1209            // receive mcast packets on any interface of the list of interfaces we're listening on
1210
DatagramPacket p=new DatagramPacket(receive_buffer, receive_buffer.length);
1211            while(t != null && mcast_sock != null && !mcast_sock.isClosed()) {
1212                p.setData(receive_buffer, 0, receive_buffer.length);
1213                try {
1214                    receivePacket(p, mcast_sock, this);
1215                }
1216                catch(Throwable JavaDoc t) {
1217                    if(t == null || mcast_sock == null || mcast_sock.isClosed())
1218                        break;
1219                    if(mylog.isErrorEnabled()) mylog.error("exception=" + t);
1220                    Util.sleep(300); // so we don't get into 100% cpu spinning (should NEVER happen !)
1221
}
1222            }
1223            t=null;
1224        }
1225
1226
1227        /**
1228         * Returns a list of local addresses (one for each Connector)
1229         * @return List<SocketAddress>
1230         */

1231        public List getConnectorAddresses() {
1232            Connector c;
1233            ArrayList ret=new ArrayList();
1234            for(Iterator it=connectors.iterator(); it.hasNext();) {
1235                c=(Connector)it.next();
1236                ret.add(c.getLocalAddress());
1237            }
1238            return ret;
1239        }
1240
1241        /** Sends a packet. If the destination is a multicast address, call send() on all connectors.
1242         * If destination is not null, send the message using <em>any</em> Connector: if we send a unicast
1243         * message, it doesn't matter to which interface we are bound; the kernel will choose the correct
1244         * interface based on the destination and the routing table. Note that the receiver will have the
1245         * interface which was chosen by the kernel to send the message as the receiver's address, so the
1246         * correct Connector will receive a possible response.
1247         * @param msg
1248         * @throws Exception
1249         */

1250        public void send(DatagramPacket msg) throws Exception JavaDoc {
1251            InetAddress dest;
1252
1253            if(msg == null)
1254                return;
1255            dest=msg.getAddress();
1256            if(dest == null)
1257                throw new IOException("UDP1_4.ConnectorTable.send(): destination address is null");
1258
1259            if(dest.isMulticastAddress()) {
1260                // send to all Connectors
1261
for(int i=0; i < connectors.size(); i++) {
1262                    ((Connector)connectors.get(i)).send(msg);
1263                }
1264            }
1265            else {
1266                // send to a random connector
1267
Connector c=pickRandomConnector(connectors);
1268                c.send(msg);
1269            }
1270        }
1271
1272        private Connector pickRandomConnector(Vector conns) {
1273            int size=conns.size();
1274            int index=((int)(Util.random(size))) -1;
1275            return (Connector)conns.get(index);
1276        }
1277
1278        /**
1279         * Adds the given interface address to the list of interfaces on which the receiver mcast
1280         * socket has to listen.
1281         * Also creates a new Connector. Calling this method twice on the same interface will throw an exception
1282         * @param bind_interface
1283         * @param local_port
1284         * @param port_range
1285         * @param receive_buffer_size
1286         * @throws IOException
1287         */

1288        public void listenOn(String JavaDoc bind_interface, int local_port, int port_range,
1289                             int receive_buffer_size, int receiver_sock_buf_size, int send_sock_buf_size,
1290                             int ip_ttl, Receiver receiver) throws IOException {
1291            if(bind_interface == null)
1292                return;
1293
1294            NetworkInterface ni=NetworkInterface.getByInetAddress(InetAddress.getByName(bind_interface));
1295            if(ni == null)
1296                throw new IOException("UDP1_4.ConnectorTable.listenOn(): bind interface for " +
1297                        bind_interface + " not found");
1298
1299            Connector tmp=findConnector(ni);
1300            if(tmp != null) {
1301                if(mylog.isWarnEnabled()) mylog.warn("connector for interface " + bind_interface +
1302                        " is already present (will be skipped): " + tmp);
1303                return;
1304            }
1305
1306            // 1. join the group on this interface
1307
if(mcast_sock != null) {
1308                mcast_sock.joinGroup(mcast_addr, ni);
1309
1310                    if(mylog.isInfoEnabled()) mylog.info("joining " + mcast_addr + " on interface " + ni);
1311            }
1312
1313            // 2. create a new Connector
1314
tmp=new Connector(ni, local_port, port_range, receive_buffer_size, receiver_sock_buf_size,
1315                    send_sock_buf_size, ip_ttl, receiver);
1316            connectors.add(tmp);
1317        }
1318
1319        private Connector findConnector(NetworkInterface ni) {
1320            for(int i=0; i < connectors.size(); i++) {
1321                Connector c=(Connector)connectors.elementAt(i);
1322                if(c.getBindInterface().equals(ni))
1323                    return c;
1324            }
1325            return null;
1326        }
1327
1328
1329        public void receive(DatagramPacket packet) {
1330            if(receiver != null) {
1331                receiver.receive(packet);
1332            }
1333        }
1334
1335
1336        public String JavaDoc toString() {
1337            StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
1338            sb.append("*** todo: implement ***");
1339            return sb.toString();
1340        }
1341
1342
1343        public static void receivePacket(DatagramPacket packet, DatagramSocket sock, Receiver receiver) throws IOException {
1344            int len;
1345
1346            sock.receive(packet);
1347            len=packet.getLength();
1348            if(len == 1 && packet.getData()[0] == 0) {
1349                if(mylog.isTraceEnabled()) mylog.trace("received dummy packet");
1350                return;
1351            }
1352            if(receiver != null)
1353                receiver.receive(packet);
1354        }
1355    }
1356
1357
1358
1359
1360
1361
1362    public static class MyReceiver implements Receiver {
1363        ConnectorTable t=null;
1364
1365        public MyReceiver() {
1366
1367        }
1368
1369        public void setConnectorTable(ConnectorTable t) {
1370            this.t=t;
1371        }
1372
1373        public void receive(DatagramPacket packet) {
1374            System.out.println("-- received " + packet.getLength() + " bytes from " + packet.getSocketAddress());
1375            InetAddress sender=packet.getAddress();
1376            byte[] buf=packet.getData();
1377            int len=packet.getLength();
1378            String JavaDoc tmp=new String JavaDoc(buf, 0, len);
1379            if(len > 4) {
1380                if(tmp.startsWith("rsp:")) {
1381                    System.out.println("-- received respose: \"" + tmp + '\"');
1382                    return;
1383                }
1384            }
1385
1386            byte[] rsp_buf=("rsp: this is a response to " + tmp).getBytes();
1387            DatagramPacket response=new DatagramPacket(rsp_buf, rsp_buf.length, sender, packet.getPort());
1388
1389            try {
1390                t.send(response);
1391            }
1392            catch(Exception JavaDoc e) {
1393                e.printStackTrace();
1394                System.err.println("MyReceiver: problem sending response to " + sender);
1395            }
1396        }
1397    }
1398
1399
1400
1401    public static class MulticastReceiver implements Runnable JavaDoc {
1402        Unmarshaller m=null;
1403        DatagramSocket sock=null; // may be DatagramSocket or MulticastSocket
1404

1405        public void run() {
1406            // receives packet from socket
1407
// calls Unmarshaller.receive()
1408
}
1409
1410    }
1411
1412    public static class Unmarshaller {
1413        Queue q=null;
1414
1415        void receive(byte[] data, SocketAddress sender) {
1416            // if (q) --> q.add()
1417
// unserialize and call handleMessage()
1418
}
1419    }
1420
1421
1422
1423    public static class Mailman {
1424        
1425    }
1426
1427
1428    static void help() {
1429        System.out.println("UDP1_4 [-help] [-bind_addrs <list of interfaces>]");
1430    }
1431
1432
1433
1434    public static void main(String JavaDoc[] args) {
1435        MyReceiver r=new MyReceiver();
1436        ConnectorTable ct;
1437        String JavaDoc line;
1438        InetSocketAddress mcast_addr;
1439        BufferedReader in=null;
1440        DatagramPacket packet;
1441        byte[] send_buf;
1442        int receive_buffer_size=65000;
1443        boolean ip_mcast=true;
1444
1445        try {
1446            mcast_addr=new InetSocketAddress("230.1.2.3", 7500);
1447            ct=new ConnectorTable(mcast_addr, receive_buffer_size, 120000, ip_mcast, r);
1448            r.setConnectorTable(ct);
1449        }
1450        catch(Throwable JavaDoc t) {
1451            t.printStackTrace();
1452            return;
1453        }
1454
1455        for(int i=0; i < args.length; i++) {
1456            if("-help".equals(args[i])) {
1457                help();
1458                continue;
1459            }
1460            if("-bind_addrs".equals(args[i])) {
1461                while(++i < args.length && !args[i].trim().startsWith("-")) {
1462                    try {
1463                        ct.listenOn(args[i], 0, 1, receive_buffer_size, 120000, 12000, 32, r);
1464                    }
1465                    catch(IOException e) {
1466                        e.printStackTrace();
1467                        return;
1468                    }
1469                }
1470            }
1471        }
1472
1473
1474        try {
1475            ct.start(); // starts all Connectors in turn
1476
in=new BufferedReader(new InputStreamReader(System.in));
1477            while(true) {
1478                System.out.print("> "); System.out.flush();
1479                line=in.readLine();
1480                if(line.startsWith("quit") || line.startsWith("exit"))
1481                    break;
1482                send_buf=line.getBytes();
1483                packet=new DatagramPacket(send_buf, send_buf.length, mcast_addr);
1484                ct.send(packet);
1485            }
1486        }
1487        catch(Exception JavaDoc e) {
1488            e.printStackTrace();
1489        }
1490        finally {
1491            if(ct != null)
1492                ct.stop();
1493        }
1494    }
1495
1496
1497
1498
1499}
1500
1501
/** Callback interface for components that want to be handed received datagram packets. */
interface Receiver {

    /**
     * Called when data has been received on a socket. When the callback returns, the packet's buffer
     * will be reused: therefore, if the data must be processed on a separate thread, it needs to be
     * copied first. This method might be called concurrently by multiple threads, so it has to be
     * reentrant.
     *
     * @param packet the packet which was received
     */
    void receive(DatagramPacket packet);
}
1511
Popular Tags