KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > LogicalLink


1 // $Id: LogicalLink.java,v 1.3 2004/09/23 16:29:11 belaban Exp $
2

3 package org.jgroups.blocks;
4
5 import java.io.BufferedReader JavaDoc;
6 import java.io.InputStreamReader JavaDoc;
7 import java.net.InetAddress JavaDoc;
8 import java.util.Vector JavaDoc;
9
10
11 /**
12    Implements a logical point-to-point link between 2 entities consisting of a number of physical links.
13    Traffic is routed over any of the physical link, according to policies. Examples are: send traffic
14    over all links, round-robin, use first link for 70% of traffic, other links for the remaining 30%.
15    @author Bela Ban, June 2000
16 */

17 public class LogicalLink implements Link.Receiver {
18     Receiver receiver=null;
19     final Vector JavaDoc links=new Vector JavaDoc(); // of Links
20
final int link_to_use=0;
21
22
23     public class NoLinksAvailable extends Exception JavaDoc {
24     public String JavaDoc toString() {
25         return "LogicalLinks.NoLinksAvailable: there are no physical links available";
26     }
27     }
28
29     public class AllLinksDown extends Exception JavaDoc {
30     public String JavaDoc toString() {
31         return "LogicalLinks.AllLinksDown: all physical links are currently down";
32     }
33     }
34
35     
36     public interface Receiver {
37     void receive(byte[] buf);
38     void linkDown(InetAddress JavaDoc local, int local_port, InetAddress JavaDoc remote, int remote_port);
39     void linkUp(InetAddress JavaDoc local, int local_port, InetAddress JavaDoc remote, int remote_port);
40     void missedHeartbeat(InetAddress JavaDoc local, int local_port, InetAddress JavaDoc remote, int remote_port, int num_hbs);
41     void receivedHeartbeatAgain(InetAddress JavaDoc local, int local_port, InetAddress JavaDoc remote, int remote_port);
42     }
43
44
45
46     public LogicalLink(Receiver r) {
47     receiver=r;
48
49     }
50
51     public LogicalLink() {
52
53     }
54
55
56     public void addLink(String JavaDoc local_addr, int local_port, String JavaDoc remote_addr, int remote_port) {
57     Link new_link=new Link(local_addr, local_port, remote_addr, remote_port, this);
58     if(links.contains(new_link))
59         System.err.println("LogicalLink.add(): link " + new_link + " is already present");
60     else
61         links.addElement(new_link);
62     }
63
64
65
66     public void addLink(String JavaDoc local_addr, int local_port, String JavaDoc remote_addr, int remote_port,
67             long timeout, long hb_interval) {
68     Link new_link=new Link(local_addr, local_port, remote_addr, remote_port, timeout, hb_interval, this);
69     if(links.contains(new_link))
70         System.err.println("LogicalLink.add(): link " + new_link + " is already present");
71     else
72         links.addElement(new_link);
73     }
74
75
76
77     public void removeAllLinks() {
78     Link tmp;
79     for(int i=0; i < links.size(); i++) {
80         tmp=(Link)links.elementAt(i);
81         tmp.stop();
82     }
83     links.removeAllElements();
84     }
85
86
87     public Vector JavaDoc getLinks() {return links;}
88
89
90     public int numberOfLinks() {
91     return links.size();
92     }
93
94
95
96
97     public int numberOfEstablishedLinks() {
98     int n=0;
99
100     for(int i=0; i < links.size(); i++) {
101         if(((Link)links.elementAt(i)).established())
102         n++;
103     }
104     return n;
105     }
106
107
108
109     /** Start all links */
110     public void start() {
111     Link tmp;
112     for(int i=0; i < links.size(); i++) {
113         tmp=(Link)links.elementAt(i);
114         try {
115         tmp.start();
116         }
117         catch(Exception JavaDoc ex) {
118         System.err.println("LogicalLink.start(): could not create physical link, reason: " + ex);
119         }
120     }
121     }
122
123
124     /** Stop all links */
125     public void stop() {
126     Link tmp;
127     for(int i=0; i < links.size(); i++) {
128         tmp=(Link)links.elementAt(i);
129         tmp.stop();
130     }
131     }
132     
133
134
135     /** Send a message to the other side */
136     public boolean send(byte[] buf) throws AllLinksDown, NoLinksAvailable {
137     Link link;
138     int link_used=0;
139
140     if(buf == null || buf.length == 0) {
141         System.err.println("LogicalLink.send(): buf is null or empty");
142         return false;
143     }
144
145     if(links.size() == 0)
146         throw new NoLinksAvailable();
147     
148
149     
150     // current policy (make policies configurable later !): alternate between links.
151
// if failure, take first link that works
152
// link=(Link)links.elementAt(link_to_use);
153
// if(link.send(buf)) {
154
// System.out.println("Send over link #" + link_to_use + ": " + link);
155
// link_to_use=(link_to_use + 1) % links.size();
156
// return true;
157
// }
158

159     // link_used=(link_to_use + 1) % links.size();
160
// while(link_used != link_to_use) {
161
// link=(Link)links.elementAt(link_used);
162
// if(link.send(buf)) {
163
// System.out.println("Send over link #" + link_used + ": " + link);
164
// link_to_use=(link_to_use + 1) % links.size();
165
// return true;
166
// }
167
// link_used=(link_used + 1) % links.size();
168
// }
169

170
171
172
173     // take first available link. use other links only if first is down. if we have smaller and bigger
174
// pipes, the bigger ones should be specified first (so we're using them first, and only when they
175
// are not available we use the smaller ones)
176
for(int i=0; i < links.size(); i++) {
177         link=(Link)links.elementAt(i);
178         if(link.established()) {
179         if(link.send(buf)) {
180             System.out.println("Send over link #" + link_used + ": " + link);
181             return true;
182         }
183         }
184     }
185
186     throw new AllLinksDown();
187     }
188
189
190
191
192     public void setReceiver(Receiver r) {
193     receiver=r;
194     }
195
196
197     /*-------- Interface Link.Receiver ---------*/
198
199     /** Receive a message from any of the physical links. That's why this and the next methods have to be
200     synchronized */

201     public synchronized void receive(byte[] buf) {
202     if(receiver != null)
203         receiver.receive(buf);
204     }
205
206     /** One of the physical links went down */
207     public synchronized void linkDown(InetAddress JavaDoc local, int local_port, InetAddress JavaDoc remote, int remote_port) {
208     if(receiver != null)
209         receiver.linkDown(local, local_port, remote, remote_port);
210     }
211
212     /** One of the physical links came up */
213     public synchronized void linkUp(InetAddress JavaDoc local, int local_port, InetAddress JavaDoc remote, int remote_port) {
214     if(receiver != null)
215         receiver.linkUp(local, local_port, remote, remote_port);
216     }
217
218     
219     /** Missed one or more heartbeats. Link is not yet down, though */
220     public synchronized void missedHeartbeat(InetAddress JavaDoc local, int local_port,
221                          InetAddress JavaDoc remote, int remote_port, int num_missed_hbs) {
222     if(receiver != null)
223         receiver.missedHeartbeat(local, local_port, remote, remote_port, num_missed_hbs);
224     }
225     
226     
227     /** Heartbeat came back again (before link was taken down) after missing some heartbeats */
228     public synchronized void receivedHeartbeatAgain(InetAddress JavaDoc local, int local_port,
229                             InetAddress JavaDoc remote, int remote_port) {
230     if(receiver != null)
231         receiver.receivedHeartbeatAgain(local, local_port, remote, remote_port);
232     }
233
234
235
236
237
238
239     private static class MyReceiver implements LogicalLink.Receiver {
240     
241     public void receive(byte[] buf) {
242         System.out.println("<-- " + new String JavaDoc(buf));
243     }
244     
245     
246     /** All of the physical links are down --> logical link is down too */
247     public synchronized void linkDown(InetAddress JavaDoc l, int lp, InetAddress JavaDoc r, int rp) {
248         System.out.println("** linkDown(): " + r + ':' + rp);
249     }
250     
251     /** At least 1 physical links is up again */
252     public synchronized void linkUp(InetAddress JavaDoc l, int lp, InetAddress JavaDoc r, int rp) {
253         System.out.println("** linkUp(): " + r + ':' + rp);
254     }
255     
256     public synchronized void missedHeartbeat(InetAddress JavaDoc l, int lp, InetAddress JavaDoc r, int rp, int num) {
257         //System.out.println("missedHeartbeat(): " + r + ":" + rp);
258
}
259     
260     public synchronized void receivedHeartbeatAgain(InetAddress JavaDoc l, int lp, InetAddress JavaDoc r, int rp) {
261         //System.out.println("receivedHeartbeatAgain(): " + r + ":" + rp);
262
}
263         
264     }
265
266
267
268     public static void main(String JavaDoc[] args) {
269     LogicalLink ll=new LogicalLink();
270     String JavaDoc local_host, remote_host;
271     int local_port, remote_port;
272     int i=0;
273
274     ll.setReceiver(new MyReceiver());
275
276     if(args.length % 4 != 0 || args.length == 0) {
277         System.err.println("\nLogicalLink <link+>\nwhere <link> is " +
278                    "<local host> <local port> <remote host> <remote port>\n");
279         return;
280     }
281     
282     while(i < args.length) {
283         local_host=args[i++];
284         local_port=Integer.parseInt(args[i++]);
285         remote_host=args[i++];
286         remote_port=Integer.parseInt(args[i++]);
287         ll.addLink(local_host, local_port, remote_host, remote_port);
288     }
289     
290     try {
291         ll.start();
292     }
293     catch(Exception JavaDoc e) {
294         System.err.println("LogicalLink.main(): " + e);
295     }
296     
297     BufferedReader JavaDoc in= new BufferedReader JavaDoc(new InputStreamReader JavaDoc(System.in));
298     while(true) {
299         try {
300         System.out.print("> "); System.out.flush();
301         String JavaDoc line=in.readLine();
302         ll.send(line.getBytes());
303         }
304         catch(Exception JavaDoc e) {
305         System.err.println(e);
306         }
307     }
308     
309     }
310     
311     
312 }
313
314
315
Popular Tags