1 package org.sapia.ubik.mcast; 2 3 import org.sapia.ubik.rmi.server.Log; 4 import org.sapia.ubik.rmi.server.UIDGenerator; 5 6 import java.lang.ref.SoftReference ; 7 8 import java.net.InetAddress ; 9 import java.net.UnknownHostException ; 10 11 import java.util.*; 12 13 14 31 public class EventConsumer { 32 private static int _count; 33 private static Random _rand = new Random(); 34 private Map _asyncListenersByEvent = new HashMap(); 35 private Map _syncListenersByEvent = new HashMap(); 36 private Map _reverseMap = new WeakHashMap(); 37 private DomainName _domain; 38 private String _node; 39 40 44 public EventConsumer(String node, String domain) { 45 _domain = DomainName.parse(domain); 46 _node = node; 47 if(Log.isDebug()){ 48 Log.debug(getClass(), "Starting node: " + node + "@" + domain); 49 } 50 } 51 52 56 public EventConsumer(String domain) throws UnknownHostException { 57 this(new String ("" + 58 (InetAddress.getLocalHost().getHostAddress().hashCode() ^ 59 new Object ().hashCode() ^ UIDGenerator.createdUID())) + "_" + 60 _rand.nextLong(), domain); 61 } 62 63 68 public String getNode() { 69 return _node; 70 } 71 72 77 public DomainName getDomainName() { 78 return _domain; 79 } 80 81 87 public synchronized void registerAsyncListener(String evtType, 88 AsyncEventListener listener) { 89 List lst = getAsyncListenersFor(evtType, true); 90 91 if (!contains(lst, listener)) { 92 lst.add(new SoftReference (listener)); 93 _reverseMap.put(listener, evtType); 94 } 95 else{ 96 Log.info(getClass(), "A listener is already registered for: " + evtType); 97 } 98 } 99 100 106 public synchronized void registerSyncListener(String evtType, 107 SyncEventListener listener) throws ListenerAlreadyRegisteredException { 108 if (_syncListenersByEvent.get(evtType) != null) { 109 throw new ListenerAlreadyRegisteredException(evtType); 110 } 111 112 _syncListenersByEvent.put(evtType, new SoftReference (listener)); 113 _reverseMap.put(listener, evtType); 114 } 115 116 121 public synchronized void unregisterListener(SyncEventListener listener) { 122 String evtId = (String ) _reverseMap.remove(listener); 123 124 if (evtId != null) { 125 _syncListenersByEvent.remove(evtId); 126 } 127 } 128 129 134 public synchronized void unregisterListener(AsyncEventListener listener) { 135 String evtId = (String ) _reverseMap.remove(listener); 136 137 if (evtId != null) { 138 List lst = getAsyncListenersFor(evtId, false); 139 140 if (lst != null) { 141 SoftReference contained; 142 AsyncEventListener instance; 143 144 for (int i = 0; i < lst.size(); i++) { 145 contained = (SoftReference ) lst.get(i); 146 147 if ((instance = (AsyncEventListener) contained.get()) == null) { 148 lst.remove(i); 149 i--; 150 151 continue; 152 } 153 154 if (contained.get().equals(instance)) { 155 lst.remove(i); 156 157 break; 158 } 159 } 160 } 161 } 162 } 163 164 170 public boolean containsAsyncListener(AsyncEventListener listener) { 171 String type = (String ) _reverseMap.get(listener); 172 173 if (type != null) { 174 List listeners = (List) _asyncListenersByEvent.get(type); 175 176 return contains(listeners, listener); 177 } 178 179 return false; 180 } 181 182 189 public boolean hasSyncListener(String evtType) { 190 return _syncListenersByEvent.get(evtType) != null; 191 } 192 193 201 public boolean containsSyncListener(SyncEventListener listener) { 202 return _reverseMap.get(listener) != null; 203 } 204 205 209 public int getCount() { 210 return _reverseMap.size(); 211 } 212 213 protected void onAsyncEvent(RemoteEvent evt) { 214 DomainName dn = null; 215 216 if(Log.isDebug()){ 217 Log.debug(getClass(), "Received remote event: " + evt.getType() + "@" + evt.getNode() + "@" + evt.getDomainName()); 218 Log.debug(getClass(), "Event from this node: " + evt.getNode().equals(_node)); 219 } 220 221 if (evt.getDomainName() != null) { 222 dn = DomainName.parse(evt.getDomainName()); 223 } 224 225 if (matchesAll(dn, evt.getNode())) { 226 if(Log.isDebug()){ 227 Log.debug(getClass(), "Notifying..."); 228 } 229 notifyAsyncListeners(evt); 230 } else if (matchesThis(dn, evt.getNode())) { 231 if(Log.isDebug()){ 232 Log.debug(getClass(), "Notifying..."); 233 } 234 notifyAsyncListeners(evt); 235 } 236 else { 237 if(Log.isDebug()) 238 Log.debug(getClass(), "Event was not matched: " + evt.getType()); 239 240 } 241 } 242 243 protected boolean matchesAll(DomainName dn, String node){ 244 return dn == null && node != null && 245 !node.equals(_node); 246 } 247 248 protected boolean matchesThis(DomainName dn, String node){ 249 return (dn != null) && _domain.contains(dn) && 250 (node != null) && 251 !node.equals(_node); 252 } 253 254 protected synchronized Object onSyncEvent(RemoteEvent evt) { 255 DomainName dn = null; 256 257 if(Log.isDebug()){ 258 Log.debug(getClass(), "Received remote event: " + evt.getType() + "@" + evt.getDomainName()); 259 Log.debug(getClass(), "Event from this node: " + evt.getNode().equals(_node)); 260 } 261 262 if (evt.getDomainName() != null) { 263 dn = DomainName.parse(evt.getDomainName()); 264 } 265 266 if ((dn == null) && (evt.getNode() != null) && 267 !evt.getNode().equals(_node)) { 268 SyncEventListener sync = (SyncEventListener) ((SoftReference ) _syncListenersByEvent.get(evt.getType())).get(); 269 270 if (sync != null) { 271 if(Log.isDebug()){ 272 Log.debug(getClass(), "Dispatching sync event to: " + sync); 273 } 274 return sync.onSyncEvent(evt); 275 } else { 276 Log.debug(getClass(), "No listener for event: " + evt.getType()); 277 _syncListenersByEvent.remove(evt.getType()); 278 } 279 } else if ((dn != null) && _domain.contains(dn) && (evt.getNode() != null) && 280 !evt.getNode().equals(_node)) { 281 SyncEventListener sync = (SyncEventListener) ((SoftReference ) _syncListenersByEvent.get(evt.getType())).get(); 282 283 if (sync != null) { 284 if(Log.isDebug()){ 285 Log.debug(getClass(), "Dispatching sync event to: " + sync); 286 } 287 return sync.onSyncEvent(evt); 288 } else { 289 Log.debug(getClass(), "No listener for event: " + evt.getType()); 290 _syncListenersByEvent.remove(evt.getType()); 291 } 292 } 293 294 return null; 295 } 296 297 private synchronized void notifyAsyncListeners(RemoteEvent evt) { 298 List lst = getAsyncListenersFor(evt.getType(), false); 299 AsyncEventListener listener; 300 301 if (lst != null) { 302 if(lst.size() == 0){ 303 if(Log.isDebug()) 304 Log.debug(getClass(), "No listener for event: " + evt.getType()); 305 } 306 for (int i = 0; i < lst.size(); i++) { 307 listener = (AsyncEventListener) ((SoftReference ) lst.get(i)).get(); 308 if (listener == null) { 309 Log.debug(getClass(), "Async listener reference is null for: " + evt.getType()); 310 lst.remove(i); 311 i--; 312 313 continue; 314 } 315 Log.debug(getClass(), "Notifying async listener for: " + evt.getType() + " -> " + listener); 316 listener.onAsyncEvent(evt); 317 } 318 } 319 else{ 320 Log.debug(getClass(), "No async listeners for: " + evt.getType()); 321 } 322 } 323 324 private List getAsyncListenersFor(String evtId, boolean create) { 325 List lst = (List) _asyncListenersByEvent.get(evtId); 326 327 if ((lst == null) && create) { 328 lst = new ArrayList(); 329 _asyncListenersByEvent.put(evtId, lst); 330 } 331 332 return lst; 333 } 334 335 protected boolean contains(List listeners, AsyncEventListener listener) { 336 if (listeners != null) { 337 SoftReference contained; 338 AsyncEventListener instance; 339 340 for (int i = 0; i < listeners.size(); i++) { 341 contained = (SoftReference ) listeners.get(i); 342 343 if ((instance = (AsyncEventListener) contained.get()) == null) { 344 listeners.remove(i); 345 i--; 346 347 continue; 348 } 349 350 if (contained.get().equals(listener)) { 351 return true; 352 } 353 } 354 } 355 356 return false; 357 } 358 359 private static synchronized int inc() { 360 return _count++; 361 } 362 } 363 | Popular Tags |