package com.sun.jmx.snmp.daemon;

import java.util.Vector;
import java.io.Serializable;

import com.sun.jmx.trace.Trace;

/**
 * Owns the two queues used for SNMP inform requests and the two threads that
 * service them.
 * <ul>
 *   <li>{@code newq} ({@link SendQ}) holds requests ordered by
 *       {@code getAbsNextPollTime()}; it is associated with the
 *       {@code SnmpSendServer} thread.</li>
 *   <li>{@code waitq} ({@link WaitQ}) holds requests ordered by
 *       {@code getAbsMaxTimeToWait()}; it is associated with the
 *       {@code SnmpTimerServer} thread.</li>
 * </ul>
 * Once {@link #stopQThreads()} has run, both queue references are dropped, so
 * the queue-delegating methods of this manager must not be called afterwards.
 */
final class SnmpQManager implements Serializable {

    private SendQ newq;
    private WaitQ waitq;

    private ThreadGroup queueThreadGroup = null;
    private Thread requestQThread = null;
    private Thread timerQThread = null;

    static String dbgTag = "SnmpQManager";

    /**
     * Creates both queues (initial capacity 20, increment 5), the thread group
     * for the server threads, and the server threads themselves.
     */
    SnmpQManager() {
        newq = new SendQ(20, 5);
        waitq = new WaitQ(20, 5);

        queueThreadGroup = new ThreadGroup("Qmanager Thread Group");

        startQThreads();
    }

    /**
     * Creates the timer and send server threads if they do not exist yet or
     * are no longer alive. Threads that are still alive are left untouched.
     */
    public void startQThreads() {
        if (timerQThread == null || !timerQThread.isAlive()) {
            timerQThread = new SnmpTimerServer(queueThreadGroup, this);
        }
        if (requestQThread == null || !requestQThread.isAlive()) {
            requestQThread = new SnmpSendServer(queueThreadGroup, this);
        }
    }

    /**
     * Flags both server threads and both queues as being destroyed, stops the
     * threads that are still alive, and releases the queue and thread
     * references.
     * <p>
     * Calling this method again after a previous stop is a harmless no-op:
     * every reference is checked for {@code null} before it is used.
     */
    public void stopQThreads() {
        // Raise all four shutdown flags first, before stopping any thread, so
        // that a thread woken up during the stop sequence already sees them.
        if (timerQThread != null) {
            ((SnmpTimerServer) timerQThread).isBeingDestroyed = true;
        }
        if (waitq != null) {
            waitq.isBeingDestroyed = true;
        }
        if (requestQThread != null) {
            ((SnmpSendServer) requestQThread).isBeingDestroyed = true;
        }
        if (newq != null) {
            newq.isBeingDestroyed = true;
        }

        if (timerQThread != null && timerQThread.isAlive()) {
            ((SnmpTimerServer) timerQThread).stopTimerServer();
        }
        waitq = null;
        timerQThread = null;

        if (requestQThread != null && requestQThread.isAlive()) {
            ((SnmpSendServer) requestQThread).stopSendServer();
        }
        newq = null;
        requestQThread = null;
    }

    /**
     * Queues a request on the send queue.
     *
     * @param reqc the request to queue.
     */
    public void addRequest(SnmpInformRequest reqc) {
        newq.addRequest(reqc);
    }

    /**
     * Queues a request on the wait queue.
     *
     * @param reqc the request to queue.
     */
    public void addWaiting(SnmpInformRequest reqc) {
        waitq.addWaiting(reqc);
    }

    /**
     * Delegates to {@link SendQ#getAllOutstandingRequest(long)}.
     *
     * @param range margin, in milliseconds, added to the current time when
     *        selecting due requests.
     * @return the due requests, or {@code null} if the send queue is being
     *         destroyed.
     */
    public Vector getAllOutstandingRequest(long range) {
        return newq.getAllOutstandingRequest(range);
    }

    /**
     * Delegates to {@link WaitQ#getTimeoutRequests()}.
     *
     * @return the next timed-out request, or {@code null} if the wait queue is
     *         being destroyed.
     */
    public SnmpInformRequest getTimeoutRequests() {
        return waitq.getTimeoutRequests();
    }

    /**
     * Removes the given request from both queues.
     *
     * @param reqc the request to remove.
     */
    public void removeRequest(SnmpInformRequest reqc) {
        newq.removeElement(reqc);
        waitq.removeElement(reqc);
    }

    /**
     * Removes the request with the given identifier. The send queue is
     * searched first; the wait queue is searched only if the send queue does
     * not hold a matching request.
     *
     * @param reqid the identifier of the request to remove.
     * @return the removed request, or {@code null} if neither queue holds it.
     */
    public SnmpInformRequest removeRequest(long reqid) {
        SnmpInformRequest reqc = newq.removeRequest(reqid);
        if (reqc == null) {
            reqc = waitq.removeRequest(reqid);
        }
        return reqc;
    }

    // TRACES & DEBUG
    //---------------

    static boolean isTraceOn() {
        return Trace.isSelected(Trace.LEVEL_TRACE, Trace.INFO_ADAPTOR_SNMP);
    }

    static void trace(String clz, String func, String info) {
        Trace.send(Trace.LEVEL_TRACE, Trace.INFO_ADAPTOR_SNMP, clz, func, info);
    }

    static void trace(String func, String info) {
        SnmpQManager.trace(dbgTag, func, info);
    }

    static boolean isDebugOn() {
        return Trace.isSelected(Trace.LEVEL_DEBUG, Trace.INFO_ADAPTOR_SNMP);
    }

    static void debug(String clz, String func, String info) {
        Trace.send(Trace.LEVEL_DEBUG, Trace.INFO_ADAPTOR_SNMP, clz, func, info);
    }

    static void debug(String clz, String func, Throwable exception) {
        Trace.send(Trace.LEVEL_DEBUG, Trace.INFO_ADAPTOR_SNMP, clz, func, exception);
    }

    static void debug(String func, String info) {
        SnmpQManager.debug(dbgTag, func, info);
    }

    static void debug(String func, Throwable exception) {
        SnmpQManager.debug(dbgTag, func, exception);
    }
}

/**
 * Queue of inform requests ordered by {@code getAbsNextPollTime()}.
 * <p>
 * The vector is kept sorted so that the request with the smallest poll time is
 * always the LAST element; consumers therefore only ever look at, and remove
 * from, the tail.
 */
class SendQ extends Vector {

    /**
     * Set to {@code true} when the queue is being shut down. Written by the
     * stopping thread without holding this queue's monitor, hence volatile.
     */
    volatile boolean isBeingDestroyed = false;

    SendQ(int initialCapacity, int capacityIncr) {
        super(initialCapacity, capacityIncr);
    }

    private synchronized void notifyClients() {
        this.notifyAll();
    }

    /**
     * Inserts a request at the position that keeps the queue sorted. Waiting
     * threads are notified only when the request lands at the tail, i.e. when
     * it becomes the next request due (this includes insertion into an empty
     * queue).
     *
     * @param req the request to insert.
     */
    public synchronized void addRequest(SnmpInformRequest req) {
        long nextPoll = req.getAbsNextPollTime();

        // Scan from the tail towards the head for the first element that is
        // due later than the new request; the new request goes right after it.
        int i;
        for (i = size(); i > 0; i--) {
            if (nextPoll < getRequestAt(i - 1).getAbsNextPollTime()) {
                break;
            }
        }
        if (i == size()) {
            addElement(req);
            notifyClients();
        } else {
            insertElementAt(req, i);
        }
    }

    /**
     * Blocks until the tail request is due or the queue is being destroyed.
     *
     * @return {@code true} when the tail request's poll time has been reached,
     *         {@code false} when the queue is being destroyed.
     */
    public synchronized boolean waitUntilReady() {
        while (true) {
            if (isBeingDestroyed) {
                return false;
            }
            long tmp = 0;
            if (!isEmpty()) {
                long currTime = System.currentTimeMillis();
                SnmpInformRequest req = (SnmpInformRequest) lastElement();
                tmp = req.getAbsNextPollTime() - currTime;
                if (tmp <= 0) {
                    return true;
                }
            }
            // tmp is 0 only when the queue is empty: wait until notified.
            waitOnThisQueue(tmp);
        }
    }

    /**
     * Waits until at least one request is due, then removes and returns every
     * request whose poll time is not later than "now + margin".
     *
     * @param margin milliseconds added to the current time when deciding
     *        which requests are due.
     * @return the removed requests, earliest first, or {@code null} if the
     *         queue is being destroyed.
     */
    public synchronized Vector getAllOutstandingRequest(long margin) {
        int i;
        Vector outreq = new Vector();
        while (true) {
            if (waitUntilReady()) {
                long refTime = System.currentTimeMillis() + margin;

                for (i = size(); i > 0; i--) {
                    SnmpInformRequest req = getRequestAt(i - 1);
                    if (req.getAbsNextPollTime() > refTime) {
                        break;
                    }
                    outreq.addElement(req);
                }

                if (!outreq.isEmpty()) {
                    // Truncate through setSize() rather than by decrementing
                    // elementCount: setSize() also clears the vacated slots,
                    // so the returned requests are no longer retained by this
                    // queue's backing array.
                    setSize(size() - outreq.size());
                    return outreq;
                }
            } else {
                return null;
            }
        }
    }

    /**
     * Waits on this queue's monitor for at most {@code time} milliseconds;
     * a value of 0 waits until notified.
     *
     * @param time the maximum time to wait, in milliseconds.
     */
    public synchronized void waitOnThisQueue(long time) {
        if (time == 0 && !isEmpty()) {
            if (SnmpQManager.isDebugOn()) {
                SnmpQManager.debug("waitOnThisQueue", "[" + Thread.currentThread().toString() + "]:" +
                                   "Fatal BUG :: Blocking on newq permenantly. But size = " + size());
            }
        }

        try {
            this.wait(time);
        } catch (InterruptedException e) {
            // Deliberately not re-asserting the interrupt: waitUntilReady()
            // calls this method in a loop, and a pending interrupt would make
            // every subsequent wait() fail immediately. The loop re-checks
            // isBeingDestroyed after each wake-up instead.
        }
    }

    public SnmpInformRequest getRequestAt(int idx) {
        return (SnmpInformRequest) elementAt(idx);
    }

    /**
     * Removes the first request whose identifier equals {@code reqid}.
     *
     * @param reqid the identifier to look for.
     * @return the removed request, or {@code null} if none matches.
     */
    public synchronized SnmpInformRequest removeRequest(long reqid) {
        int max = size();
        for (int i = 0; i < max; i++) {
            SnmpInformRequest reqc = getRequestAt(i);
            if (reqid == reqc.getRequestId()) {
                removeElementAt(i);
                return reqc;
            }
        }
        return null;
    }
}

/**
 * Queue of inform requests ordered by {@code getAbsMaxTimeToWait()}.
 * <p>
 * The vector is kept sorted so that the request with the smallest maximum wait
 * time is always the LAST element; consumers therefore only ever look at, and
 * remove from, the tail.
 */
class WaitQ extends Vector {

    /**
     * Set to {@code true} when the queue is being shut down. Written by the
     * stopping thread without holding this queue's monitor, hence volatile.
     */
    volatile boolean isBeingDestroyed = false;

    WaitQ(int initialCapacity, int capacityIncr) {
        super(initialCapacity, capacityIncr);
    }

    /**
     * Inserts a request at the position that keeps the queue sorted. Waiting
     * threads are notified only when the request lands at the tail, i.e. when
     * it becomes the next request to time out (this includes insertion into an
     * empty queue).
     *
     * @param req the request to insert.
     */
    public synchronized void addWaiting(SnmpInformRequest req) {
        long waitTime = req.getAbsMaxTimeToWait();

        // Scan from the tail towards the head for the first element that
        // times out later than the new request.
        int i;
        for (i = size(); i > 0; i--) {
            if (waitTime < getRequestAt(i - 1).getAbsMaxTimeToWait()) {
                break;
            }
        }
        if (i == size()) {
            addElement(req);
            notifyClients();
        } else {
            insertElementAt(req, i);
        }
    }

    /**
     * Blocks until the tail request has timed out or the queue is being
     * destroyed.
     *
     * @return {@code true} when the tail request's maximum wait time has been
     *         reached, {@code false} when the queue is being destroyed.
     */
    public synchronized boolean waitUntilReady() {
        while (true) {
            if (isBeingDestroyed) {
                return false;
            }
            long tmp = 0;
            if (!isEmpty()) {
                long currTime = System.currentTimeMillis();
                SnmpInformRequest req = (SnmpInformRequest) lastElement();
                tmp = req.getAbsMaxTimeToWait() - currTime;
                if (tmp <= 0) {
                    return true;
                }
            }
            // tmp is 0 only when the queue is empty: wait until notified.
            waitOnThisQueue(tmp);
        }
    }

    /**
     * Waits until the tail request has timed out, then removes and returns it.
     *
     * @return the timed-out request, or {@code null} if the queue is being
     *         destroyed.
     */
    public synchronized SnmpInformRequest getTimeoutRequests() {
        if (waitUntilReady()) {
            int last = size() - 1;
            SnmpInformRequest req = getRequestAt(last);
            // removeElementAt() clears the vacated slot, unlike a bare
            // elementCount--, so the request is not retained by this queue.
            removeElementAt(last);
            return req;
        }
        return null;
    }

    private synchronized void notifyClients() {
        this.notifyAll();
    }

    /**
     * Waits on this queue's monitor for at most {@code time} milliseconds;
     * a value of 0 waits until notified.
     *
     * @param time the maximum time to wait, in milliseconds.
     */
    public synchronized void waitOnThisQueue(long time) {
        if (time == 0 && !isEmpty()) {
            if (SnmpQManager.isDebugOn()) {
                SnmpQManager.debug("waitOnThisQueue", "[" + Thread.currentThread().toString() + "]:" +
                                   "Fatal BUG :: Blocking on waitq permenantly. But size = " + size());
            }
        }

        try {
            this.wait(time);
        } catch (InterruptedException e) {
            // Deliberately not re-asserting the interrupt: waitUntilReady()
            // calls this method in a loop, and a pending interrupt would make
            // every subsequent wait() fail immediately. The loop re-checks
            // isBeingDestroyed after each wake-up instead.
        }
    }

    public SnmpInformRequest getRequestAt(int idx) {
        return (SnmpInformRequest) elementAt(idx);
    }

    /**
     * Removes the first request whose identifier equals {@code reqid}.
     *
     * @param reqid the identifier to look for.
     * @return the removed request, or {@code null} if none matches.
     */
    public synchronized SnmpInformRequest removeRequest(long reqid) {
        int max = size();
        for (int i = 0; i < max; i++) {
            SnmpInformRequest reqc = getRequestAt(i);
            if (reqid == reqc.getRequestId()) {
                removeElementAt(i);
                return reqc;
            }
        }
        return null;
    }
}