KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > WANPIPE


1 // $Id: WANPIPE.java,v 1.6 2004/09/23 16:29:43 belaban Exp $
2

3 package org.jgroups.protocols;
4
5
6 import org.jgroups.*;
7 import org.jgroups.blocks.LogicalLink;
8 import org.jgroups.stack.Protocol;
9 import org.jgroups.util.List;
10 import org.jgroups.util.Util;
11
12 import java.io.IOException JavaDoc;
13 import java.io.ObjectInput JavaDoc;
14 import java.io.ObjectOutput JavaDoc;
15 import java.net.InetAddress JavaDoc;
16 import java.util.Enumeration JavaDoc;
17 import java.util.Properties JavaDoc;
18 import java.util.StringTokenizer JavaDoc;
19 import java.util.Vector JavaDoc;
20
21
22
23 /**
24    Logical point-to-point link. Uses multiple physical links to provide a reliable transport. For example,
25    if there are 2 physical links over different networks, and one of them fails, the WAN pipe will still be
26    able to send traffic over the other link. Currently traffic is sent over the physical links round-robin,
27    but this will be made configurable in the future. Example: 70% over first link, 30% over second, or
28    packets are split and sent across both links (increasing the available bandwidth).
29  */

30 public class WANPIPE extends Protocol implements LogicalLink.Receiver {
31     LogicalLink pipe=null;
32     String JavaDoc name=null; // logical name of WAN pipe
33
final List links=new List(); // contains the parsed link descriptions
34

35     Address JavaDoc local_addr=null;
36     String JavaDoc group_addr=null;
37     final Properties JavaDoc properties=null;
38     final Vector JavaDoc members=new Vector JavaDoc();
39
40
41
42     public WANPIPE() {
43     pipe=new LogicalLink(this);
44     }
45
46
47     public String JavaDoc toString() {
48     return "Protocol WANPIPE(local address: " + local_addr + ')';
49     }
50
51
52     public String JavaDoc getName() {return "WANPIPE";}
53
54
55
56
57
58
59     /**
60        Sent to destination(s) using the WAN pipe. Send local messages directly back up the stack
61      */

62     public void down(Event evt) {
63     Message msg, rsp, copy;
64     Address JavaDoc dest_addr;
65
66     if(evt.getType() != Event.MSG) {
67         handleDownEvent(evt);
68         return;
69     }
70
71     msg=(Message)evt.getArg();
72     dest_addr=msg.getDest();
73     
74     if(dest_addr == null) { // send both local and remote
75
for(int i=0; i < members.size(); i++) {
76         dest_addr=(Address JavaDoc)members.elementAt(i);
77
78         if(dest_addr.equals(local_addr)) { // local or ...
79
returnLocal(msg);
80         }
81         else { // remote
82
copy=msg.copy();
83             copy.setDest(dest_addr);
84             copy.putHeader(getName(), new WanPipeHeader(group_addr));
85             sendUnicastMessage(copy);
86         }
87         }
88     }
89     else {
90         if(dest_addr.equals(local_addr)) { // destination can either be local ...
91
returnLocal(msg);
92         }
93         else { // or remote
94
msg.putHeader(getName(), new WanPipeHeader(group_addr));
95         sendUnicastMessage(msg);
96         }
97     }
98     }
99
100
101     /** Make a response and send back up the same stack it came down */
102     void returnLocal(Message msg) {
103     Message rsp=msg.copy();
104     rsp.setDest(local_addr);
105     rsp.setSrc(local_addr);
106     passUp(new Event(Event.MSG, rsp));
107     }
108
109
110
111
112     public void start() throws Exception JavaDoc {
113     LinkInfo l;
114
115         for(Enumeration JavaDoc e=links.elements(); e.hasMoreElements();) {
116             l=(LinkInfo)e.nextElement();
117             pipe.addLink(l.local_addr, l.local_port, l.remote_addr, l.remote_port);
118         }
119         pipe.start();
120         local_addr=new WanPipeAddress(name); // logical address for the WAN pipe
121
passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
122     }
123
124
125     public void stop() {
126     pipe.stop();
127     pipe.removeAllLinks();
128     }
129
130
131
132
133
134
135
136     // LogicalLink.Receiver interface
137
public void receive(byte[] buf) {
138     WanPipeHeader hdr=null;
139     Message msg=null;
140
141     try {
142         msg=(Message)Util.objectFromByteBuffer(buf);
143     }
144     catch(Exception JavaDoc e) {
145         System.err.println("WANPIPE.receive(): " + e);
146         return;
147     }
148     
149      if(log.isInfoEnabled()) log.info("received msg " + msg);
150     hdr=(WanPipeHeader)msg.removeHeader(getName());
151     
152     /* Discard all messages destined for a channel with a different name */
153     String JavaDoc ch_name=null;
154
155     if(hdr.group_addr != null)
156         ch_name=hdr.group_addr;
157
158     if(group_addr == null) {
159          System.err.println("WANPIPE.receive(): group address in header was null, discarded");
160         return;
161     }
162
163     if(ch_name != null && !group_addr.equals(ch_name))
164         return;
165
166     passUp(new Event(Event.MSG, msg));
167     }
168
169
170
171
172
173     public void linkDown(InetAddress JavaDoc local, int local_port, InetAddress JavaDoc remote, int remote_port) {
174     Object JavaDoc p=getPeer();
175
176     passUp(new Event(Event.SUSPECT, p));
177     }
178
179
180     public void linkUp(InetAddress JavaDoc local, int local_port, InetAddress JavaDoc remote, int remote_port) {
181     
182     }
183
184
185     public void missedHeartbeat(InetAddress JavaDoc local, int local_port, InetAddress JavaDoc remote, int remote_port, int num_hbs) {
186     
187     }
188
189     public void receivedHeartbeatAgain(InetAddress JavaDoc local, int local_port, InetAddress JavaDoc remote, int remote_port) {
190     
191     }
192
193
194
195     /** Setup the Protocol instance acording to the configuration string */
196     public boolean setProperties(Properties JavaDoc props) {super.setProperties(props);
197     String JavaDoc str;
198
199     str=props.getProperty("name");
200     if(str != null) {
201         name=str;
202         props.remove("name");
203     }
204
205     str=props.getProperty("links");
206     if(str != null) {
207
208         // parse links and put them in list (as LinkInfos)
209
if(parseLinks(str) == false)
210         return false;
211         props.remove("links");
212     }
213
214     if(name == null || name.length() == 0) {
215         System.err.println("WANPIPE.setProperties(): 'name' must be set");
216         return false;
217     }
218     if(links.size() == 0) {
219         System.err.println("WANPIPE.setProperties(): no links specified (at least 1 link must be present)");
220         return false;
221     }
222
223     if(props.size() > 0) {
224         System.err.println("WANPIPE.setProperties(): the following properties are not recognized:");
225         props.list(System.out);
226         return false;
227     }
228     return true;
229     }
230
231
232
233     /** Parse link spec and put each link into 'links' (as LinkInfo) <br>
234     Example: <pre> [daddy@6666,daddy@7777,daddy@7777,sindhu@6666] </pre>*/

235     boolean parseLinks(String JavaDoc s) {
236     LinkInfo info;
237     StringTokenizer JavaDoc tok;
238     String JavaDoc src, dst;
239     int index=0; // holds position of '@'
240

241     s=s.replace('[', ' ');
242     s=s.replace(']', ' ');
243     s=s.trim();
244     tok=new StringTokenizer JavaDoc(s, ",");
245     while(tok.hasMoreElements()) {
246         src=tok.nextToken().trim();
247         dst=tok.nextToken().trim();
248         info=new LinkInfo();
249
250         index=src.indexOf('@');
251         if(index == -1) {
252         System.err.println("WANPIPE.parseLinks(): local address " + src + " must have a @ separator");
253         return false;
254         }
255         info.local_addr=src.substring(0, index);
256         info.local_port=Integer.parseInt(src.substring(index + 1, src.length()));
257
258         index=dst.indexOf('@');
259         if(index == -1) {
260         System.err.println("WANPIPE.parseLinks(): remote address " + dst + " must have a @ separator");
261         return false;
262         }
263         info.remote_addr=dst.substring(0, index);
264         info.remote_port=Integer.parseInt(dst.substring(index + 1, dst.length()));
265
266         links.add(info);
267     }
268     
269     return true;
270     }
271
272
273     Object JavaDoc getPeer() {
274     Object JavaDoc ret=null;
275     if(members == null || members.size() == 0 || local_addr == null)
276         return null;
277     for(int i=0; i < members.size(); i++)
278         if(!members.elementAt(i).equals(local_addr))
279         return members.elementAt(i);
280     return ret;
281     }
282
283
284     
285
286
287     /**
288        If the sender is null, set our own address. We cannot just go ahead and set the address
289        anyway, as we might be sending a message on behalf of someone else ! E.g. in case of
290        retransmission, when the original sender has crashed, or in a FLUSH protocol when we
291        have to return all unstable messages with the FLUSH_OK response.
292      */

293     private void setSourceAddress(Message msg) {
294     if(msg.getSrc() == null)
295         msg.setSrc(local_addr);
296     }
297
298
299
300
301     /** Send a message to the address specified in msg.dest */
302     private void sendUnicastMessage(Message msg) {
303     byte[] buf=null;
304
305     setSourceAddress(msg);
306     try {
307         buf=Util.objectToByteBuffer(msg);
308     }
309     catch(Exception JavaDoc e) {
310         System.err.println("WANPIPE.sendUnicastMessage(): " + e);
311         return;
312     }
313     
314     try {
315         pipe.send(buf);
316     }
317     catch(LogicalLink.AllLinksDown links_down) {
318         System.err.println("WANPIPE.sendUnicastMessage(): WAN pipe has no currently operational " +
319                    "link to send message. Discarding it.");
320     }
321     catch(LogicalLink.NoLinksAvailable no_links) {
322         System.err.println("WANPIPE.sendUnicastMessage(): WAN pipe has no physical links configured;" +
323                    " cannot send message");
324     }
325     catch(Exception JavaDoc e) {
326         System.err.println("WANPIPE.sendUnicastMessage(): " + e);
327     }
328     }
329
330
331
332
333
334     private void handleUpEvent(Event evt) {
335     switch(evt.getType()) {
336         
337     case Event.SUSPECT:
338         break;
339     }
340     }
341
342
343
344     private void handleDownEvent(Event evt) {
345     switch(evt.getType()) {
346
347     case Event.TMP_VIEW:
348     case Event.VIEW_CHANGE:
349         synchronized(members) {
350         members.removeAllElements();
351         Vector JavaDoc tmpvec=((View)evt.getArg()).getMembers();
352         for(int i=0; i < tmpvec.size(); i++)
353             members.addElement(tmpvec.elementAt(i));
354         }
355         break;
356
357     case Event.SUSPECT:
358         break;
359
360     case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local)
361
passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
362         break;
363
364     case Event.CONNECT:
365         group_addr=(String JavaDoc)evt.getArg();
366         passUp(new Event(Event.CONNECT_OK));
367         break;
368
369     case Event.DISCONNECT:
370         passUp(new Event(Event.DISCONNECT_OK));
371         break;
372
373     }
374     }
375
376
377
378
379     private static class LinkInfo {
380     String JavaDoc local_addr=null, remote_addr=null;
381     int local_port=0, remote_port=0;
382     
383     public String JavaDoc toString() {
384         StringBuffer JavaDoc ret=new StringBuffer JavaDoc();
385         
386         ret.append("local_addr=" + (local_addr != null? local_addr : "null"));
387         ret.append(":" + local_port);
388         ret.append(", remote_addr=" + (remote_addr != null ? remote_addr : "null"));
389         ret.append(":" + remote_port);
390         return ret.toString();
391     }
392     }
393     
394     
395     public class WanPipeHeader extends Header {
396     public String JavaDoc group_addr=null;
397
398     
399     public WanPipeHeader() {} // used for externalization
400

401     public WanPipeHeader(String JavaDoc n) {group_addr=n;}
402     
403     
404     public long size() {
405         return Header.HDR_OVERHEAD;
406     }
407     
408     public String JavaDoc toString() {
409         return "[WanPipe: group_addr=" + group_addr + ']';
410     }
411     
412     public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
413         out.writeObject(group_addr);
414     }
415     
416     
417     
418     public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
419         group_addr=(String JavaDoc)in.readObject();
420     }
421     
422     }
423     
424     
425 }
426
427
428
429
Popular Tags