1 4 package com.tctest.performance.generate.load; 5 6 import com.tc.util.Assert; 7 8 import java.util.LinkedList ; 9 import java.util.List ; 10 11 15 final class MonitoredWorkQueue { 16 17 private static final int TIMEOUT = 5000; 18 private final Object [] queue; 19 private final long[] intervalStartTime; 20 private final List points; 21 private int count, first, last; 22 private long intervalMagnitude; 23 private final Object intervalLock = new Object (); 24 private volatile boolean overflow; 25 private final int quarterCapacity; 26 private final boolean keepMetronome; 27 28 private int waitCount; 30 31 MonitoredWorkQueue(int size, boolean keepMetronome) { 32 Assert.assertTrue(size > 0); 33 this.queue = new Object [size]; 34 this.intervalStartTime = new long[size]; 35 this.points = new LinkedList (); 36 this.keepMetronome = keepMetronome; 37 this.quarterCapacity = size / 4; 38 } 39 40 synchronized void put(Object obj) { 41 if (count == quarterCapacity) { 42 System.err.println("WORK QUEUE HAS REACHED 25% CAPACITY - NOW WOULD BE A GOOD TIME FOR A THREAD DUMP"); 43 if (waitCount++ < 2) { 44 System.err.println("PAUSE (2 seconds)"); 45 try { 46 Thread.sleep(1000 * 2); 47 } catch (InterruptedException e) { 48 throw new RuntimeException (e); 49 } 50 } 51 } 52 if (count == queue.length) { 53 overflow = true; 54 while (true) 55 try { 56 wait(); 57 } catch (InterruptedException e) { 58 } 60 } 61 62 boolean startTimer = false; 63 if (obj instanceof Metronome) { 64 if (!keepMetronome) obj = ((Metronome) obj).object; 65 startTimer = true; 66 } 67 68 queue[last] = obj; 69 last = (last + 1) % queue.length; 70 count++; 71 if (startTimer) intervalStartTime[last] = System.nanoTime(); 72 notifyAll(); 73 } 74 75 synchronized Object get() throws InterruptedException , WorkQueueOverflowException { 76 if (overflow) throw new WorkQueueOverflowException(); 77 long waitTimeout; 78 while (count == 0) { 80 waitTimeout = System.currentTimeMillis(); 81 wait(TIMEOUT); 82 if ((System.currentTimeMillis() - waitTimeout) >= TIMEOUT) return null; } 84 85 Object obj = queue[first]; 86 queue[first] = null; 87 count--; 88 first = (first + 1) % queue.length; 89 90 if (intervalStartTime[first] != 0L) { 92 long waitTime = System.nanoTime() - intervalStartTime[first]; 93 synchronized (points) { 94 points.add(new Measurement(intervalMagnitude, waitTime)); 95 } 96 synchronized (intervalLock) { 97 if (keepMetronome) ((Metronome) obj).load = intervalMagnitude; 98 intervalMagnitude = 0; 99 intervalStartTime[first] = 0L; 100 } 101 } 102 synchronized (intervalLock) { 103 intervalMagnitude++; 104 } 105 106 return obj; 107 } 108 109 Measurement[] getWaitTimes() { 110 synchronized (points) { 111 return (Measurement[]) points.toArray(new Measurement[0]); 112 } 113 } 114 } 115 | Popular Tags |