1 package org.sapia.ubik.mcast; 2 3 import org.sapia.ubik.mcast.server.UDPServer; 4 import org.sapia.ubik.net.ServerAddress; 5 6 import java.io.*; 7 8 import java.net.*; 9 10 import java.util.List ; 11 import org.sapia.ubik.rmi.server.Log; 12 import org.sapia.ubik.util.Localhost; 13 14 15 25 public class UDPUnicastDispatcher extends UDPServer implements UnicastDispatcher { 26 static final int DEFAULT_BUFSZ = 5000; 27 private EventConsumer _consumer; 28 private int _bufsize = DEFAULT_BUFSZ; 29 private int _responseTimeout = 10000; 30 private String _domain; 31 private String _node; 32 private SocketTimeoutListener _listener; 33 private ServerAddress _addr; 34 35 38 public UDPUnicastDispatcher(int soTimeout, EventConsumer consumer) 39 throws SocketException { 40 super(consumer.getNode() + "Unicast@" + 41 consumer.getDomainName().toString(), soTimeout); 42 _consumer = consumer; 43 _domain = consumer.getDomainName().toString(); 44 _node = consumer.getNode(); 45 } 46 47 50 public UDPUnicastDispatcher(int soTimeout, int port, EventConsumer consumer) 51 throws SocketException { 52 super(consumer.getNode() + "@" + consumer.getDomainName().toString(), 53 soTimeout, port); 54 _consumer = consumer; 55 _domain = consumer.getDomainName().toString(); 56 _node = consumer.getNode(); 57 } 58 59 62 public void setBufsize(int size) { 63 super.setBufsize(size); 64 _bufsize = size; 65 } 66 67 71 public void setSoTimeoutListener(SocketTimeoutListener listener) { 72 _listener = listener; 73 } 74 75 78 public void start() { 79 super.start(); 80 81 try { 82 InetAddress addr = Localhost.getLocalAddress(); 83 if(Log.isDebug()){ 84 Log.debug(getClass(), "Local address: " + addr.getHostAddress()); 85 } 86 _addr = new InetServerAddress(addr, getPort()); 87 } catch (UnknownHostException e) { 88 throw new IllegalStateException (e.getMessage()); 89 } 90 } 91 92 95 public void close() { 96 if (super._sock != null) { 97 super._sock.close(); 98 } 99 } 100 101 104 public void dispatch(ServerAddress addr, String type, Object data) 105 throws IOException { 106 DatagramSocket sock = new DatagramSocket(); 107 108 sock.setSoTimeout(_responseTimeout); 109 110 try { 111 RemoteEvent evt = new RemoteEvent(null, type, data).setNode(_node); 112 InetServerAddress inet = (InetServerAddress) addr; 113 114 if(Log.isDebug()){ 115 Log.debug(getClass(), "dispatch() : " + addr + ", type: " + type + ", data: " + data); 116 } 117 118 doSend(inet.getInetAddress(), inet.getPort(), sock, 119 Util.toBytes(evt, bufSize()), false, type); 120 } catch (TimeoutException e) { 121 } finally { 123 try{ 124 sock.close(); 125 }catch(RuntimeException e){} 126 } 127 } 128 129 132 public Response send(ServerAddress addr, String type, Object data) 133 throws IOException { 134 DatagramSocket sock = new DatagramSocket(); 135 136 sock.setSoTimeout(_responseTimeout); 137 138 RemoteEvent evt = new RemoteEvent(null, type, data).setNode(_node) 139 .setSync(); 140 InetServerAddress inet = (InetServerAddress) addr; 141 142 try { 143 return (Response) doSend(inet.getInetAddress(), inet.getPort(), sock, 144 Util.toBytes(evt, bufSize()), true, type); 145 } catch (TimeoutException e) { 146 return new Response(evt.getId(), null).setStatusSuspect(); 147 } finally { 148 try{ 149 sock.close(); 150 }catch(RuntimeException e){} 151 } 152 } 153 154 157 public RespList send(List addresses, String type, Object data) 158 throws IOException { 159 DatagramSocket sock = new DatagramSocket(); 160 161 sock.setSoTimeout(_responseTimeout); 162 163 try{ 164 RemoteEvent evt = new RemoteEvent(null, type, data).setNode(_node) 165 .setSync(); 166 byte[] bytes = Util.toBytes(evt, bufSize()); 167 InetServerAddress current; 168 RespList resps = new RespList(addresses.size()); 169 Response resp; 170 171 for (int i = 0; i < addresses.size(); i++) { 172 current = (InetServerAddress) addresses.get(i); 173 174 try { 175 resp = (Response) (Response) doSend(current.getInetAddress(), 176 current.getPort(), sock, bytes, true, type); 177 } catch (TimeoutException e) { 178 resp = new Response(evt.getId(), null).setStatusSuspect(); 179 } 180 181 if (!resp.isNone()) { 182 resps.addResponse(resp); 183 } 184 185 } 186 187 return resps; 188 } 189 finally { 190 try{ 191 sock.close(); 192 }catch(RuntimeException e){} 193 } 194 } 195 196 199 public ServerAddress getAddress() throws IllegalStateException { 200 if (_addr == null) { 201 throw new IllegalStateException ( 202 "The address of this instance is not yet available"); 203 } 204 205 return _addr; 206 } 207 208 211 protected void handleSoTimeout() { 212 if (_listener != null) { 213 _listener.handleSoTimeout(); 214 } 215 } 216 217 220 protected void handlePacketSizeToShort(DatagramPacket pack) { 221 String msg = "Buffer size to short; set to: " + bufSize() + 222 ". This size is not enough to receive some incoming packets"; 223 224 System.err.println(msg); 225 } 226 227 230 protected int bufSize() { 231 return super.bufSize(); 232 } 233 234 237 protected void handle(DatagramPacket pack, DatagramSocket sock) { 238 try { 239 Object o = Util.fromDatagram(pack); 240 241 if (o instanceof RemoteEvent) { 242 RemoteEvent evt = (RemoteEvent) o; 243 244 if (evt.isSync()) { 245 InetAddress addr = pack.getAddress(); 246 int port = pack.getPort(); 247 248 if (_consumer.hasSyncListener(evt.getType())) { 249 Object response = _consumer.onSyncEvent(evt); 250 251 try { 252 doSend(addr, port, sock, 253 Util.toBytes(new Response(evt.getId(), response), bufSize()), 254 false, evt.getType()); 255 } catch (TimeoutException e) { 256 } 258 } else { 259 try { 260 doSend(addr, port, sock, 261 Util.toBytes(new Response(evt.getId(), null).setNone(), 262 bufSize()), false, evt.getType()); 263 } catch (TimeoutException e) { 264 } 266 } 267 } else { 268 _consumer.onAsyncEvent(evt); 269 } 270 } else { 271 System.out.println("Object not a remote event: " + 272 o.getClass().getName() + "; " + o); 273 } 274 } catch (IOException e) { 275 e.printStackTrace(); 276 } catch (ClassNotFoundException e) { 277 e.printStackTrace(); 278 } 279 } 280 281 private Object doSend(InetAddress addr, int port, DatagramSocket sock, 282 byte[] bytes, boolean synchro, String type) throws IOException, TimeoutException { 283 if (bytes.length > _bufsize) { 284 throw new IOException( 285 "Size of data larger than buffer size; increase this instance's buffer size through the setBufsize() method"); 286 } 287 288 if(Log.isDebug()){ 289 Log.debug(getClass(), "doSend() : " + addr + ", event type: " + type); 290 } 291 DatagramPacket pack = new DatagramPacket(bytes, 0, bytes.length, addr, port); 292 293 sock.send(pack); 294 295 if (synchro) { 296 bytes = new byte[bufSize()]; 297 pack = new DatagramPacket(bytes, bytes.length); 298 299 try { 300 sock.receive(pack); 301 } catch (InterruptedIOException e) { 302 throw new TimeoutException(); 303 } 304 305 try { 306 return Util.fromDatagram(pack); 307 } catch (ClassNotFoundException e) { 308 throw new IOException(e.getClass().getName() + ": " + e.getMessage()); 309 } 310 } else { 311 return null; 312 } 313 } 314 } 315 | Popular Tags |