1 10 11 package org.mule.impl.space; 12 13 import java.util.Iterator ; 14 import java.util.List ; 15 16 import org.apache.commons.logging.Log; 17 import org.apache.commons.logging.LogFactory; 18 import org.mule.MuleManager; 19 import org.mule.umo.UMOTransactionFactory; 20 import org.mule.umo.space.UMOSpace; 21 import org.mule.umo.space.UMOSpaceEvent; 22 import org.mule.umo.space.UMOSpaceEventListener; 23 import org.mule.umo.space.UMOSpaceException; 24 25 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; 26 27 31 public abstract class AbstractSpace implements UMOSpace 32 { 33 34 37 protected transient Log logger = LogFactory.getLog(getClass()); 38 39 protected List listeners = new CopyOnWriteArrayList(); 40 protected List monitorListeners = new CopyOnWriteArrayList(); 41 protected String name; 42 protected boolean enableMonitorEvents; 43 protected UMOTransactionFactory transactionFactory = null; 44 45 protected AbstractSpace(String name) 46 { 47 this(name, true); 48 } 49 50 protected AbstractSpace(String name, boolean enableMonitorEvents) 51 { 52 super(); 53 this.name = name; 54 this.enableMonitorEvents = enableMonitorEvents; 55 fireMonitorEvent(SpaceMonitorNotification.SPACE_CREATED, this); 56 } 57 58 public void addListener(UMOSpaceEventListener listener) 59 { 60 listeners.add(listener); 61 fireMonitorEvent(SpaceMonitorNotification.SPACE_LISTENER_ADDED, listener); 62 } 63 64 public void removeListener(UMOSpaceEventListener listener) 65 { 66 listeners.remove(listener); 67 fireMonitorEvent(SpaceMonitorNotification.SPACE_LISTENER_REMOVED, listener); 68 } 69 70 public void addMonitorListener(SpaceMonitorNotificationListener listener) 71 { 72 if (!enableMonitorEvents) 73 { 74 logger.warn("Space monitor notifications for " + name + " space are currently disabled"); 75 } 76 monitorListeners.add(listener); 77 } 78 79 public void removeMonitorListener(SpaceMonitorNotificationListener listener) 80 { 81 listeners.remove(listener); 82 } 83 84 public String getName() 85 { 86 return name; 87 } 88 89 public final void put(Object value) throws UMOSpaceException 90 { 91 doPut(value); 92 fireListeners(); 93 fireMonitorEvent(SpaceMonitorNotification.SPACE_ITEM_ADDED, value); 94 } 95 96 public void put(Object value, long lease) throws UMOSpaceException 97 { 98 if (logger.isTraceEnabled()) 99 { 100 logger.trace("Writing value to space: " + name + ", with lease: " + lease + ", Value is: " 101 + value); 102 } 103 doPut(value, lease); 104 fireListeners(); 105 fireMonitorEvent(SpaceMonitorNotification.SPACE_ITEM_ADDED, value); 106 } 107 108 public Object take() throws UMOSpaceException 109 { 110 Object item = doTake(); 111 112 if (item == null) 113 { 114 if (logger.isTraceEnabled()) 115 { 116 logger.trace("Taking from space: " + name + " returned null"); 117 } 118 fireMonitorEvent(SpaceMonitorNotification.SPACE_ITEM_MISS, item); 119 } 120 else 121 { 122 if (logger.isTraceEnabled()) 123 { 124 logger.trace("Taking from space: " + name + " returned:" + item); 125 } 126 fireMonitorEvent(SpaceMonitorNotification.SPACE_ITEM_REMOVED, item); 127 } 128 return item; 129 } 130 131 public Object take(long timeout) throws UMOSpaceException 132 { 133 Object item = doTake(timeout); 134 if (item == null) 135 { 136 if (logger.isTraceEnabled()) 137 { 138 logger.trace("Taking from space (timeout " + timeout + "): returned null"); 139 } 140 fireMonitorEvent(SpaceMonitorNotification.SPACE_ITEM_MISS, item); 141 } 142 else 143 { 144 fireMonitorEvent(SpaceMonitorNotification.SPACE_ITEM_REMOVED, item); 145 if (logger.isTraceEnabled()) 146 { 147 logger.trace("Taking from space (timeout " + timeout + "): " + name + " returned:" + item); 148 } 149 } 150 return item; 151 } 152 153 public Object takeNoWait() throws UMOSpaceException 154 { 155 Object item = doTakeNoWait(); 156 if (item == null) 157 { 158 fireMonitorEvent(SpaceMonitorNotification.SPACE_ITEM_MISS, item); 159 if (logger.isTraceEnabled()) 160 { 161 logger.trace("Taking from space (no wait): " + name + " returned: null"); 162 } 163 } 164 else 165 { 166 fireMonitorEvent(SpaceMonitorNotification.SPACE_ITEM_REMOVED, item); 167 if (logger.isTraceEnabled()) 168 { 169 logger.trace("Taking from space (no wait): " + name + " returned:" + item); 170 } 171 } 172 return item; 173 } 174 175 protected void fireListeners() 176 { 177 if (listeners.size() > 0) 178 { 179 Object item = null; 180 try 181 { 182 item = takeNoWait(); 183 } 184 catch (UMOSpaceException e) 185 { 186 logger.error(e.getMessage(), e); 187 } 188 if (item == null) 189 { 190 logger.warn("Item was taken before listeners could be updated, try using a different type of space"); 191 return; 192 } 193 for (Iterator iterator = listeners.iterator(); iterator.hasNext();) 194 { 195 UMOSpaceEventListener spaceEventListener = (UMOSpaceEventListener)iterator.next(); 196 spaceEventListener.onEvent(new UMOSpaceEvent(item, this)); 197 } 198 } 199 } 200 201 protected void fireMonitorEvent(int action, Object item) 202 { 203 if (enableMonitorEvents) 204 { 205 MuleManager.getInstance().fireNotification(new SpaceMonitorNotification(this, action, item)); 206 } 207 } 208 209 public void dispose() 210 { 211 doDispose(); 212 fireMonitorEvent(SpaceMonitorNotification.SPACE_DISPOSED, this); 213 } 214 215 public UMOTransactionFactory getTransactionFactory() 216 { 217 return transactionFactory; 218 } 219 220 public void setTransactionFactory(UMOTransactionFactory transactionFactory) 221 { 222 this.transactionFactory = transactionFactory; 223 } 224 225 protected abstract void doPut(Object value) throws UMOSpaceException; 226 227 protected abstract void doPut(Object value, long lease) throws UMOSpaceException; 228 229 protected abstract Object doTake() throws UMOSpaceException; 230 231 protected abstract Object doTake(long timeout) throws UMOSpaceException; 232 233 protected abstract Object doTakeNoWait() throws UMOSpaceException; 234 235 protected abstract void doDispose(); 236 } 237 | Popular Tags |