1 5 package com.tc.async.impl; 6 7 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue; 8 import EDU.oswego.cs.dl.util.concurrent.Channel; 9 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 10 11 import com.tc.async.api.AddPredicate; 12 import com.tc.async.api.EventContext; 13 import com.tc.async.api.Sink; 14 import com.tc.async.api.Source; 15 import com.tc.exception.TCRuntimeException; 16 import com.tc.logging.TCLogger; 17 import com.tc.logging.TCLoggerProvider; 18 import com.tc.stats.Stats; 19 import com.tc.util.Assert; 20 import com.tc.util.State; 21 22 import java.util.Collection ; 23 import java.util.Iterator ; 24 import java.util.LinkedList ; 25 import java.util.List ; 26 27 32 public class StageQueueImpl implements Sink, Source { 33 34 private static final State RUNNING = new State("RUNNING"); 35 private static final State PAUSED = new State("PAUSED"); 36 37 private final Channel queue; 38 private final String stage; 39 private final TCLogger logger; 40 private AddPredicate predicate = DefaultAddPredicate.getInstance(); 41 private volatile State state = RUNNING; 42 private volatile StageQueueStatsCollector statsCollector; 43 44 public StageQueueImpl(TCLoggerProvider loggerProvider, String stage, Channel queue) { 45 this.queue = queue; 46 this.logger = loggerProvider.getLogger(Sink.class.getName() + ": " + stage); 47 this.stage = stage; 48 this.statsCollector = new NullStageQueueStatsCollector(stage); 49 } 50 51 56 public boolean addLossy(EventContext context) { 57 if (isEmpty()) { 58 add(context); 59 return true; 60 } else { 61 return false; 62 } 63 } 64 65 private boolean isEmpty() { 67 if (queue instanceof BoundedLinkedQueue) { 68 return ((BoundedLinkedQueue) queue).isEmpty(); 69 } else if (queue instanceof LinkedQueue) { 70 return ((LinkedQueue) queue).isEmpty(); 71 } else { 72 throw new AssertionError ("Unsupported channel " + queue.getClass().getName() + " in " + getClass().getName()); 73 } 74 } 75 76 public void addMany(Collection contexts) { 77 if (logger.isDebugEnabled()) logger.debug("Added many:" + contexts + " to:" + stage); 78 for (Iterator i = contexts.iterator(); i.hasNext();) { 79 add((EventContext) i.next()); 80 } 81 } 82 83 public void add(EventContext context) { 84 Assert.assertNotNull(context); 85 if (state == PAUSED) { 86 logger.info("Ignoring event while PAUSED: " + context); 87 return; 88 } 89 90 if (logger.isDebugEnabled()) logger.debug("Added:" + context + " to:" + stage); 91 if (!predicate.accept(context)) { 92 if (logger.isDebugEnabled()) logger.debug("Predicate caused skip add for:" + context + " to:" + stage); 93 return; 94 } 95 96 statsCollector.contextAdded(); 97 try { 98 queue.put(context); 99 } catch (InterruptedException e) { 100 e.printStackTrace(); 102 } 103 104 } 105 106 public EventContext get() throws InterruptedException { 107 return poll(Long.MAX_VALUE); 108 } 109 110 public EventContext poll(long period) throws InterruptedException { 111 EventContext rv = (EventContext) queue.poll(period); 112 if (rv != null) statsCollector.contextRemoved(); 113 return rv; 114 } 115 116 public int size() { 118 if (queue instanceof BoundedLinkedQueue) { 119 return ((BoundedLinkedQueue) queue).size(); 120 } else { 121 return 0; 122 } 123 } 124 125 public Collection getAll() throws InterruptedException { 126 List l = new LinkedList (); 127 l.add(queue.take()); 128 while (true) { 129 Object o = queue.poll(0); 130 if (o == null) { 131 statsCollector.reset(); 133 break; 134 } else { 135 l.add(o); 136 } 137 } 138 return l; 139 } 140 141 public void setAddPredicate(AddPredicate predicate) { 142 Assert.eval(predicate != null); 143 this.predicate = predicate; 144 } 145 146 public AddPredicate getPredicate() { 147 return predicate; 148 } 149 150 public String toString() { 151 return "StageQueue(" + stage + ")"; 152 } 153 154 public void clear() { 155 try { 156 int clearCount = 0; 158 while (poll(0) != null) { 160 clearCount++; 161 } 162 statsCollector.reset(); 163 logger.info("Cleared " + clearCount); 164 } catch (InterruptedException e) { 165 throw new TCRuntimeException(e); 166 } 167 } 168 169 public void pause(List pauseEvents) { 170 if (state != RUNNING) throw new AssertionError ("Attempt to pause while not running: " + state); 171 state = PAUSED; 172 clear(); 173 for (Iterator i = pauseEvents.iterator(); i.hasNext();) { 174 try { 175 queue.put(i.next()); 176 statsCollector.contextAdded(); 177 } catch (InterruptedException e) { 178 throw new TCRuntimeException(e); 179 } 180 } 181 } 182 183 public void unpause() { 184 if (state != PAUSED) throw new AssertionError ("Attempt to unpause while not paused: " + state); 185 state = RUNNING; 186 } 187 188 191 192 public void enableStatsCollection(boolean enable) { 193 if (enable) { 194 statsCollector = new StageQueueStatsCollectorImpl(stage); 195 } else { 196 statsCollector = new NullStageQueueStatsCollector(stage); 197 } 198 } 199 200 public Stats getStats(long frequency) { 201 return statsCollector; 202 } 203 204 public Stats getStatsAndReset(long frequency) { 205 return getStats(frequency); 206 } 207 208 public boolean isStatsCollectionEnabled() { 209 return statsCollector instanceof StageQueueStatsCollectorImpl; 210 } 211 212 public void resetStats() { 213 } 215 216 public static abstract class StageQueueStatsCollector implements Stats { 217 218 public void logDetails(TCLogger statsLogger) { 219 statsLogger.info(getDetails()); 220 } 221 222 public abstract void contextAdded(); 223 224 public abstract void reset(); 225 226 public abstract void contextRemoved(); 227 228 protected String makeWidth(String name, int width) { 229 final int len = name.length(); 230 if (len == width) { return name; } 231 if (len > width) { return name.substring(0, width); } 232 233 StringBuffer buf = new StringBuffer (name); 234 for (int i = len; i < width; i++) { 235 buf.append(' '); 236 } 237 return buf.toString(); 238 } 239 } 240 241 public static class NullStageQueueStatsCollector extends StageQueueStatsCollector { 242 243 private String name; 244 245 public NullStageQueueStatsCollector(String stage) { 246 this.name = makeWidth(stage, 40); 247 } 248 249 public String getDetails() { 250 return name + " : Not Monitored"; 251 } 252 253 public void contextAdded() { 254 } 256 257 public void contextRemoved() { 258 } 260 261 public void reset() { 262 } 264 } 265 266 public static class StageQueueStatsCollectorImpl extends StageQueueStatsCollector { 267 268 private int count = 0; 269 private String name; 270 271 public StageQueueStatsCollectorImpl(String stage) { 272 this.name = makeWidth(stage, 40); 273 } 274 275 public synchronized String getDetails() { 276 return name + " : " + count; 277 } 278 279 public synchronized void contextAdded() { 280 count++; 281 } 282 283 public synchronized void contextRemoved() { 284 count--; 285 } 286 287 public synchronized void reset() { 288 count = 0; 289 } 290 } 291 292 } 293 | Popular Tags |