1 3 package org.jgroups.blocks; 4 5 import java.io.BufferedReader ; 6 import java.io.InputStreamReader ; 7 import java.net.InetAddress ; 8 import java.util.Vector ; 9 10 11 17 public class LogicalLink implements Link.Receiver { 18 Receiver receiver=null; 19 final Vector links=new Vector (); final int link_to_use=0; 21 22 23 public class NoLinksAvailable extends Exception { 24 public String toString() { 25 return "LogicalLinks.NoLinksAvailable: there are no physical links available"; 26 } 27 } 28 29 public class AllLinksDown extends Exception { 30 public String toString() { 31 return "LogicalLinks.AllLinksDown: all physical links are currently down"; 32 } 33 } 34 35 36 public interface Receiver { 37 void receive(byte[] buf); 38 void linkDown(InetAddress local, int local_port, InetAddress remote, int remote_port); 39 void linkUp(InetAddress local, int local_port, InetAddress remote, int remote_port); 40 void missedHeartbeat(InetAddress local, int local_port, InetAddress remote, int remote_port, int num_hbs); 41 void receivedHeartbeatAgain(InetAddress local, int local_port, InetAddress remote, int remote_port); 42 } 43 44 45 46 public LogicalLink(Receiver r) { 47 receiver=r; 48 49 } 50 51 public LogicalLink() { 52 53 } 54 55 56 public void addLink(String local_addr, int local_port, String remote_addr, int remote_port) { 57 Link new_link=new Link(local_addr, local_port, remote_addr, remote_port, this); 58 if(links.contains(new_link)) 59 System.err.println("LogicalLink.add(): link " + new_link + " is already present"); 60 else 61 links.addElement(new_link); 62 } 63 64 65 66 public void addLink(String local_addr, int local_port, String remote_addr, int remote_port, 67 long timeout, long hb_interval) { 68 Link new_link=new Link(local_addr, local_port, remote_addr, remote_port, timeout, hb_interval, this); 69 if(links.contains(new_link)) 70 System.err.println("LogicalLink.add(): link " + new_link + " is already present"); 71 else 72 links.addElement(new_link); 73 } 74 75 76 77 public void removeAllLinks() { 78 Link tmp; 79 for(int i=0; i < links.size(); i++) { 80 tmp=(Link)links.elementAt(i); 81 tmp.stop(); 82 } 83 links.removeAllElements(); 84 } 85 86 87 public Vector getLinks() {return links;} 88 89 90 public int numberOfLinks() { 91 return links.size(); 92 } 93 94 95 96 97 public int numberOfEstablishedLinks() { 98 int n=0; 99 100 for(int i=0; i < links.size(); i++) { 101 if(((Link)links.elementAt(i)).established()) 102 n++; 103 } 104 return n; 105 } 106 107 108 109 110 public void start() { 111 Link tmp; 112 for(int i=0; i < links.size(); i++) { 113 tmp=(Link)links.elementAt(i); 114 try { 115 tmp.start(); 116 } 117 catch(Exception ex) { 118 System.err.println("LogicalLink.start(): could not create physical link, reason: " + ex); 119 } 120 } 121 } 122 123 124 125 public void stop() { 126 Link tmp; 127 for(int i=0; i < links.size(); i++) { 128 tmp=(Link)links.elementAt(i); 129 tmp.stop(); 130 } 131 } 132 133 134 135 136 public boolean send(byte[] buf) throws AllLinksDown, NoLinksAvailable { 137 Link link; 138 int link_used=0; 139 140 if(buf == null || buf.length == 0) { 141 System.err.println("LogicalLink.send(): buf is null or empty"); 142 return false; 143 } 144 145 if(links.size() == 0) 146 throw new NoLinksAvailable(); 147 148 149 150 159 170 171 172 173 for(int i=0; i < links.size(); i++) { 177 link=(Link)links.elementAt(i); 178 if(link.established()) { 179 if(link.send(buf)) { 180 System.out.println("Send over link #" + link_used + ": " + link); 181 return true; 182 } 183 } 184 } 185 186 throw new AllLinksDown(); 187 } 188 189 190 191 192 public void setReceiver(Receiver r) { 193 receiver=r; 194 } 195 196 197 198 199 201 public synchronized void receive(byte[] buf) { 202 if(receiver != null) 203 receiver.receive(buf); 204 } 205 206 207 public synchronized void linkDown(InetAddress local, int local_port, InetAddress remote, int remote_port) { 208 if(receiver != null) 209 receiver.linkDown(local, local_port, remote, remote_port); 210 } 211 212 213 public synchronized void linkUp(InetAddress local, int local_port, InetAddress remote, int remote_port) { 214 if(receiver != null) 215 receiver.linkUp(local, local_port, remote, remote_port); 216 } 217 218 219 220 public synchronized void missedHeartbeat(InetAddress local, int local_port, 221 InetAddress remote, int remote_port, int num_missed_hbs) { 222 if(receiver != null) 223 receiver.missedHeartbeat(local, local_port, remote, remote_port, num_missed_hbs); 224 } 225 226 227 228 public synchronized void receivedHeartbeatAgain(InetAddress local, int local_port, 229 InetAddress remote, int remote_port) { 230 if(receiver != null) 231 receiver.receivedHeartbeatAgain(local, local_port, remote, remote_port); 232 } 233 234 235 236 237 238 239 private static class MyReceiver implements LogicalLink.Receiver { 240 241 public void receive(byte[] buf) { 242 System.out.println("<-- " + new String (buf)); 243 } 244 245 246 247 public synchronized void linkDown(InetAddress l, int lp, InetAddress r, int rp) { 248 System.out.println("** linkDown(): " + r + ':' + rp); 249 } 250 251 252 public synchronized void linkUp(InetAddress l, int lp, InetAddress r, int rp) { 253 System.out.println("** linkUp(): " + r + ':' + rp); 254 } 255 256 public synchronized void missedHeartbeat(InetAddress l, int lp, InetAddress r, int rp, int num) { 257 } 259 260 public synchronized void receivedHeartbeatAgain(InetAddress l, int lp, InetAddress r, int rp) { 261 } 263 264 } 265 266 267 268 public static void main(String [] args) { 269 LogicalLink ll=new LogicalLink(); 270 String local_host, remote_host; 271 int local_port, remote_port; 272 int i=0; 273 274 ll.setReceiver(new MyReceiver()); 275 276 if(args.length % 4 != 0 || args.length == 0) { 277 System.err.println("\nLogicalLink <link+>\nwhere <link> is " + 278 "<local host> <local port> <remote host> <remote port>\n"); 279 return; 280 } 281 282 while(i < args.length) { 283 local_host=args[i++]; 284 local_port=Integer.parseInt(args[i++]); 285 remote_host=args[i++]; 286 remote_port=Integer.parseInt(args[i++]); 287 ll.addLink(local_host, local_port, remote_host, remote_port); 288 } 289 290 try { 291 ll.start(); 292 } 293 catch(Exception e) { 294 System.err.println("LogicalLink.main(): " + e); 295 } 296 297 BufferedReader in= new BufferedReader (new InputStreamReader (System.in)); 298 while(true) { 299 try { 300 System.out.print("> "); System.out.flush(); 301 String line=in.readLine(); 302 ll.send(line.getBytes()); 303 } 304 catch(Exception e) { 305 System.err.println(e); 306 } 307 } 308 309 } 310 311 312 } 313 314 315 | Popular Tags |