package com.tc.async.impl;

import com.tc.async.api.ConfigurationContext;
import com.tc.async.api.EventContext;
import com.tc.async.api.EventHandler;
import com.tc.async.api.EventHandlerException;
import com.tc.async.api.Sink;
import com.tc.async.api.Source;
import com.tc.async.api.Stage;
import com.tc.exception.TCRuntimeException;
import com.tc.logging.TCLogger;
import com.tc.logging.TCLoggerProvider;

import java.util.ArrayList;
import java.util.List;

/**
 * A {@link Stage} that drains events from its sink with a fixed number of worker threads and hands each event to a
 * single {@link EventHandler}.
 * <p>
 * The sink passed to the constructor is also used as the workers' {@link Source} (it is cast in
 * {@link #startThreads()}), so it must implement both interfaces.
 * <p>
 * Pausing works by pushing one {@link #PAUSE_TOKEN} per worker through the sink; each worker that polls the token
 * parks itself until {@link #unpause()} is called.
 */
public class StageImpl implements Stage {
  /** Timeout handed to {@code Source.poll}; presumably milliseconds -- confirm against the Source contract. */
  private static final long         pollTime    = 3000;

  /** Sentinel event, compared by identity in the worker loop, that tells a worker to park itself. */
  private static final EventContext PAUSE_TOKEN = new EventContext() {
                                                  // marker instance only; carries no data
                                                };

  private final String              name;
  private final EventHandler        handler;
  private final Sink                sink;
  private final WorkerThread[]      threads;
  private final ThreadGroup         group;
  private final TCLogger            logger;
  private boolean                   isPaused    = false;

  /**
   * Creates the stage. No threads are created here; the worker slots stay {@code null} until
   * {@link #start(ConfigurationContext)} runs.
   *
   * @param loggerProvider used to obtain this stage's logger
   * @param name stage name, used in the logger name, thread names and log messages
   * @param handler receives every non-null, non-pause event polled from the sink
   * @param sink the stage's sink; also cast to {@link Source} for the workers to poll
   * @param threadCount number of worker threads to run
   * @param group thread group the worker threads are created in
   */
  public StageImpl(TCLoggerProvider loggerProvider, String name, EventHandler handler, Sink sink, int threadCount,
                   ThreadGroup group) {
    this.logger = loggerProvider.getLogger(Stage.class.getName() + ": " + name);
    this.name = name;
    this.handler = handler;
    this.threads = new WorkerThread[threadCount];
    this.sink = sink;
    this.group = group;
  }

  /**
   * Requests shutdown of every worker that has been started. Safe to call before
   * {@link #start(ConfigurationContext)}.
   */
  public void destroy() {
    stopThreads();
  }

  /**
   * Initializes the handler with the given context and then starts the worker threads.
   *
   * @param context configuration passed to {@code EventHandler.initialize}
   */
  public void start(ConfigurationContext context) {
    handler.initialize(context);
    startThreads();
  }

  /** @return the sink supplied at construction */
  public Sink getSink() {
    return sink;
  }

  /** @return the stage name supplied at construction */
  public String getName() {
    return name;
  }

  /** Creates and starts one worker per slot in {@link #threads}. */
  private synchronized void startThreads() {
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new WorkerThread("WorkerThread(" + name + "," + i + ")", (Source) sink, handler, group);
      threads[i].start();
    }
  }

  /** Flags every started worker for shutdown. */
  private void stopThreads() {
    for (int i = 0; i < threads.length; i++) {
      WorkerThread worker = threads[i];
      // Slots are null until startThreads() has filled them; destroy() may legitimately run first.
      if (worker != null) {
        worker.shutdown();
      }
    }
  }

  public String toString() {
    return "StageImpl(" + name + ")";
  }

  /**
   * Daemon thread that repeatedly polls the source and dispatches events to the handler until shutdown is
   * requested. The thread's own monitor guards {@link #state} transitions and {@link #shutdownRequested}.
   */
  private static class WorkerThread extends Thread {
    private static final com.tc.util.State RUNNING           = new com.tc.util.State("RUNNING");
    private static final com.tc.util.State PAUSED            = new com.tc.util.State("PAUSED");

    private com.tc.util.State              state;
    private final Source                   source;
    private final EventHandler             handler;
    private boolean                        shutdownRequested = false;

    public WorkerThread(String name, Source source, EventHandler handler, ThreadGroup group) {
      super(group, name);
      setDaemon(true);
      this.source = source;
      this.handler = handler;
    }

    /**
     * Asks the worker to stop. The flag is checked at the top of each loop iteration, and a worker parked in the
     * paused state is woken so it can observe the request instead of waiting for an unpause that may never come.
     */
    public synchronized void shutdown() {
      this.shutdownRequested = true;
      notifyAll();
    }

    private synchronized boolean shutdownRequested() {
      return this.shutdownRequested;
    }

    /**
     * Blocks the calling thread until this worker has consumed a pause token and entered the PAUSED state.
     *
     * @throws TCRuntimeException if the calling thread is interrupted while waiting; the interrupt status is
     *         restored before throwing
     */
    synchronized void pause() {
      while (state != PAUSED) {
        try {
          wait();
        } catch (InterruptedException e) {
          // Keep the interrupt visible to the caller; wrapping alone would silently clear it.
          Thread.currentThread().interrupt();
          throw new TCRuntimeException(e);
        }
      }
    }

    /**
     * Moves a paused worker back to RUNNING and wakes it.
     *
     * @throws AssertionError if the worker is not currently PAUSED
     */
    synchronized void unpause() {
      if (state != PAUSED) {
        throw new AssertionError("Attempt to unpause when not paused: " + state);
      }
      state = RUNNING;
      notifyAll();
    }

    /**
     * Worker loop: poll, then either park (pause token), dispatch to the handler (real event), or loop again
     * (poll returned null).
     */
    public void run() {
      state = RUNNING;
      while (!shutdownRequested()) {
        EventContext ctxt;
        try {
          ctxt = source.poll(pollTime);
          if (ctxt == PAUSE_TOKEN) {
            synchronized (this) {
              state = PAUSED;
              // Wake any thread blocked in pause() waiting for this worker to park.
              notifyAll();
              // Also leave the parked state on shutdown, otherwise destroying a paused stage strands the worker.
              while (state == PAUSED && !shutdownRequested) {
                wait();
              }
            }
          } else if (ctxt != null) {
            // NOTE(review): the handler runs while holding the event's monitor; the reason is not visible in
            // this file -- confirm before changing.
            synchronized (ctxt) {
              handler.logOnEnter(ctxt);
              try {
                handler.handleEvent(ctxt);
              } finally {
                handler.logOnExit(ctxt);
              }
            }
          }
        } catch (InterruptedException ie) {
          if (shutdownRequested()) { return; }
          throw new TCRuntimeException(ie);
        } catch (EventHandlerException ie) {
          if (shutdownRequested()) { return; }
          throw new TCRuntimeException(ie);
        } finally {
          // Drop the reference to the processed event before the next (possibly long) poll.
          ctxt = null;
        }
      }
    }
  }

  /**
   * Pauses the stage: hands one pause token per worker to the sink, then blocks until every worker has parked.
   *
   * @throws AssertionError if the stage is already paused
   */
  public void pause() {
    if (isPaused) {
      throw new AssertionError("Attempt to pause while already paused.");
    }

    log("Pausing...");

    // One token per worker so that each of them polls exactly one and parks.
    List pauseTokens = new ArrayList(threads.length);
    for (int i = 0; i < threads.length; i++) {
      pauseTokens.add(PAUSE_TOKEN);
    }
    sink.pause(pauseTokens);
    for (int i = 0; i < threads.length; i++) {
      threads[i].pause();
    }
    isPaused = true;
    log("Paused.");
  }

  /**
   * Resumes a paused stage: unpauses the sink, then releases every parked worker.
   *
   * @throws AssertionError if the stage is not paused
   */
  public void unpause() {
    if (!isPaused) {
      throw new AssertionError("Attempt to unpause while not paused.");
    }
    log("Unpausing...");

    sink.unpause();
    for (int i = 0; i < threads.length; i++) {
      threads[i].unpause();
    }

    isPaused = false;
    log("Unpaused.");
  }

  /** Logs an info-level message prefixed with the stage name. */
  private void log(Object msg) {
    logger.info("Stage " + name + ": " + msg);
  }
}