1 18 19 package org.apache.jmeter.threads; 20 import java.util.Iterator ; 21 import java.util.List ; 22 23 import org.apache.commons.collections.Buffer; 24 import org.apache.commons.collections.BufferUtils; 25 import org.apache.commons.collections.UnboundedFifoBuffer; 26 import org.apache.jmeter.samplers.SampleEvent; 27 import org.apache.jmeter.samplers.SampleListener; 28 import org.apache.jorphan.logging.LoggingManager; 29 import org.apache.log.Logger; 30 31 48 public class ListenerNotifier 49 { 50 private static Logger log = LoggingManager.getLoggerForClass(); 51 52 55 private static final int SLEEP_TIME = 2000; 56 57 63 private boolean running = true; 64 65 69 private boolean isStopped = true; 70 71 79 private Buffer listenerEvents = 80 BufferUtils.synchronizedBuffer(new UnboundedFifoBuffer()); 81 82 83 84 89 public void stop() 90 { 91 running = false; 92 } 93 94 102 public boolean isStopped() 103 { 104 return isStopped; 105 } 106 107 119 public void run() 120 { 121 boolean isMaximumPriority = false; 122 int normalCount = 0; 123 124 while (running) 125 { 126 long startTime = System.currentTimeMillis(); 127 processNotifications(); 128 long sleep = SLEEP_TIME - (System.currentTimeMillis() - startTime); 129 130 if (!running) 132 { 133 break; 134 } 135 136 if (sleep < 0) 137 { 138 isMaximumPriority = true; 139 normalCount = 0; 140 if (log.isInfoEnabled()) 141 { 142 log.info("ListenerNotifier exceeded maximum " + 143 "notification time by " + (-sleep) + "ms"); 144 } 145 boostPriority(); 146 } 147 else 148 { 149 normalCount++; 150 151 if (isMaximumPriority && normalCount >= 3) 155 { 156 isMaximumPriority = false; 157 unboostPriority(); 158 } 159 160 if (log.isDebugEnabled()) 161 { 162 log.debug("ListenerNotifier sleeping for " + sleep + "ms"); 163 } 164 165 try 166 { 167 Thread.sleep(sleep); 168 } 169 catch (InterruptedException e) 170 { 171 } 172 } 173 } 174 175 processNotifications(); 178 isStopped = true; 179 } 180 181 187 private void processNotifications() 188 { 189 int listenerEventsSize = listenerEvents.size(); 190 if (log.isDebugEnabled()) 191 { 192 log.debug ("ListenerNotifier: processing " + listenerEventsSize + 193 " events"); 194 } 195 196 while (listenerEventsSize > 0) 197 { 198 SampleEvent res = (SampleEvent)listenerEvents.remove(); 205 List listeners = (List )listenerEvents.remove(); 206 207 notifyListeners (res, listeners); 208 209 listenerEventsSize -= 2; 210 } 211 } 212 213 218 private void boostPriority() 219 { 220 if (Thread.currentThread().getPriority() != Thread.MAX_PRIORITY) 221 { 222 log.info("ListenerNotifier: Boosting thread priority to maximum."); 223 Thread.currentThread().setPriority(Thread.MAX_PRIORITY); 224 } 225 } 226 227 231 private void unboostPriority() 232 { 233 if (Thread.currentThread().getPriority() != Thread.NORM_PRIORITY) 234 { 235 log.info("ListenerNotifier: Returning thread priority to normal."); 236 Thread.currentThread().setPriority(Thread.NORM_PRIORITY); 237 } 238 } 239 240 248 public void notifyListeners(SampleEvent res, List listeners) 249 { 250 Iterator iter = listeners.iterator(); 251 while (iter.hasNext()) 252 { 253 try { 254 ((SampleListener) iter.next()).sampleOccurred(res); 255 } catch (RuntimeException e) { 256 log.error("Detected problem in Listener: ",e); 257 log.info("Continuing to process further listeners"); 258 } 259 } 260 } 261 262 272 public void addLast(SampleEvent item, List listeners) 273 { 274 synchronized (listenerEvents) 277 { 278 listenerEvents.add(item); 279 listenerEvents.add(listeners); 280 } 281 } 282 } 283 | Popular Tags |