1 4 package com.tc.async.impl; 5 6 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue; 7 import EDU.oswego.cs.dl.util.concurrent.Channel; 8 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 9 10 import com.tc.async.api.ConfigurationContext; 11 import com.tc.async.api.EventHandler; 12 import com.tc.async.api.Stage; 13 import com.tc.async.api.StageManager; 14 import com.tc.async.api.StageMonitor; 15 import com.tc.logging.DefaultLoggerProvider; 16 import com.tc.logging.TCLogger; 17 import com.tc.logging.TCLoggerProvider; 18 import com.tc.properties.TCPropertiesImpl; 19 import com.tc.stats.Stats; 20 import com.tc.text.StringFormatter; 21 import com.tc.util.Assert; 22 import com.tc.util.concurrent.ThreadUtil; 23 24 import java.util.Arrays ; 25 import java.util.Collections ; 26 import java.util.HashMap ; 27 import java.util.Iterator ; 28 import java.util.LinkedList ; 29 import java.util.List ; 30 import java.util.Map ; 31 32 35 public class StageManagerImpl implements StageManager { 36 37 private static final String STAGE_MONITOR = "tc.stage.monitor.enabled"; 38 private static final String STAGE_MONITOR_DELAY = "tc.stage.monitor.delay"; 39 40 private static final boolean MONITOR = TCPropertiesImpl.getProperties().getBoolean(STAGE_MONITOR); 41 private static final long MONITOR_DELAY = TCPropertiesImpl.getProperties().getLong(STAGE_MONITOR_DELAY); 42 43 private Map stages = new HashMap (); 44 private TCLoggerProvider loggerProvider; 45 private final ThreadGroup group; 46 private String [] stageNames = new String [] {}; 47 48 public StageManagerImpl(ThreadGroup threadGroup) { 49 this.loggerProvider = new DefaultLoggerProvider(); 50 this.group = threadGroup; 51 52 if (MONITOR) { 53 startMonitor(); 54 } 55 } 56 57 private void startMonitor() { 58 final TCLogger logger = loggerProvider.getLogger(getClass()); 59 Thread t = new Thread ("SEDA Stage Monitor") { 60 public void run() { 61 while (true) { 62 printStats(); 63 ThreadUtil.reallySleep(MONITOR_DELAY); 64 } 65 } 66 67 private void printStats() { 68 try { 69 Stats stats[] = StageManagerImpl.this.getStats(); 70 logger.info("Stage Depths"); 71 logger.info("================================="); 72 for (int i = 0; i < stats.length; i++) { 73 stats[i].logDetails(logger); 74 } 75 } catch (Throwable th) { 76 logger.error(th); 77 } 78 } 79 }; 80 t.setDaemon(true); 81 t.start(); 82 } 83 84 public void setLoggerProvider(TCLoggerProvider loggerProvider) { 85 this.loggerProvider = loggerProvider; 86 } 87 88 public synchronized Stage createStage(String name, EventHandler handler, int threads, int maxSize) { 89 Channel q = maxSize > 0 ? (Channel) new BoundedLinkedQueue(maxSize) : new LinkedQueue(); 90 Stage s = new StageImpl(loggerProvider, name, handler, new StageQueueImpl(loggerProvider, name, q), threads, group); 91 addStage(name, s); 92 return s; 93 } 94 95 private synchronized void addStage(String name, Stage s) { 96 Object prev = stages.put(name, s); 97 Assert.assertNull(prev); 98 s.getSink().enableStatsCollection(MONITOR); 99 stageNames = (String []) stages.keySet().toArray(new String [stages.size()]); 100 Arrays.sort(stageNames); 101 } 102 103 public void startStage(Stage stage, ConfigurationContext context) { 104 stage.start(context); 105 } 106 107 public synchronized void startAll(ConfigurationContext context) { 108 for (Iterator i = stages.values().iterator(); i.hasNext();) { 109 Stage s = (Stage) i.next(); 110 s.start(context); 111 } 112 } 113 114 public void stopStage(Stage stage) { 115 stage.destroy(); 116 } 117 118 public synchronized void stopAll() { 119 for (Iterator i = stages.values().iterator(); i.hasNext();) { 120 Stage s = (Stage) i.next(); 121 s.destroy(); 122 } 123 } 124 125 public synchronized Stage getStage(String name) { 126 return (Stage) stages.get(name); 127 } 128 129 public synchronized Stats[] getStats() { 130 final String [] names = stageNames; 131 final Stats[] stats = new Stats[names.length]; 132 133 for (int i = 0; i < names.length; i++) { 134 stats[i] = getStage(names[i]).getSink().getStats(MONITOR_DELAY); 135 } 136 return stats; 137 } 138 139 static class StageMonitors { 140 141 private final List monitors = Collections.synchronizedList(new LinkedList ()); 142 private final StringFormatter formatter = new StringFormatter(); 143 144 StageMonitors(final TCLogger logger) { 145 return; 146 } 147 148 public StageMonitor newStageMonitor(String name) { 149 return new NullStageMonitor(); 150 } 151 152 public String toString() { 153 StringBuffer buf = new StringBuffer (); 154 buf.append("StageMonitors").append(formatter.newline()); 155 for (Iterator i = Collections.unmodifiableList(monitors).iterator(); i.hasNext();) { 156 buf.append(((StageMonitorImpl) i.next()).dumpAndFlush()).append(formatter.newline()); 157 } 158 return buf.toString(); 159 } 160 } 161 162 } 163 | Popular Tags |