KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > async > impl > StageImpl


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
3  */

4 package com.tc.async.impl;
5
6 import com.tc.async.api.ConfigurationContext;
7 import com.tc.async.api.EventContext;
8 import com.tc.async.api.EventHandler;
9 import com.tc.async.api.EventHandlerException;
10 import com.tc.async.api.Sink;
11 import com.tc.async.api.Source;
12 import com.tc.async.api.Stage;
13 import com.tc.exception.TCRuntimeException;
14 import com.tc.logging.TCLogger;
15 import com.tc.logging.TCLoggerProvider;
16
17 import java.util.ArrayList JavaDoc;
18 import java.util.List JavaDoc;
19
20 /**
21  * @author steve
22  */

23 public class StageImpl implements Stage {
24   private static final long pollTime = 3000; // This is the poor man's solution for stage
25
// shutdown
26
private static final EventContext PAUSE_TOKEN = new EventContext() {
27                                                   //
28
};
29
30   private final String JavaDoc name;
31   private final EventHandler handler;
32   private final Sink sink;
33   private final WorkerThread[] threads;
34   private final ThreadGroup JavaDoc group;
35   private final TCLogger logger;
36   private boolean isPaused = false;
37
38   public StageImpl(TCLoggerProvider loggerProvider, String JavaDoc name, EventHandler handler, Sink sink, int threadCount,
39                    ThreadGroup JavaDoc group) {
40     this.logger = loggerProvider.getLogger(Stage.class.getName() + ": " + name);
41     this.name = name;
42     this.handler = handler;
43     this.threads = new WorkerThread[threadCount];
44     this.sink = sink;
45     this.group = group;
46   }
47
48   public void destroy() {
49     stopThreads();
50   }
51
52   public void start(ConfigurationContext context) {
53     handler.initialize(context);
54     startThreads();
55   }
56
57   public Sink getSink() {
58     return sink;
59   }
60
61   public String JavaDoc getName() {
62     return name;
63   }
64
65   private synchronized void startThreads() {
66     for (int i = 0; i < threads.length; i++) {
67       threads[i] = new WorkerThread("WorkerThread(" + name + "," + i + ")", (Source) sink, handler, group);
68       threads[i].start();
69     }
70   }
71
72   private void stopThreads() {
73     for (int i = 0; i < threads.length; i++) {
74       threads[i].shutdown();
75     }
76   }
77
78   public String JavaDoc toString() {
79     return "StageImpl(" + name + ")";
80   }
81
82   private static class WorkerThread extends Thread JavaDoc {
83     private static final com.tc.util.State RUNNING = new com.tc.util.State("RUNNING");
84     private static final com.tc.util.State PAUSED = new com.tc.util.State("PAUSED");
85
86     private com.tc.util.State state;
87     private final Source source;
88     private final EventHandler handler;
89     private boolean shutdownRequested = false;
90
91     public WorkerThread(String JavaDoc name, Source source, EventHandler handler, ThreadGroup JavaDoc group) {
92       super(group, name);
93       setDaemon(true);
94       this.source = source;
95       this.handler = handler;
96     }
97
98     public synchronized void shutdown() {
99       this.shutdownRequested = true;
100     }
101
102     private synchronized boolean shutdownRequested() {
103       return this.shutdownRequested;
104     }
105
106     synchronized void pause() {
107       while (state != PAUSED) {
108         try {
109           wait();
110         } catch (InterruptedException JavaDoc e) {
111           throw new TCRuntimeException(e);
112         }
113       }
114     }
115
116     synchronized void unpause() {
117       if (state != PAUSED) throw new AssertionError JavaDoc("Attempt to unpause when not paused: " + state);
118       state = RUNNING;
119       notifyAll();
120     }
121
122     public void run() {
123       state = RUNNING;
124       while (!shutdownRequested()) {
125         EventContext ctxt;
126         try {
127           ctxt = source.poll(pollTime);
128           if (ctxt == PAUSE_TOKEN) {
129             synchronized (this) {
130               state = PAUSED;
131               notifyAll();
132               while (state == PAUSED) {
133                 wait();
134               }
135             }
136           } else if (ctxt != null) {
137             //XXX:: This synchronization is there to create proper memory boundary.
138
synchronized (ctxt) {
139               handler.logOnEnter(ctxt);
140               try {
141                 handler.handleEvent(ctxt);
142               } finally {
143                 handler.logOnExit(ctxt);
144               }
145             }
146           }
147         } catch (InterruptedException JavaDoc ie) {
148           if (shutdownRequested()) { return; }
149           throw new TCRuntimeException(ie);
150         } catch (EventHandlerException ie) {
151           if (shutdownRequested()) return;
152           throw new TCRuntimeException(ie);
153         } finally {
154           // Agressively null out the reference before going around the loop again. If you don't do this, the reference
155
// to the context will exist until another context comes in. This can potentially keep many objects in memory
156
// longer than necessary
157
ctxt = null;
158         }
159       }
160     }
161   }
162
163   public void pause() {
164     if (isPaused) throw new AssertionError JavaDoc("Attempt to pause while already paused.");
165
166     log("Pausing...");
167
168     List JavaDoc pauseTokens = new ArrayList JavaDoc(threads.length);
169     for (int i = 0; i < threads.length; i++) {
170       pauseTokens.add(PAUSE_TOKEN);
171     }
172     sink.pause(pauseTokens);
173     for (int i = 0; i < threads.length; i++) {
174       threads[i].pause();
175     }
176     isPaused = true;
177     log("Paused.");
178   }
179
180   public void unpause() {
181     if (!isPaused) throw new AssertionError JavaDoc("Attempt to unpause while not paused.");
182     log("Unpausing...");
183
184     sink.unpause();
185     for (int i = 0; i < threads.length; i++) {
186       threads[i].unpause();
187     }
188
189     isPaused = false;
190     log("Unpaused.");
191   }
192
193   private void log(Object JavaDoc msg) {
194     logger.info("Stage " + name + ": " + msg);
195   }
196 }
Popular Tags