package org.mule.impl.internal.notifications;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mule.MuleManager;
import org.mule.config.i18n.Message;
import org.mule.config.i18n.Messages;
import org.mule.routing.filters.WildcardFilter;
import org.mule.umo.lifecycle.Disposable;
import org.mule.umo.lifecycle.LifecycleException;
import org.mule.umo.manager.UMOServerNotification;
import org.mule.umo.manager.UMOServerNotificationListener;
import org.mule.umo.manager.UMOWorkManager;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;

/**
 * Dispatches {@link UMOServerNotification}s to registered
 * {@link UMOServerNotificationListener}s.
 * <p>
 * Notifications that are instances of <code>BlockingServerEvent</code> are
 * delivered synchronously on the thread that calls
 * {@link #fireEvent(UMOServerNotification)}. All other notifications are placed
 * on an internal queue and delivered by {@link #run()}, which
 * {@link #start(UMOWorkManager)} schedules on a work manager.
 */
public class ServerNotificationManager implements Work, Disposable
{
    /**
     * logger used by this class
     */
    protected static final Log logger = LogFactory.getLog(ServerNotificationManager.class);

    /** Subscription value stored for listeners registered without a subscription. */
    public static final String NULL_SUBSCRIPTION = "NULL";

    /**
     * How long the worker waits on the queue before re-checking the disposed
     * flag. Bounds the time {@link #run()} needs to notice {@link #dispose()}.
     */
    private static final long QUEUE_POLL_TIMEOUT_MILLIS = 1000L;

    // The collections are created once and only ever emptied in place. Replacing
    // them (as init() used to do) left the worker thread parked on an abandoned
    // queue, so notifications fired after clear() were never delivered.

    /** Maps a listener type (Class) to the notification type (Class) it receives. */
    private final Map eventsMap = new ConcurrentHashMap();

    /** Notifications waiting to be delivered by {@link #run()}. */
    private final LinkedBlockingQueue eventQueue = new LinkedBlockingQueue();

    /** Registered {@link Listener} wrappers. */
    private final List listeners = new CopyOnWriteArrayList();

    /** Written by dispose() and read by the worker thread, hence volatile. */
    private volatile boolean disposed = false;

    private WorkListener workListener;

    public ServerNotificationManager()
    {
        init();
    }

    /**
     * Resets this manager: drops all event type bindings, queued notifications
     * and listeners, and reloads the work listener from the Mule configuration.
     */
    private synchronized void init()
    {
        eventsMap.clear();
        eventQueue.clear();
        listeners.clear();
        workListener = MuleManager.getConfiguration().getWorkListener();
    }

    /**
     * Schedules this manager on the given work manager so that queued
     * notifications are delivered by {@link #run()}.
     *
     * @param workManager the work manager used to schedule this manager
     * @throws LifecycleException if the work manager fails to schedule the work
     */
    public void start(UMOWorkManager workManager) throws LifecycleException
    {
        try
        {
            workManager.scheduleWork(this, WorkManager.INDEFINITE, null, workListener);
        }
        catch (WorkException e)
        {
            throw new LifecycleException(e, this);
        }
    }

    /**
     * Binds a listener type to the notification type it should receive. A
     * listener type that is already bound keeps its existing binding.
     *
     * @param eventType the notification class; must be assignable to
     *            {@link UMOServerNotification}
     * @param listenerType the listener class to bind to <code>eventType</code>
     * @throws IllegalArgumentException if <code>eventType</code> is not a
     *             {@link UMOServerNotification} type
     */
    public void registerEventType(Class eventType, Class listenerType)
    {
        if (!UMOServerNotification.class.isAssignableFrom(eventType))
        {
            throw new IllegalArgumentException(new Message(
                Messages.PROPERTY_X_IS_NOT_SUPPORTED_TYPE_X_IT_IS_TYPE_X, "eventType",
                UMOServerNotification.class.getName(), eventType.getName()).getMessage());
        }

        if (!eventsMap.containsKey(listenerType))
        {
            eventsMap.put(listenerType, eventType);
            if (logger.isDebugEnabled())
            {
                logger.debug("Registered event type: " + eventType);
                logger.debug("Binding listener type '" + listenerType + "' to event type '" + eventType
                             + "'");
            }
        }
    }

    /**
     * Registers a listener without a subscription, so it is not filtered by
     * resource identifier.
     *
     * @param listener the listener to register
     * @throws NotificationException declared for callers; not thrown by the
     *             code in this class
     */
    public void registerListener(UMOServerNotificationListener listener) throws NotificationException
    {
        registerListener(listener, null);
    }

    /**
     * Registers a listener. The notification types it receives are fixed at
     * registration time from the event types registered so far.
     *
     * @param listener the listener to register
     * @param subscription wildcard pattern matched against the notification's
     *            resource identifier, or <code>null</code> for no filtering
     * @throws NotificationException declared for callers; not thrown by the
     *             code in this class
     */
    public void registerListener(UMOServerNotificationListener listener, String subscription)
        throws NotificationException
    {
        listeners.add(new Listener(listener, subscription));
    }

    /**
     * Removes the first registration whose listener object equals the given
     * listener. Does nothing if there is no such registration.
     *
     * @param listener the listener to remove
     */
    public void unregisterListener(UMOServerNotificationListener listener)
    {
        // Removing through the list while iterating is safe here: the iterator
        // of a CopyOnWriteArrayList works on a snapshot.
        for (Iterator iterator = listeners.iterator(); iterator.hasNext();)
        {
            Listener registered = (Listener) iterator.next();
            if (registered.getListenerObject().equals(listener))
            {
                listeners.remove(registered);
                break;
            }
        }
    }

    /**
     * Removes all listeners, event type bindings and queued notifications, and
     * reloads the work listener from the Mule configuration.
     */
    public void clear()
    {
        listeners.clear();
        init();
    }

    /**
     * Fires a notification. Instances of <code>BlockingServerEvent</code> are
     * delivered to the listeners before this method returns; anything else is
     * queued for delivery by {@link #run()}. Does nothing once this manager has
     * been disposed.
     *
     * @param notification the notification to deliver
     */
    public void fireEvent(UMOServerNotification notification)
    {
        if (disposed)
        {
            return;
        }

        if (notification instanceof BlockingServerEvent)
        {
            notifyListeners(notification);
            return;
        }

        try
        {
            eventQueue.put(notification);
        }
        catch (InterruptedException e)
        {
            // Keep the caller's interrupt status visible instead of swallowing it.
            Thread.currentThread().interrupt();
            logger.error("Failed to queue notification: " + notification, e);
        }
    }

    /**
     * Marks this manager as disposed and clears its state. The worker loop in
     * {@link #run()} ends the next time it checks the disposed flag.
     */
    public void dispose()
    {
        disposed = true;
        clear();
    }

    /**
     * Delivers the notification to every registered listener that matches it.
     * Does nothing once this manager has been disposed.
     *
     * @param notification the notification to deliver
     */
    protected void notifyListeners(UMOServerNotification notification)
    {
        if (disposed)
        {
            return;
        }

        for (Iterator iterator = listeners.iterator(); iterator.hasNext();)
        {
            Listener listener = (Listener) iterator.next();
            if (listener.matches(notification))
            {
                listener.getListenerObject().onNotification(notification);
            }
        }
    }

    /**
     * Releases this work item by disposing the manager.
     */
    public void release()
    {
        dispose();
    }

    /**
     * Worker loop: takes queued notifications and delivers them to the
     * listeners until this manager is disposed or the thread is interrupted.
     */
    public void run()
    {
        while (!disposed)
        {
            try
            {
                // A timed poll, not take(): an unbounded wait would never see
                // the disposed flag change while the queue stays empty.
                UMOServerNotification notification = (UMOServerNotification) eventQueue.poll(
                    QUEUE_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                if (notification != null)
                {
                    notifyListeners(notification);
                }
            }
            catch (InterruptedException e)
            {
                // Treat interruption as a request to stop: restore the flag for
                // whoever owns the thread and leave the loop. Looping on with
                // the flag set would make every poll fail immediately.
                Thread.currentThread().interrupt();
                if (!disposed)
                {
                    logger.error("Failed to take notification from server notification queue", e);
                }
                return;
            }
        }
    }

    /**
     * A registered listener together with its subscription filter and the
     * notification types it is bound to.
     */
    protected class Listener
    {
        private final UMOServerNotificationListener listener;
        private final List notificationClasses;
        private final String subscription;
        private final WildcardFilter subscriptionFilter;

        /**
         * Wraps a listener and collects the notification types bound to every
         * registered listener type that the listener's class implements.
         *
         * @param listener the listener object to wrap
         * @param subscription wildcard pattern for resource identifiers, or
         *            <code>null</code> to use {@link #NULL_SUBSCRIPTION}
         */
        public Listener(UMOServerNotificationListener listener, String subscription)
        {
            this.listener = listener;
            this.subscription = (subscription == null ? NULL_SUBSCRIPTION : subscription);

            subscriptionFilter = new WildcardFilter(this.subscription);
            subscriptionFilter.setCaseSensitive(false);

            notificationClasses = new ArrayList();

            // Walk the entries rather than keySet() + get(): a separate lookup
            // could return null if the map is cleared in between.
            for (Iterator iterator = eventsMap.entrySet().iterator(); iterator.hasNext();)
            {
                Map.Entry binding = (Map.Entry) iterator.next();
                Class listenerType = (Class) binding.getKey();
                if (listenerType.isAssignableFrom(listener.getClass()))
                {
                    notificationClasses.add(binding.getValue());
                }
            }
        }

        /**
         * @return the wrapped listener object
         */
        public UMOServerNotificationListener getListenerObject()
        {
            return listener;
        }

        /**
         * @return the notification classes this listener is bound to
         */
        public List getNotificationClasses()
        {
            return notificationClasses;
        }

        /**
         * @return the subscription pattern, or {@link #NULL_SUBSCRIPTION} if
         *         none was given
         */
        public String getSubscription()
        {
            return subscription;
        }

        /**
         * Tests whether this listener should receive the notification.
         *
         * @param notification the notification to test
         * @return <code>true</code> if the subscription matches and the
         *         notification is an instance of one of the bound types
         */
        public boolean matches(UMOServerNotification notification)
        {
            if (subscriptionMatches(notification))
            {
                for (Iterator iterator = notificationClasses.iterator(); iterator.hasNext();)
                {
                    Class notificationClass = (Class) iterator.next();
                    if (notificationClass.isAssignableFrom(notification.getClass()))
                    {
                        return true;
                    }
                }
            }

            return false;
        }

        /**
         * Tests the notification's resource identifier against the
         * subscription.
         *
         * @param notification the notification to test
         * @return <code>true</code> if no subscription was given or the
         *         subscription filter accepts the resource identifier
         */
        public boolean subscriptionMatches(UMOServerNotification notification)
        {
            String resourceId = notification.getResourceIdentifier();
            return NULL_SUBSCRIPTION.equals(subscription) || subscriptionFilter.accept(resourceId);
        }
    }

    /**
     * @return the work listener passed to the work manager by
     *         {@link #start(UMOWorkManager)}
     */
    public WorkListener getWorkListener()
    {
        return workListener;
    }

    /**
     * Sets the work listener passed to the work manager by
     * {@link #start(UMOWorkManager)}. Note that {@link #clear()} replaces it
     * with the one from the Mule configuration.
     *
     * @param workListener the work listener; must not be <code>null</code>
     * @throws NullPointerException if <code>workListener</code> is
     *             <code>null</code>
     */
    public void setWorkListener(WorkListener workListener)
    {
        if (workListener == null)
        {
            throw new NullPointerException("workListener");
        }
        this.workListener = workListener;
    }
}