KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > async > impl > StageQueueImpl


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.async.impl;
6
7 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
8 import EDU.oswego.cs.dl.util.concurrent.Channel;
9 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
10
11 import com.tc.async.api.AddPredicate;
12 import com.tc.async.api.EventContext;
13 import com.tc.async.api.Sink;
14 import com.tc.async.api.Source;
15 import com.tc.exception.TCRuntimeException;
16 import com.tc.logging.TCLogger;
17 import com.tc.logging.TCLoggerProvider;
18 import com.tc.stats.Stats;
19 import com.tc.util.Assert;
20 import com.tc.util.State;
21
22 import java.util.Collection JavaDoc;
23 import java.util.Iterator JavaDoc;
24 import java.util.LinkedList JavaDoc;
25 import java.util.List JavaDoc;
26
27 /**
28  * The beginnings of an implementation of our SEDA like framework. This Is part of an impl for the queue
29  *
30  * @author steve
31  */

32 public class StageQueueImpl implements Sink, Source {
33
34   private static final State RUNNING = new State("RUNNING");
35   private static final State PAUSED = new State("PAUSED");
36
37   private final Channel queue;
38   private final String JavaDoc stage;
39   private final TCLogger logger;
40   private AddPredicate predicate = DefaultAddPredicate.getInstance();
41   private volatile State state = RUNNING;
42   private volatile StageQueueStatsCollector statsCollector;
43
44   public StageQueueImpl(TCLoggerProvider loggerProvider, String JavaDoc stage, Channel queue) {
45     this.queue = queue;
46     this.logger = loggerProvider.getLogger(Sink.class.getName() + ": " + stage);
47     this.stage = stage;
48     this.statsCollector = new NullStageQueueStatsCollector(stage);
49   }
50
51   /**
52    * The context will be added if the sink was found to be empty(at somepoint during the call). If the queue was not
53    * empty (at somepoint during the call) the context might not be added. This method should only be used where the
54    * stage threads are to be signaled on data availablity and the threads take care of getting data from elsewhere
55    */

56   public boolean addLossy(EventContext context) {
57     if (isEmpty()) {
58       add(context);
59       return true;
60     } else {
61       return false;
62     }
63   }
64
65   // XXX::Ugly hack since this method doesnt exist on the Channel interface
66
private boolean isEmpty() {
67     if (queue instanceof BoundedLinkedQueue) {
68       return ((BoundedLinkedQueue) queue).isEmpty();
69     } else if (queue instanceof LinkedQueue) {
70       return ((LinkedQueue) queue).isEmpty();
71     } else {
72       throw new AssertionError JavaDoc("Unsupported channel " + queue.getClass().getName() + " in " + getClass().getName());
73     }
74   }
75
76   public void addMany(Collection JavaDoc contexts) {
77     if (logger.isDebugEnabled()) logger.debug("Added many:" + contexts + " to:" + stage);
78     for (Iterator JavaDoc i = contexts.iterator(); i.hasNext();) {
79       add((EventContext) i.next());
80     }
81   }
82
83   public void add(EventContext context) {
84     Assert.assertNotNull(context);
85     if (state == PAUSED) {
86       logger.info("Ignoring event while PAUSED: " + context);
87       return;
88     }
89
90     if (logger.isDebugEnabled()) logger.debug("Added:" + context + " to:" + stage);
91     if (!predicate.accept(context)) {
92       if (logger.isDebugEnabled()) logger.debug("Predicate caused skip add for:" + context + " to:" + stage);
93       return;
94     }
95
96     statsCollector.contextAdded();
97     try {
98       queue.put(context);
99     } catch (InterruptedException JavaDoc e) {
100       // TODO Auto-generated catch block
101
e.printStackTrace();
102     }
103
104   }
105
106   public EventContext get() throws InterruptedException JavaDoc {
107     return poll(Long.MAX_VALUE);
108   }
109
110   public EventContext poll(long period) throws InterruptedException JavaDoc {
111     EventContext rv = (EventContext) queue.poll(period);
112     if (rv != null) statsCollector.contextRemoved();
113     return rv;
114   }
115
116   // Used for testing
117
public int size() {
118     if (queue instanceof BoundedLinkedQueue) {
119       return ((BoundedLinkedQueue) queue).size();
120     } else {
121       return 0;
122     }
123   }
124
125   public Collection JavaDoc getAll() throws InterruptedException JavaDoc {
126     List JavaDoc l = new LinkedList JavaDoc();
127     l.add(queue.take());
128     while (true) {
129       Object JavaDoc o = queue.poll(0);
130       if (o == null) {
131         // could be a little off
132
statsCollector.reset();
133         break;
134       } else {
135         l.add(o);
136       }
137     }
138     return l;
139   }
140
141   public void setAddPredicate(AddPredicate predicate) {
142     Assert.eval(predicate != null);
143     this.predicate = predicate;
144   }
145
146   public AddPredicate getPredicate() {
147     return predicate;
148   }
149
150   public String JavaDoc toString() {
151     return "StageQueue(" + stage + ")";
152   }
153
154   public void clear() {
155     try {
156       // XXX: poor man's clear.
157
int clearCount = 0;
158       while (poll(0) != null) { // calling this.poll() to get counter updated
159
/* supress no-body warning */
160         clearCount++;
161       }
162       statsCollector.reset();
163       logger.info("Cleared " + clearCount);
164     } catch (InterruptedException JavaDoc e) {
165       throw new TCRuntimeException(e);
166     }
167   }
168
169   public void pause(List JavaDoc pauseEvents) {
170     if (state != RUNNING) throw new AssertionError JavaDoc("Attempt to pause while not running: " + state);
171     state = PAUSED;
172     clear();
173     for (Iterator JavaDoc i = pauseEvents.iterator(); i.hasNext();) {
174       try {
175         queue.put(i.next());
176         statsCollector.contextAdded();
177       } catch (InterruptedException JavaDoc e) {
178         throw new TCRuntimeException(e);
179       }
180     }
181   }
182
183   public void unpause() {
184     if (state != PAUSED) throw new AssertionError JavaDoc("Attempt to unpause while not paused: " + state);
185     state = RUNNING;
186   }
187
188   /*********************************************************************************************************************
189    * Monitorable Interface
190    */

191
192   public void enableStatsCollection(boolean enable) {
193     if (enable) {
194       statsCollector = new StageQueueStatsCollectorImpl(stage);
195     } else {
196       statsCollector = new NullStageQueueStatsCollector(stage);
197     }
198   }
199
200   public Stats getStats(long frequency) {
201     return statsCollector;
202   }
203
204   public Stats getStatsAndReset(long frequency) {
205     return getStats(frequency);
206   }
207
208   public boolean isStatsCollectionEnabled() {
209     return statsCollector instanceof StageQueueStatsCollectorImpl;
210   }
211
212   public void resetStats() {
213     // NOP
214
}
215
216   public static abstract class StageQueueStatsCollector implements Stats {
217
218     public void logDetails(TCLogger statsLogger) {
219       statsLogger.info(getDetails());
220     }
221
222     public abstract void contextAdded();
223
224     public abstract void reset();
225
226     public abstract void contextRemoved();
227
228     protected String JavaDoc makeWidth(String JavaDoc name, int width) {
229       final int len = name.length();
230       if (len == width) { return name; }
231       if (len > width) { return name.substring(0, width); }
232
233       StringBuffer JavaDoc buf = new StringBuffer JavaDoc(name);
234       for (int i = len; i < width; i++) {
235         buf.append(' ');
236       }
237       return buf.toString();
238     }
239   }
240
241   public static class NullStageQueueStatsCollector extends StageQueueStatsCollector {
242
243     private String JavaDoc name;
244
245     public NullStageQueueStatsCollector(String JavaDoc stage) {
246       this.name = makeWidth(stage, 40);
247     }
248
249     public String JavaDoc getDetails() {
250       return name + " : Not Monitored";
251     }
252
253     public void contextAdded() {
254       // NOOP
255
}
256
257     public void contextRemoved() {
258       // NOOP
259
}
260
261     public void reset() {
262       // NOOP
263
}
264   }
265
266   public static class StageQueueStatsCollectorImpl extends StageQueueStatsCollector {
267
268     private int count = 0;
269     private String JavaDoc name;
270
271     public StageQueueStatsCollectorImpl(String JavaDoc stage) {
272       this.name = makeWidth(stage, 40);
273     }
274
275     public synchronized String JavaDoc getDetails() {
276       return name + " : " + count;
277     }
278
279     public synchronized void contextAdded() {
280       count++;
281     }
282
283     public synchronized void contextRemoved() {
284       count--;
285     }
286
287     public synchronized void reset() {
288       count = 0;
289     }
290   }
291
292 }
293
Popular Tags