package org.sapia.ubik.mcast;

import org.sapia.ubik.net.ServerAddress;

import java.io.IOException;

import java.util.ArrayList;
import java.util.List;
import org.sapia.ubik.rmi.Consts;
import org.sapia.ubik.rmi.server.Log;


/**
 * Combines a {@link BroadcastDispatcher} and a {@link UnicastDispatcher} that
 * share one {@link EventConsumer}, and maintains a {@link View} of the peers
 * that have announced themselves through publish/discover events.
 * <p>
 * An instance is created through one of the constructors, then started with
 * {@link #start()} and eventually released with {@link #close()}.
 */
public class EventChannel {
  static final String DISCOVER_EVT  = "ubik/mcast/discover";
  static final String PUBLISH_EVT   = "ubik/mcast/publish";
  static final String HEARTBEAT_EVT = "ubik/mcast/heartbeat";

  /** Value handed to the {@link View} constructor (overridable through {@link Consts#MCAST_HEARTBEAT_TIMEOUT}). */
  private static final long DEFAULT_VIEW_TIMEOUT = 30000;

  /** First argument given to the unicast dispatcher when no unicast port is specified. */
  private static final int DEFAULT_SO_TIMEOUT = 10000;

  /** Fallback used by the four-argument constructor when {@link Consts#MCAST_HEARTBEAT_INTERVAL} is absent or invalid. */
  private static final int DEFAULT_SO_TIMEOUT_WITH_PORT = 20000;

  /** Unicast buffer size applied by {@link #start()} when none was configured. */
  private static final int DEFAULT_UNICAST_BUFSIZE = 2000;

  BroadcastDispatcher  _broadcast;
  UnicastDispatcher    _unicast;
  EventConsumer        _consumer;
  ChannelEventListener _listener;
  View                 _view = new View(DEFAULT_VIEW_TIMEOUT);
  ServerAddress        _address;
  List                 _discoListeners = new ArrayList();
  boolean              _started;
  boolean              _closed;

  /** True once a buffer size was set through the system property or {@link #setBufsize(int)}. */
  private boolean _bufsizeConfigured;

  /**
   * Creates a channel for the given domain, using the given multicast host
   * and port for broadcasting.
   *
   * @param domain    the domain name passed to the {@link EventConsumer}.
   * @param mcastHost the multicast host passed to the broadcast dispatcher.
   * @param mcastPort the multicast port passed to the broadcast dispatcher.
   * @throws IOException if one of the dispatchers cannot be created.
   */
  public EventChannel(String domain, String mcastHost, int mcastPort)
    throws IOException {
    _consumer  = new EventConsumer(domain);
    _broadcast = new BroadcastDispatcherImpl(_consumer, mcastHost, mcastPort);
    _unicast   = new UDPUnicastDispatcher(DEFAULT_SO_TIMEOUT, _consumer);
    init();
  }

  /**
   * Creates a channel for the given domain, using the given multicast host
   * and port for broadcasting, and the given port for unicast.
   * <p>
   * The first argument of the unicast dispatcher is read from the
   * {@link Consts#MCAST_HEARTBEAT_INTERVAL} system property; when the property
   * is missing or is not an integer, 20000 is used.
   *
   * @param domain      the domain name passed to the {@link EventConsumer}.
   * @param mcastHost   the multicast host passed to the broadcast dispatcher.
   * @param mcastPort   the multicast port passed to the broadcast dispatcher.
   * @param unicastPort the port passed to the unicast dispatcher.
   * @throws IOException if one of the dispatchers cannot be created.
   */
  public EventChannel(String domain, String mcastHost, int mcastPort,
    int unicastPort) throws IOException {
    _consumer = new EventConsumer(domain);

    int    soTimeout     = DEFAULT_SO_TIMEOUT_WITH_PORT;
    String soTimeoutProp = System.getProperty(Consts.MCAST_HEARTBEAT_INTERVAL);

    if (soTimeoutProp != null) {
      try {
        soTimeout = Integer.parseInt(soTimeoutProp);
      } catch (NumberFormatException e) {
        // Best effort: keep the default, but leave a trace of the bad value.
        debug("Invalid value for " + Consts.MCAST_HEARTBEAT_INTERVAL + ": "
          + soTimeoutProp + "; using " + soTimeout);
      }
    }

    _unicast   = new UDPUnicastDispatcher(soTimeout, unicastPort, _consumer);
    _broadcast = new BroadcastDispatcherImpl(_consumer, mcastHost, mcastPort);
    init();
  }

  /**
   * @return the {@link DomainName} held by this channel's event consumer.
   */
  public DomainName getDomainName() {
    return _consumer.getDomainName();
  }

  /**
   * @return the multicast address reported by the broadcast dispatcher.
   */
  public String getMulticastHost() {
    return _broadcast.getMulticastAddress();
  }

  /**
   * @return the multicast port reported by the broadcast dispatcher.
   */
  public int getMulticastPort() {
    return _broadcast.getMulticastPort();
  }

  /**
   * Starts this channel: registers the internal listener for the publish,
   * discover and heartbeat events, starts both dispatchers, and broadcasts a
   * publish event carrying this channel's unicast address.
   *
   * @throws IOException if a dispatcher cannot be started or if the publish
   * event cannot be broadcast.
   */
  public void start() throws IOException {
    _listener = new ChannelEventListener(this);
    _consumer.registerAsyncListener(PUBLISH_EVT, _listener);
    _consumer.registerAsyncListener(DISCOVER_EVT, _listener);
    _consumer.registerAsyncListener(HEARTBEAT_EVT, _listener);
    _unicast.setSoTimeoutListener(_listener);

    // Only fall back to the built-in size when nothing was configured;
    // otherwise the value coming from the system property or from
    // setBufsize(int) would be silently overwritten here.
    if (!_bufsizeConfigured) {
      _unicast.setBufsize(DEFAULT_UNICAST_BUFSIZE);
    }

    _broadcast.start();
    _unicast.start();
    _address = _unicast.getAddress();
    _broadcast.dispatch(false, PUBLISH_EVT, _address);
    _started = true;
  }

  /**
   * @return <code>true</code> if {@link #start()} completed on this instance.
   */
  public boolean isStarted() {
    return _started;
  }

  /**
   * Closes both dispatchers and flags this channel as closed. The unicast
   * dispatcher is closed and the flag is set even if closing the broadcast
   * dispatcher fails.
   */
  public void close() {
    try {
      _broadcast.close();
    } finally {
      try {
        _unicast.close();
      } finally {
        _closed = true;
      }
    }
  }

  /**
   * @return <code>true</code> if {@link #close()} was called on this instance.
   */
  public boolean isClosed() {
    return _closed;
  }

  /**
   * Delegates to the broadcast dispatcher.
   *
   * @param alldomains flag passed as-is to the broadcast dispatcher.
   * @param type       the type of the event.
   * @param data       the data of the event.
   * @throws IOException if the event cannot be dispatched.
   */
  public void dispatch(boolean alldomains, String type, Object data)
    throws IOException {
    _broadcast.dispatch(alldomains, type, data);
  }

  /**
   * Delegates to the unicast dispatcher.
   *
   * @param addr the {@link ServerAddress} to which the event is sent.
   * @param type the type of the event.
   * @param data the data of the event.
   * @throws IOException if the event cannot be dispatched.
   */
  public void dispatch(ServerAddress addr, String type, Object data)
    throws IOException {
    _unicast.dispatch(addr, type, data);
  }

  /**
   * Broadcasts the event, passing this channel's own domain name to the
   * broadcast dispatcher.
   *
   * @param type the type of the event.
   * @param data the data of the event.
   * @throws IOException if the event cannot be dispatched.
   */
  public void dispatch(String type, Object data) throws IOException {
    if (Log.isDebug()) {
      Log.debug(getClass(), "Sending event " + type + " - " + data);
    }
    _broadcast.dispatch(_consumer.getDomainName().toString(), type, data);
  }

  /**
   * Adds a listener that is called back when a publish or discover event
   * carrying a non-null address is received.
   *
   * @param listener the {@link DiscoveryListener} to add.
   */
  public void addDiscoveryListener(DiscoveryListener listener) {
    _discoListeners.add(listener);
  }

  /**
   * Delegates to the unicast dispatcher's single-address <code>send</code>.
   *
   * @param addr the {@link ServerAddress} of the target.
   * @param type the type of the event.
   * @param data the data of the event.
   * @return the {@link Response} returned by the unicast dispatcher.
   * @throws IOException if a communication problem occurs.
   * @throws TimeoutException as thrown by the unicast dispatcher.
   */
  public Response send(ServerAddress addr, String type, Object data)
    throws IOException, TimeoutException {
    return _unicast.send(addr, type, data);
  }

  /**
   * Sends the event, through the unicast dispatcher, to the hosts currently
   * held by this channel's {@link View}.
   *
   * @param type the type of the event.
   * @param data the data of the event.
   * @return the {@link RespList} returned by the unicast dispatcher.
   * @throws IOException if a communication problem occurs.
   */
  public RespList send(String type, Object data) throws IOException {
    return _unicast.send(_view.getHosts(), type, data);
  }

  /**
   * Registers the given listener with this channel's event consumer.
   *
   * @param type     the event type to listen for.
   * @param listener the {@link AsyncEventListener} to register.
   */
  public synchronized void registerAsyncListener(String type,
    AsyncEventListener listener) {
    _consumer.registerAsyncListener(type, listener);
  }

  /**
   * Registers the given listener with this channel's event consumer.
   *
   * @param type     the event type to listen for.
   * @param listener the {@link SyncEventListener} to register.
   * @throws ListenerAlreadyRegisteredException as thrown by the event consumer.
   */
  public synchronized void registerSyncListener(String type,
    SyncEventListener listener) throws ListenerAlreadyRegisteredException {
    _consumer.registerSyncListener(type, listener);
  }

  /**
   * Unregisters the given listener from this channel's event consumer.
   *
   * @param listener the {@link AsyncEventListener} to unregister.
   */
  public synchronized void unregisterListener(AsyncEventListener listener) {
    _consumer.unregisterListener(listener);
  }

  /**
   * @return this channel's {@link View}.
   */
  public View getView() {
    return _view;
  }

  /**
   * @param listener an {@link AsyncEventListener}.
   * @return the result of the event consumer's <code>containsAsyncListener</code>.
   */
  public synchronized boolean containsAsyncListener(AsyncEventListener listener) {
    return _consumer.containsAsyncListener(listener);
  }

  /**
   * @param listener a {@link SyncEventListener}.
   * @return the result of the event consumer's <code>containsSyncListener</code>.
   */
  public synchronized boolean containsSyncListener(SyncEventListener listener) {
    return _consumer.containsSyncListener(listener);
  }

  /**
   * Sets the buffer size on both the broadcast and the unicast dispatcher.
   * A size set here is no longer replaced by the default in {@link #start()}.
   *
   * @param size the buffer size to pass to both dispatchers.
   */
  public void setBufsize(int size) {
    _broadcast.setBufsize(size);
    _unicast.setBufsize(size);
    _bufsizeConfigured = true;
  }

  /**
   * @return the node identifier reported by the broadcast dispatcher.
   */
  public String getNode() {
    return _broadcast.getNode();
  }

  /**
   * Internal listener that keeps the owning channel's {@link View} up to date
   * from publish/discover/heartbeat events, and that sends heartbeats to the
   * known hosts each time the unicast dispatcher signals a socket timeout.
   */
  public static class ChannelEventListener implements AsyncEventListener,
    SocketTimeoutListener {
    private EventChannel _owner;

    ChannelEventListener(EventChannel channel) {
      _owner = channel;
    }

    /**
     * Removes dead hosts from the view, then sends a heartbeat event carrying
     * the owner's address to each remaining host. Nothing is sent while the
     * owner's address is still unset.
     */
    public void handleSoTimeout() {
      _owner._view.removeDeadHosts();

      List siblings = _owner._view.getHosts();

      if (_owner._address == null) {
        return;
      }

      for (int i = 0; i < siblings.size(); i++) {
        try {
          _owner.dispatch((ServerAddress) siblings.get(i), HEARTBEAT_EVT,
            _owner._address);
        } catch (IOException e) {
          e.printStackTrace();

          // Existing behavior: give up on the remaining hosts after the
          // first failure.
          break;
        }
      }
    }

    /**
     * Handles the three channel-level event types:
     * <ul>
     *   <li>discover: adds the sender to the view and notifies the discovery listeners;</li>
     *   <li>publish: same as discover, and additionally broadcasts a discover event
     *       carrying the owner's address;</li>
     *   <li>heartbeat: forwards the sender's address to the view.</li>
     * </ul>
     * Events whose payload is <code>null</code> are ignored; other event types
     * are ignored as well.
     *
     * @param evt the received {@link RemoteEvent}.
     */
    public void onAsyncEvent(RemoteEvent evt) {
      String type = evt.getType();

      try {
        if (DISCOVER_EVT.equals(type)) {
          handlePeerAnnouncement(evt, false);
        } else if (PUBLISH_EVT.equals(type)) {
          handlePeerAnnouncement(evt, true);
        } else if (HEARTBEAT_EVT.equals(type)) {
          ServerAddress addr = (ServerAddress) evt.getData();

          // Same guard as for publish/discover: an event sent before the
          // sender knew its own address carries no usable payload.
          if (addr != null) {
            _owner._view.heartbeat(addr, evt.getNode());
          }
        }
      } catch (IOException e) {
        e.printStackTrace();
      }
    }

    /**
     * Adds the announcing peer to the view and notifies the discovery
     * listeners; when <code>reply</code> is true, a discover event is
     * broadcast before the listeners are notified.
     */
    private void handlePeerAnnouncement(RemoteEvent evt, boolean reply)
      throws IOException {
      ServerAddress addr = (ServerAddress) evt.getData();

      if (addr == null) {
        return;
      }

      _owner._view.addHost(addr, evt.getNode());

      if (reply) {
        _owner.dispatch(false, DISCOVER_EVT, _owner._address);
      }

      notifyDiscoveryListeners(addr, evt);
    }

    /** Calls <code>onDiscovery</code> on every registered discovery listener, in registration order. */
    private void notifyDiscoveryListeners(ServerAddress addr, RemoteEvent evt) {
      List listeners = _owner._discoListeners;

      for (int i = 0; i < listeners.size(); i++) {
        ((DiscoveryListener) listeners.get(i)).onDiscovery(addr, evt);
      }
    }
  }

  /**
   * Applies the optional system properties: {@link Consts#MCAST_BUFSIZE_KEY}
   * (buffer size of both dispatchers, only if strictly positive) and
   * {@link Consts#MCAST_HEARTBEAT_TIMEOUT} (timeout of the view). Values that
   * are not numbers are ignored.
   */
  private void init() {
    String bufsizeStr = System.getProperty(Consts.MCAST_BUFSIZE_KEY);

    if (bufsizeStr != null) {
      try {
        int buf = Integer.parseInt(bufsizeStr);

        if (buf > 0) {
          _broadcast.setBufsize(buf);
          _unicast.setBufsize(buf);
          _bufsizeConfigured = true;
        }
      } catch (NumberFormatException e) {
        // Best effort: keep the dispatchers' current size.
        debug("Invalid value for " + Consts.MCAST_BUFSIZE_KEY + ": "
          + bufsizeStr + "; ignored");
      }
    }

    String heartBeatTimeout = System.getProperty(Consts.MCAST_HEARTBEAT_TIMEOUT);

    if (heartBeatTimeout != null) {
      try {
        _view.setTimeout(Long.parseLong(heartBeatTimeout));
      } catch (NumberFormatException e) {
        // Best effort: keep the view's current timeout.
        debug("Invalid value for " + Consts.MCAST_HEARTBEAT_TIMEOUT + ": "
          + heartBeatTimeout + "; ignored");
      }
    }
  }

  /** Emits the message through {@link Log} when debug output is enabled. */
  private void debug(String msg) {
    if (Log.isDebug()) {
      Log.debug(getClass(), msg);
    }
  }
}