KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > sun > corba > se > impl > orbutil > threadpool > WorkQueueImpl


1 /*
2  * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
3  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
4  */

5
6 package com.sun.corba.se.impl.orbutil.threadpool;
7
8 import java.util.LinkedList JavaDoc;
9
10 import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
11 import com.sun.corba.se.spi.orbutil.threadpool.Work;
12 import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;
13
14 import com.sun.corba.se.impl.orbutil.ORBConstants;
15 import com.sun.corba.se.impl.orbutil.threadpool.ThreadPoolImpl;
16
17 import com.sun.corba.se.spi.monitoring.MonitoringConstants;
18 import com.sun.corba.se.spi.monitoring.MonitoringFactories;
19 import com.sun.corba.se.spi.monitoring.MonitoredObject;
20 import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;
21
22 public class WorkQueueImpl implements WorkQueue
23 {
24     private ThreadPool workerThreadPool;
25     private LinkedList JavaDoc theWorkQueue = new LinkedList JavaDoc();
26     private long workItemsAdded = 0;
27
28     // Initialized to 1 to avoid divide by zero in averageTimeInQueue()
29
private long workItemsDequeued = 1;
30
31     private long totalTimeInQueue = 0;
32
33     // Name of the work queue
34
private String JavaDoc name;
35
36     // MonitoredObject for work queue
37
private MonitoredObject workqueueMonitoredObject;
38
39     public WorkQueueImpl() {
40     name=ORBConstants.WORKQUEUE_DEFAULT_NAME;
41     initializeMonitoring();
42     }
43
44     public WorkQueueImpl(ThreadPool workerThreadPool) {
45         this(workerThreadPool, ORBConstants.WORKQUEUE_DEFAULT_NAME);
46     }
47
48     public WorkQueueImpl(ThreadPool workerThreadPool, String JavaDoc name) {
49         this.workerThreadPool = workerThreadPool;
50     this.name = name;
51     initializeMonitoring();
52     }
53
54     // Setup monitoring for this workqueue
55
private void initializeMonitoring() {
56     workqueueMonitoredObject = MonitoringFactories.
57                 getMonitoredObjectFactory().
58                 createMonitoredObject(name,
59                 MonitoringConstants.WORKQUEUE_MONITORING_DESCRIPTION);
60
61     LongMonitoredAttributeBase b1 = new
62         LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED,
63             MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED_DESCRIPTION) {
64         public Object JavaDoc getValue() {
65             return new Long JavaDoc(WorkQueueImpl.this.totalWorkItemsAdded());
66         }
67         };
68     workqueueMonitoredObject.addAttribute(b1);
69     LongMonitoredAttributeBase b2 = new
70         LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE,
71             MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE_DESCRIPTION) {
72         public Object JavaDoc getValue() {
73             return new Long JavaDoc(WorkQueueImpl.this.workItemsInQueue());
74         }
75         };
76     workqueueMonitoredObject.addAttribute(b2);
77     LongMonitoredAttributeBase b3 = new
78         LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE,
79             MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE_DESCRIPTION) {
80         public Object JavaDoc getValue() {
81             return new Long JavaDoc(WorkQueueImpl.this.averageTimeInQueue());
82         }
83         };
84     workqueueMonitoredObject.addAttribute(b3);
85     }
86
87
88     // Package private method to get the monitored object for this
89
// class
90
MonitoredObject getMonitoredObject() {
91     return workqueueMonitoredObject;
92     }
93  
94     public void addWork(Work work) {
95         synchronized (this) {
96             workItemsAdded++;
97             work.setEnqueueTime(System.currentTimeMillis());
98             theWorkQueue.addLast(work);
99         ((ThreadPoolImpl)workerThreadPool).notifyForAvailableWork(this);
100         }
101     }
102
103     Work requestWork(long waitTime)
104         throws TimeoutException, InterruptedException JavaDoc
105     {
106         Work workItem;
107         synchronized (this) {
108             if (theWorkQueue.size() != 0) {
109                 workItem = (Work)theWorkQueue.removeFirst();
110                 totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime();
111                 workItemsDequeued++;
112                 return workItem;
113             }
114
115             try {
116
117                 long remainingWaitTime = waitTime;
118                 long finishTime = System.currentTimeMillis() + waitTime;
119
120                 do {
121
122                     this.wait(remainingWaitTime);
123
124                     if (theWorkQueue.size() != 0) {
125                         workItem = (Work)theWorkQueue.removeFirst();
126                         totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime();
127                         workItemsDequeued++;
128                         return workItem;
129                     }
130
131                     remainingWaitTime = finishTime - System.currentTimeMillis();
132
133                 } while (remainingWaitTime > 0);
134
135                 throw new TimeoutException();
136
137             } catch (InterruptedException JavaDoc ie) {
138                 throw ie;
139             }
140         }
141     }
142     
143     public void setThreadPool(ThreadPool workerThreadPool) {
144         this.workerThreadPool = workerThreadPool;
145     }
146
147     public ThreadPool getThreadPool() {
148         return workerThreadPool;
149     }
150
151     /**
152      * Returns the total number of Work items added to the Queue.
153      * This method is unsynchronized and only gives a snapshot of the
154      * state when it is called
155      */

156     public long totalWorkItemsAdded() {
157         return workItemsAdded;
158     }
159
160     /**
161      * Returns the total number of Work items in the Queue to be processed
162      * This method is unsynchronized and only gives a snapshot of the
163      * state when it is called
164      */

165     public int workItemsInQueue() {
166         return theWorkQueue.size();
167     }
168     
169     public synchronized long averageTimeInQueue() {
170         return (totalTimeInQueue/workItemsDequeued);
171     }
172
173     public String JavaDoc getName() {
174         return name;
175     }
176 }
177
178 // End of file.
179
Popular Tags