KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > TCP


1 // $Id: TCP.java,v 1.21 2005/04/20 20:25:47 belaban Exp $
2

3 package org.jgroups.protocols;
4
5
6 import org.jgroups.Address;
7 import org.jgroups.Event;
8 import org.jgroups.Message;
9 import org.jgroups.View;
10 import org.jgroups.blocks.ConnectionTable;
11 import org.jgroups.stack.IpAddress;
12 import org.jgroups.stack.Protocol;
13 import org.jgroups.util.BoundedList;
14 import org.jgroups.util.Util;
15
16 import java.net.InetAddress JavaDoc;
17 import java.net.SocketException JavaDoc;
18 import java.net.UnknownHostException JavaDoc;
19 import java.util.HashMap JavaDoc;
20 import java.util.Properties JavaDoc;
21 import java.util.Vector JavaDoc;
22
23
24
25
26 /**
27  * TCP based protocol. Creates a server socket, which gives us the local address of this group member. For
28  * each accept() on the server socket, a new thread is created that listens on the socket.
29  * For each outgoing message m, if m.dest is in the ougoing hashtable, the associated socket will be reused
30  * to send message, otherwise a new socket is created and put in the hashtable.
31  * When a socket connection breaks or a member is removed from the group, the corresponding items in the
32  * incoming and outgoing hashtables will be removed as well.<br>
33  * This functionality is in ConnectionTable, which isT used by TCP. TCP sends messages using ct.send() and
34  * registers with the connection table to receive all incoming messages.
35  * @author Bela Ban
36  */

37 public class TCP extends Protocol implements ConnectionTable.Receiver {
38     private ConnectionTable ct=null;
39     protected Address local_addr=null;
40     private String JavaDoc group_addr=null;
41     private InetAddress JavaDoc bind_addr=null; // local IP address to bind srv sock to (m-homed systems)
42
private InetAddress JavaDoc external_addr=null; // the IP address which is broadcast to other group members
43
private int start_port=7800; // find first available port starting at this port
44
private int end_port=0; // maximum port to bind to
45
private final Vector JavaDoc members=new Vector JavaDoc(11);
46     private long reaper_interval=0; // time in msecs between connection reaps
47
private long conn_expire_time=0; // max time a conn can be idle before being reaped
48
boolean loopback=false; // loops back msgs to self if true
49

50     /** If set it will be added to <tt>local_addr</tt>. Used to implement
51      * for example transport independent addresses */

52     byte[] additional_data=null;
53
54     /** List the maintains the currently suspected members. This is used so we don't send too many SUSPECT
55      * events up the stack (one per message !)
56      */

57     final BoundedList suspected_mbrs=new BoundedList(20);
58
59     /** Should we drop unicast messages to suspected members or not */
60     boolean skip_suspected_members=true;
61
62     int recv_buf_size=150000;
63     int send_buf_size=150000;
64     int sock_conn_timeout=2000; // max time in millis for a socket creation in ConnectionTable
65

66     static final String JavaDoc name="TCP";
67     static final String JavaDoc IGNORE_BIND_ADDRESS_PROPERTY="ignore.bind.address";
68
69
70     public TCP() {
71     }
72
73     public String JavaDoc toString() {
74         return "Protocol TCP(local address: " + local_addr + ')';
75     }
76
77     public String JavaDoc getName() {
78         return "TCP";
79     }
80
81
82     protected final Vector JavaDoc getMembers() {
83         return members;
84     }
85
86     /**
87      DON'T REMOVE ! This prevents the up-handler thread to be created, which essentially is superfluous:
88      messages are received from the network rather than from a layer below.
89      */

90     public void startUpHandler() {
91         ;
92     }
93
94
95     public void start() throws Exception JavaDoc {
96         ct=getConnectionTable(reaper_interval,conn_expire_time,bind_addr,external_addr,start_port,end_port);
97         // ct.addConnectionListener(this);
98
ct.setReceiveBufferSize(recv_buf_size);
99         ct.setSendBufferSize(send_buf_size);
100         ct.setSocketConnectionTimeout(sock_conn_timeout);
101         local_addr=ct.getLocalAddress();
102         if(additional_data != null && local_addr instanceof IpAddress)
103             ((IpAddress)local_addr).setAdditionalData(additional_data);
104         passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
105     }
106
107    /**
108     * @param reaperInterval
109     * @param connExpireTime
110     * @param bindAddress
111     * @param startPort
112     * @throws Exception
113     * @return ConnectionTable
114     * Sub classes overrides this method to initialize a different version of
115     * ConnectionTable.
116     */

117    protected ConnectionTable getConnectionTable(long reaperInterval, long connExpireTime, InetAddress JavaDoc bindAddress,
118                                                 InetAddress JavaDoc externalAddress, int startPort, int endPort) throws Exception JavaDoc {
119        ConnectionTable cTable=null;
120        if(reaperInterval == 0 && connExpireTime == 0) {
121            cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort);
122        }
123        else {
124            if(reaperInterval == 0) {
125                reaperInterval=5000;
126                if(log.isWarnEnabled()) log.warn("reaper_interval was 0, set it to " + reaperInterval);
127            }
128            if(connExpireTime == 0) {
129                connExpireTime=1000 * 60 * 5;
130                if(log.isWarnEnabled()) log.warn("conn_expire_time was 0, set it to " + connExpireTime);
131            }
132            cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort,
133                                       reaperInterval, connExpireTime);
134        }
135        return cTable;
136    }
137
138     public void stop() {
139         ct.stop();
140     }
141
142
143     /**
144      Sent to destination(s) using the ConnectionTable class.
145      */

146     public void down(Event evt) {
147         Message msg;
148         Object JavaDoc dest_addr;
149
150         if(evt.getType() != Event.MSG) {
151             handleDownEvent(evt);
152             return;
153         }
154
155         msg=(Message)evt.getArg();
156
157         if(group_addr != null) { // added patch sent by Roland Kurmann (bela March 20 2003)
158
/* Add header (includes channel name) */
159             msg.putHeader(name, new TcpHeader(group_addr));
160         }
161
162         dest_addr=msg.getDest();
163
164
165         /* Because we don't call Protocol.passDown(), we notify the observer directly (e.g. PerfObserver). This way,
166            we still have performance numbers for TCP */

167         if(observer != null)
168             observer.passDown(evt);
169
170         if(dest_addr == null) { // broadcast (to all members)
171
if(group_addr == null) {
172                 if(log.isWarnEnabled()) log.warn("dest address of message is null, and " +
173                                          "sending to default address fails as group_addr is null, too !" +
174                                          " Discarding message.");
175                 return;
176             }
177             else {
178                 sendMulticastMessage(msg); // send to current membership
179
}
180         }
181         else {
182             sendUnicastMessage(msg); // send to a single member
183
}
184     }
185
186
187     /** ConnectionTable.Receiver interface */
188     public void receive(Message msg) {
189         TcpHeader hdr=null;
190         Event evt=new Event(Event.MSG, msg);
191
192
193         /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
194            This allows e.g. PerfObserver to get the time of reception of a message */

195         if(observer != null)
196             observer.up(evt, up_queue.size());
197
198         if(log.isTraceEnabled()) log.trace("received msg " + msg);
199
200         hdr=(TcpHeader)msg.removeHeader(name);
201
202         if(hdr != null) {
203             /* Discard all messages destined for a channel with a different name */
204             String JavaDoc ch_name=null;
205
206             if(hdr.group_addr != null)
207                 ch_name=hdr.group_addr;
208
209             // below lines were commented as patch sent by Roland Kurmann (bela March 20 2003)
210

211 // if(group_addr == null) {
212
// if(log.isWarnEnabled()) log.warn("TCP.receive()", "group address in header was null, discarded");
213
// return;
214
// }
215

216             // Discard if message's group name is not the same as our group name unless the
217
// message is a diagnosis message (special group name DIAG_GROUP)
218
if(ch_name != null && !group_addr.equals(ch_name) &&
219                     !ch_name.equals(Util.DIAG_GROUP)) {
220                 if(log.isWarnEnabled()) log.warn("discarded message from different group (" +
221                                             ch_name + "). Sender was " + msg.getSrc());
222                 return;
223             }
224         }
225
226         passUp(evt);
227     }
228
229
230     // ConnectionTable.ConnectionListener interface
231
// public void connectionOpened(Address peer_addr) {
232
// if(log.isTraceEnabled()) log.trace("opened connection to " + peer_addr);
233
// }
234
//
235
// public void connectionClosed(Address peer_addr) {
236
// if(peer_addr != null)
237
// if(log.isTraceEnabled()) log.trace("closed connection to " + peer_addr);
238
// }
239

240
241     /** Setup the Protocol instance acording to the configuration string */
242     public boolean setProperties(Properties JavaDoc props) {
243         String JavaDoc str, tmp=null;
244
245         super.setProperties(props);
246         str=props.getProperty("start_port");
247         if(str != null) {
248             start_port=Integer.parseInt(str);
249             props.remove("start_port");
250         }
251     
252         str=props.getProperty("end_port");
253         if(str != null) {
254             end_port=Integer.parseInt(str);
255             props.remove("end_port");
256         }
257
258         // PropertyPermission not granted if running in an untrusted environment with JNLP.
259
try {
260             tmp=System.getProperty("bind.address"); // set by JBoss
261
if(Boolean.getBoolean(IGNORE_BIND_ADDRESS_PROPERTY)) {
262                 tmp=null;
263             }
264         }
265         catch (SecurityException JavaDoc ex){
266         }
267
268         if(tmp != null)
269             str=tmp;
270         else
271             str=props.getProperty("bind_addr");
272         if(str != null) {
273             try {
274                 bind_addr=InetAddress.getByName(str);
275             }
276             catch(UnknownHostException JavaDoc unknown) {
277                 if(log.isFatalEnabled()) log.fatal("(bind_addr): host " + str + " not known");
278                 return false;
279             }
280             props.remove("bind_addr");
281         }
282
283         str=props.getProperty("external_addr");
284         if(str != null) {
285             try {
286                 external_addr=InetAddress.getByName(str);
287             }
288             catch(UnknownHostException JavaDoc unknown) {
289                 if(log.isFatalEnabled()) log.fatal("(external_addr): host " + str + " not known");
290                 return false;
291             }
292             props.remove("external_addr");
293         }
294
295         str=props.getProperty("reaper_interval");
296         if(str != null) {
297             reaper_interval=Long.parseLong(str);
298             props.remove("reaper_interval");
299         }
300
301         str=props.getProperty("conn_expire_time");
302         if(str != null) {
303             conn_expire_time=Long.parseLong(str);
304             props.remove("conn_expire_time");
305         }
306
307         str=props.getProperty("sock_conn_timeout");
308         if(str != null) {
309             sock_conn_timeout=Integer.parseInt(str);
310             props.remove("sock_conn_timeout");
311         }
312
313         str=props.getProperty("recv_buf_size");
314         if(str != null) {
315             recv_buf_size=Integer.parseInt(str);
316             props.remove("recv_buf_size");
317         }
318
319         str=props.getProperty("send_buf_size");
320         if(str != null) {
321             send_buf_size=Integer.parseInt(str);
322             props.remove("send_buf_size");
323         }
324
325         str=props.getProperty("loopback");
326         if(str != null) {
327             loopback=Boolean.valueOf(str).booleanValue();
328             props.remove("loopback");
329         }
330
331         str=props.getProperty("skip_suspected_members");
332         if(str != null) {
333             skip_suspected_members=Boolean.valueOf(str).booleanValue();
334             props.remove("skip_suspected_members");
335         }
336
337         if(props.size() > 0) {
338             System.err.println("TCP.setProperties(): the following properties are not recognized:");
339             props.list(System.out);
340             return false;
341         }
342         return true;
343     }
344
345
346     /**
347      If the sender is null, set our own address. We cannot just go ahead and set the address
348      anyway, as we might be sending a message on behalf of someone else ! E.g. in case of
349      retransmission, when the original sender has crashed, or in a FLUSH protocol when we
350      have to return all unstable messages with the FLUSH_OK response.
351      */

352     private void setSourceAddress(Message msg) {
353         if(msg.getSrc() == null)
354             msg.setSrc(local_addr);
355     }
356
357
358     /** Send a message to the address specified in msg.dest */
359     private void sendUnicastMessage(Message msg) {
360         IpAddress dest;
361         Message copy;
362         Object JavaDoc hdr;
363         Event evt;
364
365         dest=(IpAddress)msg.getDest(); // guaranteed not to be null
366
if(!(dest instanceof IpAddress)) {
367             if(log.isErrorEnabled()) log.error("destination address is not of type IpAddress !");
368             return;
369         }
370         setSourceAddress(msg);
371
372         /* Don't send if destination is local address. Instead, switch dst and src and put in up_queue */
373         if(loopback && local_addr != null && dest != null && dest.equals(local_addr)) {
374             copy=msg.copy();
375             hdr=copy.getHeader(name);
376             if(hdr != null && hdr instanceof TcpHeader)
377                 copy.removeHeader(name);
378             copy.setSrc(local_addr);
379             copy.setDest(local_addr);
380
381             evt=new Event(Event.MSG, copy);
382
383             /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
384                This allows e.g. PerfObserver to get the time of reception of a message */

385             if(observer != null)
386                 observer.up(evt, up_queue.size());
387
388             passUp(evt);
389             return;
390         }
391         if(log.isTraceEnabled()) log.trace("dest=" + msg.getDest() + ", hdrs:\n" + msg.printObjectHeaders());
392         try {
393             if(skip_suspected_members) {
394                 if(suspected_mbrs.contains(dest)) {
395                     if(log.isTraceEnabled()) log.trace("will not send unicast message to " + dest +
396                                                        " as it is currently suspected");
397                     return;
398                 }
399             }
400             ct.send(msg);
401         }
402         catch(SocketException JavaDoc e) {
403             if(members.contains(dest)) {
404                 if(!suspected_mbrs.contains(dest)) {
405                     suspected_mbrs.add(dest);
406                     passUp(new Event(Event.SUSPECT, dest));
407                 }
408             }
409         }
410     }
411
412
413     protected void sendMulticastMessage(Message msg) {
414         Address dest;
415         Vector JavaDoc mbrs=(Vector JavaDoc)members.clone();
416         for(int i=0; i < mbrs.size(); i++) {
417             dest=(Address)mbrs.elementAt(i);
418             msg.setDest(dest);
419             sendUnicastMessage(msg);
420         }
421     }
422
423
424     protected void handleDownEvent(Event evt) {
425         switch(evt.getType()) {
426
427             case Event.TMP_VIEW:
428             case Event.VIEW_CHANGE:
429                 suspected_mbrs.removeAll();
430                 synchronized(members) {
431                     members.clear();
432                     members.addAll(((View)evt.getArg()).getMembers());
433                 }
434                 break;
435
436             case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local)
437
passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
438                 break;
439
440             case Event.CONNECT:
441                 group_addr=(String JavaDoc)evt.getArg();
442
443                 // removed March 18 2003 (bela), not needed (handled by GMS)
444
// Can't remove it; otherwise TCPGOSSIP breaks (bela May 8 2003) !
445
passUp(new Event(Event.CONNECT_OK));
446                 break;
447
448             case Event.DISCONNECT:
449                 passUp(new Event(Event.DISCONNECT_OK));
450                 break;
451
452             case Event.CONFIG:
453             if(log.isTraceEnabled()) log.trace("received CONFIG event: " + evt.getArg());
454                 handleConfigEvent((HashMap JavaDoc)evt.getArg());
455                 break;
456         }
457     }
458
459
460     void handleConfigEvent(HashMap JavaDoc map) {
461         if(map == null) return;
462         if(map.containsKey("additional_data"))
463             additional_data=(byte[])map.get("additional_data");
464     }
465
466
467 }
468
Popular Tags