package com.tctest.performance.generate.load;

import com.tc.util.Assert;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Fixed-capacity circular buffer handed between producers ({@link #put}) and consumers
 * ({@link #get}), guarded by this object's monitor.
 * <p>
 * Consumption is gated by a background {@link LatchTimer} thread that alternately opens and closes a
 * latch. Entries flagged as "metronome" are released only when a new clock cycle has begun; a clock
 * cycle begins at most once per {@link #NANOSEC} nanoseconds. Whenever a consumer observes the start
 * of a clock cycle while the head entry is not a metronome entry, the latch intervals are adjusted one
 * step. After the last step the buffer enters an illegal state in which {@link #put} returns
 * {@code false}.
 */
final class LoadBuffer {

  private static final long NANOSEC = 1000000000L;

  private final Object[]    buffer;
  private final boolean[]   isMetronome;
  // Circular-buffer write index, read index and number of occupied slots; guarded by this monitor.
  private int               in, out, count;
  private final LatchTimer  latch;
  private volatile boolean  beginClockCycle;
  private volatile boolean  illegalState;
  // System.nanoTime() value at which the next clock cycle may begin; guarded by this monitor.
  private long              clock;

  /**
   * Creates the buffer and starts its latch timer thread.
   *
   * @param size capacity of the buffer; asserted to be greater than zero
   */
  LoadBuffer(int size) {
    Assert.assertTrue(size > 0);
    buffer = new Object[size];
    isMetronome = new boolean[size];
    // Seed the deadline with the current reading so that the first pageClock() call starts a cycle
    // regardless of the sign of System.nanoTime(), whose origin is arbitrary.
    clock = System.nanoTime();
    latch = new LatchTimer();
    latch.start();
  }

  /**
   * Appends an entry, blocking while the buffer is full.
   *
   * @param obj the entry to store
   * @param metronome whether the entry is to be handed out as a {@code Metronome} at the start of a
   *        clock cycle
   * @return {@code true} if the entry was stored, {@code false} if the buffer is in its illegal state
   * @throws InterruptedException if the calling thread is interrupted while waiting for space
   */
  synchronized boolean put(Object obj, boolean metronome) throws InterruptedException {
    while (count == buffer.length) {
      wait();
    }

    if (illegalState) {
      return false;
    }

    buffer[in] = obj;
    isMetronome[in] = metronome;
    ++count;
    in = (in + 1) % buffer.length;
    notifyAll();
    return true;
  }

  /**
   * Removes and returns the oldest entry. Blocks until the buffer is non-empty, the latch is open and,
   * if the head entry is a metronome entry, a clock cycle has begun.
   * <p>
   * NOTE(review): once the illegal state is reached the latch is left closed, so callers appear to
   * block here indefinitely - confirm that this is intended.
   *
   * @return the stored object, wrapped in a {@code Metronome} if it was stored as a metronome entry
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  synchronized Object get() throws InterruptedException {
    pageClock();
    while (count == 0 || !latch.isOpen || (isMetronome[out] && !beginClockCycle)) {
      wait();
    }

    // A clock cycle started but the head entry is not a metronome entry: step the latch intervals.
    if (beginClockCycle && !isMetronome[out]) {
      latch.adjustIntervals();
    }
    beginClockCycle = false;

    Object obj = buffer[out];
    buffer[out] = null;

    if (isMetronome[out]) {
      obj = new Metronome(obj);
      isMetronome[out] = false;
    }

    --count;
    out = (out + 1) % buffer.length;
    notifyAll();

    return obj;
  }

  /**
   * Flags the start of a new clock cycle if the current deadline has passed and moves the deadline
   * {@link #NANOSEC} nanoseconds ahead. Callers hold this object's monitor.
   */
  private void pageClock() {
    long now = System.nanoTime();
    // Compare by difference: nanoTime values may be negative or wrap, so "now > clock" is unreliable.
    if (now - clock >= 0) {
      clock = now + NANOSEC;
      beginClockCycle = true;
    }
  }

  /**
   * Daemon thread that toggles the latch between open and closed, sleeping for the configured open
   * and close intervals (milliseconds) and waking threads waiting on the enclosing buffer.
   */
  private class LatchTimer extends Thread {

    private static final int    DELAY         = 1000;
    private static final int    RATIO         = 100;
    private final AtomicInteger openInterval, closeInterval, intervalCount;
    private volatile boolean    isOpen        = false;
    // Decremented by every adjustIntervals() call; the constructor performs the first decrement.
    private int                 intervalRatio = 11;

    LatchTimer() {
      setDaemon(true);
      setPriority(Thread.MAX_PRIORITY);
      openInterval = new AtomicInteger(0);
      closeInterval = new AtomicInteger(0);
      intervalCount = new AtomicInteger(0);
      adjustIntervals();
    }

    /**
     * Decrements the interval ratio and recomputes the open and close intervals from it. At ratio 1
     * the open interval is set to {@code DELAY}, which makes {@link #run()} skip the closed phase. At
     * ratio 0 (or below) the buffer is put into its illegal state and the latch is closed.
     */
    private void adjustIntervals() {
      System.err.println("Adjusting Load Buffer Latch Intervals (" + (intervalRatio - 1) + ")");
      --intervalRatio;

      if (intervalRatio <= 0) {
        illegalState = true;
        isOpen = false;
        return;
      }

      if (intervalRatio == 1) {
        System.err.println("Load Buffer Latch Open, CPU Bound");
        // Must not fall through to the generic computation below: that would replace DELAY with
        // DELAY - close and run() would keep closing the latch.
        closeInterval.set(0);
        openInterval.set(DELAY);
      } else {
        int close = intervalRatio * DELAY / (2 * RATIO);
        closeInterval.set(close);
        openInterval.set((DELAY / intervalRatio) - close);
      }
      intervalCount.set(0);
    }

    /**
     * Until the illegal state is reached: opens the latch, pages the clock, wakes waiting threads and
     * sleeps for the open interval; then, unless the open interval equals {@code DELAY}, closes the
     * latch and sleeps for the close interval.
     */
    @Override
    public void run() {
      while (!illegalState) {
        try {
          isOpen = true;
          if ((intervalCount.incrementAndGet() * RATIO) == DELAY) {
            intervalCount.set(0);
          }
          synchronized (LoadBuffer.this) {
            pageClock();
            LoadBuffer.this.notifyAll();
          }
          Thread.sleep(openInterval.intValue());

          if (openInterval.intValue() != DELAY) {
            isOpen = false;
            synchronized (LoadBuffer.this) {
              LoadBuffer.this.notifyAll();
            }
            Thread.sleep(closeInterval.intValue());
          }
        } catch (InterruptedException e) {
          // NOTE(review): with the interrupt flag restored the next sleep() throws immediately, so an
          // interrupted timer keeps looping without pausing until the illegal state is reached -
          // confirm whether interruption should stop the timer instead.
          Thread.currentThread().interrupt();
        }
      }
      // The loop only ends in the illegal state; make sure a racing "isOpen = true" does not stick.
      isOpen = false;
    }
  }
}