package jegg.impl;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import jegg.Message;
import jegg.btree.BinaryTree;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A thread that repeatedly takes a snapshot of the eggs scheduled on it and
 * hands each egg its next pending message.
 * <p>
 * Named dispatchers are kept in a static registry and are obtained through
 * {@link #getDispatcher(String, boolean)}; anonymous (unregistered) ones are
 * obtained through {@link #getAnonymousDispatcher()}.
 * <p>
 * The dispatcher thread sleeps on its own monitor ({@code this}) whenever it
 * has no scheduled eggs or none of them has a pending message.  Any code that
 * wants to wake it must therefore notify on the dispatcher instance while
 * holding its monitor.
 */
public class Dispatcher extends Thread
{
    /** Class logger. */
    private static final Log LOG = LogFactory.getLog(Dispatcher.class);

    /** Registry of named dispatchers, keyed by dispatcher name. */
    private static final Map _dispatchers = new HashMap();

    /** Name under which the default dispatcher is registered. */
    public static final String DEFAULT_DISPATCHER_NAME = "default-dispatcher";

    /** The default dispatcher; created and registered when this class loads. */
    public static final Dispatcher DEFAULT_DISPATCHER;

    static
    {
        DEFAULT_DISPATCHER = new Dispatcher(DEFAULT_DISPATCHER_NAME);
        _dispatchers.put(DEFAULT_DISPATCHER_NAME, DEFAULT_DISPATCHER);
    }

    /** Eggs currently scheduled on this dispatcher; guarded by its own monitor. */
    private Set _scheduled = new HashSet();

    /** Pool of snapshot arrays reused by {@link #run()}, keyed by array length. */
    private BinaryTree _arrayPool = new BinaryTree();

    /**
     * Returns an iterator over the registered dispatchers.
     * <p>
     * The iterator is backed directly by the registry map, so it is not safe
     * to use while other threads are registering dispatchers.
     *
     * @return iterator over the registry's values.
     */
    public static Iterator iterator()
    {
        return _dispatchers.values().iterator();
    }

    /**
     * Looks up the dispatcher registered under the given name, creating and
     * registering it if it does not exist yet.
     * <p>
     * The method is synchronized so that concurrent callers cannot register
     * two dispatchers under one name or start the same dispatcher twice.
     * NOTE(review): a dispatcher whose thread has already terminated is not
     * alive either, so asking to start it still fails inside
     * {@link Thread#start()} exactly as before.
     *
     * @param name  the dispatcher name; <code>null</code> selects
     *              {@link #DEFAULT_DISPATCHER_NAME}.
     * @param start whether to start the dispatcher thread if it is not alive.
     * @return the dispatcher registered under the resolved name.
     */
    public static synchronized Dispatcher getDispatcher(final String name, final boolean start)
    {
        String id = (null == name) ? DEFAULT_DISPATCHER_NAME : name;
        Dispatcher d = (Dispatcher) _dispatchers.get(id);
        if (null == d)
        {
            d = new Dispatcher(id);
            _dispatchers.put(id, d);
        }

        if (start && !d.isAlive())
        {
            d.start();
        }

        return d;
    }

    /**
     * Creates a new dispatcher that is not placed in the registry.
     * The returned dispatcher is not started.
     *
     * @return a fresh, unregistered dispatcher.
     */
    public static Dispatcher getAnonymousDispatcher()
    {
        return new Dispatcher();
    }

    /** Creates an unnamed dispatcher (the thread gets a default name). */
    private Dispatcher()
    {
        super();
    }

    /**
     * Creates a dispatcher whose thread carries the given name.
     *
     * @param name the thread name.
     */
    private Dispatcher(final String name)
    {
        super(name);
    }

    /**
     * Returns the default dispatcher.
     *
     * @return {@link #DEFAULT_DISPATCHER}.
     */
    static Dispatcher getDefaultScheduler()
    {
        return DEFAULT_DISPATCHER;
    }

    /**
     * Starts the dispatcher thread, logging the event at debug level first.
     */
    public void start()
    {
        if (LOG.isDebugEnabled())
        {
            LOG.debug("starting " + getName());
        }
        super.start();
    }

    /**
     * Schedules an egg on this dispatcher and wakes the dispatcher thread so
     * that its next snapshot includes the egg.
     *
     * @param e the egg to schedule.
     */
    final void add(final EggShell e)
    {
        LOG.trace("trace");
        synchronized (_scheduled)
        {
            _scheduled.add(e);
        }
        wakeUp();
    }

    /**
     * Removes an egg from this dispatcher and wakes the dispatcher thread so
     * that it drops any snapshot still containing the egg.
     *
     * @param e the egg to remove.
     */
    final void remove(final EggShell e)
    {
        LOG.trace("trace");
        synchronized (_scheduled)
        {
            _scheduled.remove(e);
        }
        wakeUp();
    }

    /**
     * Wakes the dispatcher thread if it is sleeping in {@link #run()}.
     * <p>
     * run() waits on <code>this</code>, not on the scheduled set, so the
     * signal has to be sent on the dispatcher's own monitor.  It is sent
     * after the scheduled-set monitor has been released: run() acquires
     * <code>this</code> before the set, and nesting the two the other way
     * round here could deadlock.  notifyAll() is used because callers of
     * {@link Thread#join()} wait on this same monitor and a single notify()
     * could be consumed by one of them.
     */
    private void wakeUp()
    {
        synchronized (this)
        {
            notifyAll();
        }
    }

    /**
     * Dispatch loop.  Each pass:
     * <ol>
     * <li>waits until at least one egg is scheduled and snapshots the
     *     scheduled set into a pooled array;</li>
     * <li>if no egg in the snapshot has a pending message, waits for a
     *     notification and then starts over with a fresh snapshot;</li>
     * <li>otherwise gives every egg in the snapshot at most one message.</li>
     * </ol>
     * The loop ends when the thread is interrupted.
     */
    public final void run()
    {
        LOG.trace("run(): starting");

        while (!interrupted())
        {
            Object[] eggs = null;
            int arraySize = 0;

            // Phase 1: block until there is something scheduled.
            while (null == eggs || 0 == eggs.length)
            {
                synchronized (this)
                {
                    synchronized (_scheduled)
                    {
                        if (!_scheduled.isEmpty())
                        {
                            arraySize = _scheduled.size();
                            // Keep toArray's result: it is the same array
                            // unless the pooled one turned out too small.
                            eggs = _scheduled.toArray(getArray(arraySize));
                        }
                    }

                    if (null == eggs || 0 == arraySize)
                    {
                        try
                        {
                            this.wait();
                        }
                        catch (InterruptedException e)
                        {
                            // Interruption is the shutdown signal.
                            return;
                        }
                    }
                }
            }

            // Phase 2: if nothing is pending, sleep; the scheduled set may
            // have changed by the time we wake, so take a new snapshot.
            boolean startOver = false;

            synchronized (this)
            {
                if (0 == countPendingMessages(eggs, arraySize))
                {
                    try
                    {
                        this.wait();
                    }
                    catch (InterruptedException e)
                    {
                        return;
                    }
                    startOver = true;
                }
            }

            if (startOver)
            {
                releaseArray(eggs);
                continue;
            }

            // Phase 3: one message per egg per pass.
            for (int i = 0; i < arraySize; ++i)
            {
                if (interrupted())
                {
                    return;
                }
                EggShell egg = (EggShell) eggs[i];
                Message message = egg.getNextMessage();
                if (null != message)
                {
                    egg.dispatch(message);
                }
            }
            releaseArray(eggs);
        }
    }

    /**
     * Sums the pending-message counts of the first <code>count</code> eggs
     * in the snapshot.
     *
     * @param eggs  snapshot array holding EggShell instances.
     * @param count number of valid entries at the front of the array.
     * @return total number of pending messages reported by those eggs.
     */
    private int countPendingMessages(final Object[] eggs, final int count)
    {
        int pending = 0;
        for (int i = 0; i < count; ++i)
        {
            pending += ((EggShell) eggs[i]).getNumPendingMessages();
        }
        return pending;
    }

    /**
     * Obtains an array from the pool, or allocates one of exactly the
     * requested length when the pool has nothing to offer.
     *
     * @param size the minimum length required.
     * @return the pooled array, or a new array of length <code>size</code>.
     */
    Object[] getArray(int size)
    {
        Object[] o = (Object[]) _arrayPool.getAtLeast(size);
        if (null == o)
        {
            o = new Object[size];
        }
        else
        {
            // Presumably overwrites the pool entry for this length so the
            // array is not handed out twice -- confirm against BinaryTree.
            _arrayPool.insert(o.length, null);
        }
        return o;
    }

    /**
     * Returns an array to the pool, keyed by its length.  The slots are
     * cleared first so that an idle pooled array does not keep eggs that
     * have since been removed reachable.
     *
     * @param array the array to release.
     */
    private void releaseArray(Object[] array)
    {
        for (int i = 0; i < array.length; ++i)
        {
            array[i] = null;
        }
        _arrayPool.insert(array.length, array);
    }
}