KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > BasicTCP


1 package org.jgroups.protocols;
2
3 import org.jgroups.Address;
4 import org.jgroups.Event;
5 import org.jgroups.Message;
6 import org.jgroups.View;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.util.BoundedList;
9 import org.jgroups.util.Util;
10
11 import java.net.InetAddress JavaDoc;
12 import java.net.UnknownHostException JavaDoc;
13 import java.util.Collection JavaDoc;
14 import java.util.Properties JavaDoc;
15 import java.util.Vector JavaDoc;
16
17 /**
18  * Shared base class for tcpip protocols
19  * @author Scott Marlow
20  */

21 public abstract class BasicTCP extends TP {
22
23     /** Should we drop unicast messages to suspected members or not */
24     boolean skip_suspected_members=true;
25
26     /** When we cannot send a message to P (on an exception), then we send a SUSPECT message up the stack */
27     boolean suspect_on_send_failure=false;
28
29
30     /** List the maintains the currently suspected members. This is used so we don't send too many SUSPECT
31      * events up the stack (one per message !)
32      */

33     final BoundedList suspected_mbrs=new BoundedList(20);
34     protected InetAddress JavaDoc external_addr=null; // the IP address which is broadcast to other group members
35
protected int start_port=7800; // find first available port starting at this port
36
protected int end_port=0; // maximum port to bind to
37
protected long reaper_interval=0; // time in msecs between connection reaps
38
protected long conn_expire_time=0; // max time a conn can be idle before being reaped
39
/** Use separate send queues for each connection */
40     boolean use_send_queues=true;
41     int send_queue_size=10000; // max number of messages in a send queue
42
int recv_buf_size=150000;
43     int send_buf_size=150000;
44     int sock_conn_timeout=2000; // max time in millis for a socket creation in ConnectionTable
45
boolean tcp_nodelay=false;
46     int linger=-1; // SO_LINGER (number of ms, -1 disables it)
47

48
49
50     public int getStartPort() {return start_port;}
51     public void setStartPort(int start_port) {this.start_port=start_port;}
52     public int getEndPort() {return end_port;}
53     public void setEndPort(int end_port) {this.end_port=end_port;}
54     public long getReaperInterval() {return reaper_interval;}
55     public void setReaperInterval(long reaper_interval) {this.reaper_interval=reaper_interval;}
56     public long getConnExpireTime() {return conn_expire_time;}
57     public void setConnExpireTime(long conn_expire_time) {this.conn_expire_time=conn_expire_time;}
58
59     public boolean setProperties(Properties JavaDoc props) {
60         String JavaDoc str;
61
62         super.setProperties(props);
63
64         str=props.getProperty("start_port");
65         if(str != null) {
66             start_port=Integer.parseInt(str);
67             props.remove("start_port");
68         }
69
70         str=props.getProperty("end_port");
71         if(str != null) {
72             end_port=Integer.parseInt(str);
73             props.remove("end_port");
74         }
75
76         str=props.getProperty("external_addr");
77         if(str != null) {
78             try {
79                 external_addr=InetAddress.getByName(str);
80             }
81             catch(UnknownHostException JavaDoc unknown) {
82                 if(log.isFatalEnabled()) log.fatal("(external_addr): host " + str + " not known");
83                 return false;
84             }
85             props.remove("external_addr");
86         }
87
88         str=props.getProperty("reaper_interval");
89         if(str != null) {
90             reaper_interval=Long.parseLong(str);
91             props.remove("reaper_interval");
92         }
93
94         str=props.getProperty("conn_expire_time");
95         if(str != null) {
96             conn_expire_time=Long.parseLong(str);
97             props.remove("conn_expire_time");
98         }
99
100         str=props.getProperty("sock_conn_timeout");
101         if(str != null) {
102             sock_conn_timeout=Integer.parseInt(str);
103             props.remove("sock_conn_timeout");
104         }
105
106         str=props.getProperty("recv_buf_size");
107         if(str != null) {
108             recv_buf_size=Integer.parseInt(str);
109             props.remove("recv_buf_size");
110         }
111
112         str=props.getProperty("send_buf_size");
113         if(str != null) {
114             send_buf_size=Integer.parseInt(str);
115             props.remove("send_buf_size");
116         }
117
118         str=props.getProperty("skip_suspected_members");
119         if(str != null) {
120             skip_suspected_members=Boolean.valueOf(str).booleanValue();
121             props.remove("skip_suspected_members");
122         }
123
124         str=props.getProperty("suspect_on_send_failure");
125         if(str != null) {
126             suspect_on_send_failure=Boolean.valueOf(str).booleanValue();
127             props.remove("suspect_on_send_failure");
128         }
129
130         str=props.getProperty("use_send_queues");
131         if(str != null) {
132             use_send_queues=Boolean.valueOf(str).booleanValue();
133             props.remove("use_send_queues");
134         }
135
136         str=props.getProperty("send_queue_size");
137         if(str != null) {
138             send_queue_size=Integer.parseInt(str);
139             props.remove("send_queue_size");
140         }
141
142         str=props.getProperty("tcp_nodelay");
143         if(str != null) {
144             tcp_nodelay=Boolean.parseBoolean(str);
145             props.remove("tcp_nodelay");
146         }
147
148         str=props.getProperty("linger");
149         if(str != null) {
150             linger=Integer.parseInt(str);
151             props.remove("linger");
152         }
153
154
155         Util.checkBufferSize(getName() + ".recv_buf_size", recv_buf_size);
156         Util.checkBufferSize(getName() + ".send_buf_size", send_buf_size);
157
158         return true;
159     }
160
161     public void init() throws Exception JavaDoc {
162         super.init();
163         if(start_port <= 0) {
164             Protocol dynamic_discovery_prot=stack.findProtocol("MPING");
165             if(dynamic_discovery_prot == null)
166                 dynamic_discovery_prot=stack.findProtocol("TCPGOSSIP");
167
168             if(dynamic_discovery_prot != null) {
169                 if(log.isDebugEnabled())
170                     log.debug("dynamic discovery is present (" + dynamic_discovery_prot + "), so start_port=" + start_port + " is okay");
171             }
172             else {
173                 throw new IllegalArgumentException JavaDoc("start_port cannot be set to " + start_port +
174                         ", as no dynamic discovery protocol (e.g. MPING or TCPGOSSIP) has been detected.");
175             }
176         }
177     }
178
179
180
181     public void sendToAllMembers(byte[] data, int offset, int length) throws Exception JavaDoc {
182         Address dest;
183         Vector JavaDoc mbrs=(Vector JavaDoc)members.clone();
184         for(int i=0; i < mbrs.size(); i++) {
185             dest=(Address)mbrs.elementAt(i);
186             sendToSingleMember(dest, data, offset, length);
187         }
188     }
189
190     public void sendToSingleMember(Address dest, byte[] data, int offset, int length) throws Exception JavaDoc {
191         if(log.isTraceEnabled()) log.trace("dest=" + dest + " (" + length + " bytes)");
192         if(skip_suspected_members) {
193             if(suspected_mbrs.contains(dest)) {
194                 if(log.isTraceEnabled())
195                     log.trace("will not send unicast message to " + dest + " as it is currently suspected");
196                 return;
197             }
198         }
199
200         try {
201             send(dest, data, offset, length);
202         }
203         catch(Exception JavaDoc e) {
204             if(log.isTraceEnabled())
205                 log.trace("failure sending message to " + dest, e);
206             if(suspect_on_send_failure && members.contains(dest)) {
207                 if(!suspected_mbrs.contains(dest)) {
208                     suspected_mbrs.add(dest);
209                     up_prot.up(new Event(Event.SUSPECT, dest));
210                 }
211             }
212         }
213     }
214
215     public String JavaDoc getInfo() {
216         StringBuilder JavaDoc sb=new StringBuilder JavaDoc();
217         sb.append("connections: ").append(printConnections()).append("\n");
218         return sb.toString();
219     }
220
221     public void postUnmarshalling(Message msg, Address dest, Address src, boolean multicast) {
222         if(multicast)
223             msg.setDest(null);
224         else
225             msg.setDest(dest);
226     }
227
228     public void postUnmarshallingList(Message msg, Address dest, boolean multicast) {
229         postUnmarshalling(msg, dest, null, multicast);
230     }
231
232     public abstract String JavaDoc printConnections();
233
234     public abstract void send(Address dest, byte[] data, int offset, int length) throws Exception JavaDoc;
235
236     public abstract void retainAll(Collection JavaDoc members);
237
238     /** ConnectionTable.Receiver interface */
239     public void receive(Address sender, byte[] data, int offset, int length) {
240         receive(local_addr, sender, data, offset, length);
241     }
242
243     protected Object JavaDoc handleDownEvent(Event evt) {
244         Object JavaDoc ret=super.handleDownEvent(evt);
245         if(evt.getType() == Event.VIEW_CHANGE) {
246             suspected_mbrs.removeAll();
247             View v=(View)evt.getArg();
248             Vector JavaDoc tmp_mbrs=v != null? v.getMembers() : null;
249             if(tmp_mbrs != null) {
250                 retainAll(tmp_mbrs); // remove all connections from the ConnectionTable which are not members
251
}
252         }
253         else if(evt.getType() == Event.UNSUSPECT) {
254             suspected_mbrs.removeElement(evt.getArg());
255         }
256         return ret;
257     }
258 }
259
Popular Tags