package com.sun.corba.se.impl.orbutil.threadpool;

import java.util.LinkedList;

import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
import com.sun.corba.se.spi.orbutil.threadpool.Work;
import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;

import com.sun.corba.se.impl.orbutil.ORBConstants;
import com.sun.corba.se.impl.orbutil.threadpool.ThreadPoolImpl;

import com.sun.corba.se.spi.monitoring.MonitoringConstants;
import com.sun.corba.se.spi.monitoring.MonitoringFactories;
import com.sun.corba.se.spi.monitoring.MonitoredObject;
import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;

/**
 * FIFO queue of {@link Work} items that is drained by the threads of a
 * {@link ThreadPool}.
 *
 * <p>Producers call {@link #addWork(Work)}; consumers in this package call
 * {@link #requestWork(long)}, which blocks on this object's monitor until an
 * item is available or the wait time runs out. The queue also registers
 * three monitoring attributes (items added, items currently queued, average
 * time in queue) on a {@link MonitoredObject} created at construction time.
 *
 * <p>Queue mutation and the time-in-queue statistics are guarded by this
 * object's monitor. {@link #workItemsInQueue()} reads the list size without
 * holding the monitor, so its result may be slightly out of date.
 */
public class WorkQueueImpl implements WorkQueue
{
    private ThreadPool workerThreadPool;

    // Raw type kept on purpose: the rest of the file is written without generics.
    private final LinkedList theWorkQueue = new LinkedList();

    // Written only while holding this object's monitor (see addWork) but read
    // without it by totalWorkItemsAdded(); volatile makes that read atomic
    // and up to date without taking the monitor.
    private volatile long workItemsAdded = 0;

    // Number of items handed out so far. Starts at 0; averageTimeInQueue()
    // guards the division for the "nothing dequeued yet" case.
    private long workItemsDequeued = 0;

    // Sum, in milliseconds, of (dequeue time - enqueue time) over all
    // dequeued items.
    private long totalTimeInQueue = 0;

    // Name of this queue; also used as the name of its monitored object.
    private final String name;

    // Monitoring handle created by initializeMonitoring().
    private MonitoredObject workqueueMonitoredObject;

    /**
     * Creates a queue with the default name and no thread pool. A pool must
     * be supplied through {@link #setThreadPool(ThreadPool)} before
     * {@link #addWork(Work)} is called, because addWork dereferences it.
     */
    public WorkQueueImpl() {
        name = ORBConstants.WORKQUEUE_DEFAULT_NAME;
        initializeMonitoring();
    }

    /**
     * Creates a queue with the default name that is served by the given pool.
     *
     * @param workerThreadPool the pool notified when work is added
     */
    public WorkQueueImpl(ThreadPool workerThreadPool) {
        this(workerThreadPool, ORBConstants.WORKQUEUE_DEFAULT_NAME);
    }

    /**
     * Creates a named queue that is served by the given pool.
     *
     * @param workerThreadPool the pool notified when work is added
     * @param name             the queue name, also used for the monitored object
     */
    public WorkQueueImpl(ThreadPool workerThreadPool, String name) {
        this.workerThreadPool = workerThreadPool;
        this.name = name;
        initializeMonitoring();
    }

    /**
     * Creates the monitored object for this queue and attaches one attribute
     * each for total items added, items currently in the queue, and average
     * time in queue. Every attribute computes its value on demand by calling
     * back into this instance.
     */
    private void initializeMonitoring() {
        workqueueMonitoredObject = MonitoringFactories.
            getMonitoredObjectFactory().
            createMonitoredObject(name,
                MonitoringConstants.WORKQUEUE_MONITORING_DESCRIPTION);

        LongMonitoredAttributeBase totalAdded = new LongMonitoredAttributeBase(
                MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED,
                MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED_DESCRIPTION) {
            public Object getValue() {
                return new Long(WorkQueueImpl.this.totalWorkItemsAdded());
            }
        };
        workqueueMonitoredObject.addAttribute(totalAdded);

        LongMonitoredAttributeBase inQueue = new LongMonitoredAttributeBase(
                MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE,
                MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE_DESCRIPTION) {
            public Object getValue() {
                return new Long(WorkQueueImpl.this.workItemsInQueue());
            }
        };
        workqueueMonitoredObject.addAttribute(inQueue);

        LongMonitoredAttributeBase averageTime = new LongMonitoredAttributeBase(
                MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE,
                MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE_DESCRIPTION) {
            public Object getValue() {
                return new Long(WorkQueueImpl.this.averageTimeInQueue());
            }
        };
        workqueueMonitoredObject.addAttribute(averageTime);
    }

    /**
     * Returns the monitored object created for this queue.
     *
     * @return the monitored object carrying this queue's attributes
     */
    MonitoredObject getMonitoredObject() {
        return workqueueMonitoredObject;
    }

    /**
     * Appends a work item to the tail of the queue and tells the thread pool
     * that work is available.
     *
     * <p>The item's enqueue time is set to the current time before it is
     * queued. The pool is cast to {@link ThreadPoolImpl}; the call therefore
     * fails with a ClassCastException for any other pool implementation and
     * with a NullPointerException if no pool has been set.
     *
     * @param work the item to enqueue
     */
    public void addWork(Work work) {
        synchronized (this) {
            workItemsAdded++;
            work.setEnqueueTime(System.currentTimeMillis());
            theWorkQueue.addLast(work);
            ((ThreadPoolImpl) workerThreadPool).notifyForAvailableWork(this);
        }
    }

    /**
     * Removes and returns the item at the head of the queue, waiting on this
     * object's monitor if the queue is currently empty.
     *
     * <p>The wait time is handed to {@link Object#wait(long)} unchanged, so a
     * value of 0 blocks until the monitor is notified and a negative value is
     * rejected by {@code wait} itself.
     *
     * @param waitTime maximum time to wait, in milliseconds
     * @return the oldest queued work item
     * @throws TimeoutException     if the queue is still empty once the wait
     *                              time has elapsed
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting
     */
    Work requestWork(long waitTime)
        throws TimeoutException, InterruptedException
    {
        synchronized (this) {
            // Fast path: work is already queued, no need to wait.
            if (theWorkQueue.size() != 0) {
                return dequeueWork();
            }

            long remainingWaitTime = waitTime;
            long finishTime = System.currentTimeMillis() + waitTime;

            do {
                this.wait(remainingWaitTime);

                // Returning from wait() does not guarantee that work is
                // present, so the queue is re-checked on every wake-up.
                if (theWorkQueue.size() != 0) {
                    return dequeueWork();
                }

                // Woken without work: wait again only for the time left.
                remainingWaitTime = finishTime - System.currentTimeMillis();
            } while (remainingWaitTime > 0);

            throw new TimeoutException();
        }
    }

    /**
     * Removes the head of the queue and updates the time-in-queue statistics.
     * The caller must hold this object's monitor and must have checked that
     * the queue is not empty.
     *
     * @return the removed work item
     */
    private Work dequeueWork() {
        Work workItem = (Work) theWorkQueue.removeFirst();
        totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime();
        workItemsDequeued++;
        return workItem;
    }

    /**
     * Sets the pool that {@link #addWork(Work)} notifies.
     *
     * @param workerThreadPool the pool to associate with this queue
     */
    public void setThreadPool(ThreadPool workerThreadPool) {
        this.workerThreadPool = workerThreadPool;
    }

    /**
     * Returns the pool currently associated with this queue.
     *
     * @return the associated pool, or {@code null} if none has been set
     */
    public ThreadPool getThreadPool() {
        return workerThreadPool;
    }

    /**
     * Returns how many work items have been added since this queue was
     * created.
     *
     * @return the number of calls to {@link #addWork(Work)} that reached the
     *         counter increment
     */
    public long totalWorkItemsAdded() {
        return workItemsAdded;
    }

    /**
     * Returns the number of work items currently waiting in the queue. The
     * size is read without holding this object's monitor.
     *
     * @return the current queue length
     */
    public int workItemsInQueue() {
        return theWorkQueue.size();
    }

    /**
     * Returns the average time, in milliseconds, that dequeued items spent in
     * the queue, using integer division.
     *
     * @return total time in queue divided by the number of dequeued items, or
     *         0 if nothing has been dequeued yet
     */
    public synchronized long averageTimeInQueue() {
        if (workItemsDequeued == 0) {
            // Nothing dequeued yet: avoid dividing by zero.
            return 0;
        }
        return totalTimeInQueue / workItemsDequeued;
    }

    /**
     * Returns the name given to this queue at construction time.
     *
     * @return the queue name
     */
    public String getName() {
        return name;
    }
}