1 package org.jacorb.poa; 2 3 22 23 import java.util.*; 24 25 import org.apache.avalon.framework.logger.Logger; 26 import org.apache.avalon.framework.configuration.*; 27 28 import org.jacorb.orb.dsi.ServerRequest; 29 import org.jacorb.poa.except.ResourceLimitReachedException; 30 import org.jacorb.poa.util.StringPair; 31 32 38 public class RequestQueue 39 implements Configurable 40 { 41 private RequestQueueListener queueListener; 42 private RequestController controller; 43 44 45 private org.jacorb.config.Configuration configuration = null; 46 private Logger logger; 47 private int queueMin; 48 private int queueMax; 49 private boolean queueWait; 50 51 private boolean configured = false; 52 53 private List queue = 54 new ArrayList (POAConstants.QUEUE_CAPACITY_INI); 55 57 private RequestQueue() 58 { 59 } 60 61 protected RequestQueue(RequestController controller) 62 { 63 this.controller = controller; 64 } 65 66 public void configure(Configuration myConfiguration) 67 throws ConfigurationException 68 { 69 this.configuration = (org.jacorb.config.Configuration)myConfiguration; 70 logger = configuration.getNamedLogger("jacorb.poa.queue"); 71 queueMax = configuration.getAttributeAsInteger("jacorb.poa.queue_max", 100); 72 queueMin = configuration.getAttributeAsInteger("jacorb.poa.queue_min", 10); 73 queueWait = configuration.getAttributeAsBoolean("jacorb.poa.queue_wait",false); 74 configured = true; 75 } 76 77 88 89 protected synchronized void add(ServerRequest request) 90 throws ResourceLimitReachedException 91 { 92 if (!configured) 93 throw new Error ("RQ.add(): not configured!"); 94 95 if (queue.size() >= queueMax ) 96 { 97 if (logger.isWarnEnabled()) 98 { 99 logger.warn("Request queue is full, consider increasing " 100 + "jacorb.poa.queue_max (currently: " 101 + queueMax + ")"); 102 } 103 104 if ( queueWait ) 105 { 106 while (queue.size() > queueMin ) 107 { 108 try 109 { 110 this.wait(); 111 } 112 catch (InterruptedException ex) 113 { 114 } 116 } 117 } 118 else 119 { 120 throw new ResourceLimitReachedException(); 121 } 122 } 123 queue.add(request); 124 125 if (queue.size() == 1) 126 { 127 controller.continueToWork(); 128 } 129 130 if (logger.isDebugEnabled()) 131 { 132 logger.debug("rid: " + request.requestId() + 133 " opname: " + request.operation() + 134 " is queued (queue size: " + queue.size() + ")"); 135 } 136 137 if (queueListener != null) 139 queueListener.requestAddedToQueue(request, queue.size()); 140 } 141 142 protected synchronized void addRequestQueueListener(RequestQueueListener listener) 143 { 144 if (!configured) 145 throw new Error ("RQ.add(): not configured!"); 146 queueListener = EventMulticaster.add(queueListener, listener); 147 } 148 149 protected synchronized StringPair[] deliverContent() 150 { 151 if (!configured) 152 throw new Error ("RQ.add(): not configured!"); 153 StringPair[] result = new StringPair[queue.size()]; 154 Iterator en = queue.iterator(); 155 ServerRequest sr; 156 for (int i=0; i<result.length; i++) 157 { 158 sr = (ServerRequest) en.next(); 159 result[i] = new StringPair(sr.requestId()+"", new String ( sr.objectId() ) ); 160 } 161 return result; 162 } 163 164 protected synchronized ServerRequest getElementAndRemove(int rid) 165 { 166 if (!configured) 167 throw new Error ("RQ.add(): not configured!"); 168 if (!queue.isEmpty()) 169 { 170 Iterator en = queue.iterator(); 171 ServerRequest result; 172 while (en.hasNext()) 173 { 174 result = (ServerRequest) en.next(); 175 if (result.requestId() == rid) 176 { 177 en.remove(); 178 this.notifyAll(); 179 if (queueListener != null) 181 queueListener.requestRemovedFromQueue(result, queue.size()); 182 return result; 183 } 184 } 185 } 186 return null; 187 } 188 189 protected synchronized ServerRequest getFirst() 190 { 191 if (!configured) 192 throw new Error ("RQ.add(): not configured!"); 193 if (!queue.isEmpty()) 194 { 195 return (ServerRequest) queue.get(0); 196 } 197 return null; 198 } 199 200 protected boolean isEmpty() 201 { 202 if (!configured) 203 throw new Error ("RQ.add(): not configured!"); 204 return queue.isEmpty(); 205 } 206 207 protected synchronized ServerRequest removeFirst() 208 { 209 if (!configured) 210 throw new Error ("RQ.add(): not configured!"); 211 if (!queue.isEmpty()) 212 { 213 ServerRequest result = (ServerRequest) queue.get(0); 214 queue.remove(0); 215 this.notifyAll(); 216 218 if (queueListener != null) 219 queueListener.requestRemovedFromQueue(result, queue.size()); 220 return result; 221 } 222 return null; 223 } 224 225 protected synchronized ServerRequest removeLast() 226 { 227 if (!configured) 228 throw new Error ("RQ.add(): not configured!"); 229 if (!queue.isEmpty()) 230 { 231 int index = queue.size() - 1; 232 ServerRequest result = (ServerRequest) queue.get(index); 233 queue.remove(index); 234 this.notifyAll(); 235 if (queueListener != null) 237 queueListener.requestRemovedFromQueue(result, queue.size()); 238 return result; 239 } 240 return null; 241 } 242 243 protected synchronized void removeRequestQueueListener(RequestQueueListener listener) 244 { 245 if (!configured) 246 throw new Error ("RQ.add(): not configured!"); 247 queueListener = EventMulticaster.remove(queueListener, listener); 248 } 249 250 protected int size() 251 { 252 return queue.size(); 253 } 254 } 255 | Popular Tags |