1 64 65 package com.jcorporate.expresso.services.asyncprocess; 66 67 import com.jcorporate.expresso.core.db.DBException; 68 import com.jcorporate.expresso.core.misc.StringUtil; 69 import com.jcorporate.expresso.core.registry.MutableRequestRegistry; 70 import com.jcorporate.expresso.core.registry.RequestRegistry; 71 import com.jcorporate.expresso.core.security.User; 72 import com.jcorporate.expresso.services.dbobj.Setup; 73 import org.apache.log4j.Logger; 74 75 import java.util.HashMap ; 76 import java.util.LinkedList ; 77 import java.util.Map ; 78 79 107 public class DefaultAsyncProcessor 108 implements AsyncProcessor { 109 110 public static final String DEFAULT_PROCESSSOR_PROPERTIES_FILE = 111 "DefaultProcessor.properties"; 112 private int nextIssuedTicket = 0; 113 114 private LinkedList waitQueue = new LinkedList (); 115 116 private Map allProcesses = new HashMap (); 117 118 private ProcessThread processThreads[]; 119 120 private ThreadGroup processThreadGroup = new ThreadGroup ("AsyncProcess Threads"); 121 122 long claimTimeout = 30000; 123 124 int numThreads = 10; 125 126 int queueSize = 20; 127 128 private static final Logger log = Logger.getLogger(DefaultAsyncProcessor.class); 129 130 public DefaultAsyncProcessor() { 131 try { 132 claimTimeout = Long.parseLong((StringUtil.notNull(Setup.getValue("default", "AsyncClaimTimeout")).length() != 0) 135 ? 136 Setup.getValue("default", "AsyncClaimTimeout") : 137 "30000"); 138 141 numThreads = Integer.parseInt((StringUtil.notNull(Setup.getValue("default", "AsyncNumThreads")).length() != 0) 142 ? 143 Setup.getValue("default", "AsyncNumThreads") : 144 "10"); 145 147 queueSize = Integer.parseInt((StringUtil.notNull(Setup.getValue("default", "AsyncQueueSize")).length() != 0) 148 ? 149 Setup.getValue("default", "AsyncQueueSize") : 150 "20"); 151 152 } catch (DBException ex) { 154 log.error("Error loading properties", ex); 155 } catch (NumberFormatException ex) { 156 log.error("Error parsing setup values for Async processor. " 157 + "Has this DB gone through initial setup? Using default " 158 + "values instead."); 159 } 160 161 processThreads = new ProcessThread[numThreads]; 162 for (int i = 0; i < numThreads; i++) { 163 processThreads[i] = new ProcessThread(this, processThreadGroup, 164 "Processor" + i); 165 } 166 167 if (log.isInfoEnabled()) { 168 log.info("Starting processing threads"); 169 } 170 171 for (int i = 0; i < numThreads; i++) { 172 processThreads[i].start(); 173 } 174 } 175 176 179 public synchronized void destroy() { 180 for (int i = 0; i < numThreads; i++) { 181 Thread aThread = processThreads[i]; 182 if (aThread != null) { 183 aThread.interrupt(); 184 } 185 } 186 187 for (int i = 0; i < numThreads; i++) { 188 Thread aThread = processThreads[i]; 189 if (aThread != null) { 190 try { 191 if (aThread != null) { 192 aThread.join(1000); 193 } 194 } catch (InterruptedException ex) { 195 log.info("Interrupted while waiting for process thread", ex); 196 } 197 198 if (aThread.isAlive()) { 199 log.warn("After waiting a second, the process thread: " + 200 aThread.getName() 201 + " has still not exited"); 202 } 203 } 204 } 205 206 processThreads = null; 207 if (processThreadGroup != null) { 208 processThreadGroup.destroy(); 209 } 210 } 211 212 219 public synchronized AsyncTicket addToQueue(AsyncProcess newProcess) 220 throws QueueFullException { 221 if (processThreads == null) { 222 throw new IllegalStateException ("Async Processor has already shut down"); 223 } 224 225 nextIssuedTicket++; 226 DefaultTicket ticket = new DefaultTicket(nextIssuedTicket); 227 ProcessWrapper wrapper = new ProcessWrapper(newProcess, ticket); 228 wrapper.getResult().setStatusCode(AsyncProcessResult.STATUS_PENDING); 229 DefaultTicket theTicket = new DefaultTicket(nextIssuedTicket); 230 231 synchronized (allProcesses) { 232 allProcesses.put(theTicket, wrapper); 233 } 234 235 synchronized (waitQueue) { 236 if (waitQueue.size() > queueSize) { 237 throw new QueueFullException("Queue already has " + 238 waitQueue.size() 239 + 240 " processes waiting. Cannot continue"); 241 } 242 243 waitQueue.addLast(wrapper); 244 waitQueue.notify(); 245 } 246 return theTicket; 247 } 248 249 259 260 public AsyncTicket addToQueue(AsyncProcess newProcess, long waitTimeout) 261 throws QueueFullException { 262 263 AsyncTicket ticket = addToQueue(newProcess); 264 synchronized (newProcess) { 265 try { 266 newProcess.wait(waitTimeout); 267 } catch (InterruptedException ex) { 268 log.info("Interrupted while waiting for process", ex); 269 return ticket; 270 } 271 } 272 273 synchronized (allProcesses) { 274 ProcessWrapper wrapper = this.getProcessWrapper(ticket); 275 if (wrapper == null) { 276 return null; 277 } 278 279 if (wrapper.getResult().getStatusCode() == 280 AsyncProcessResult.STATUS_COMPLETE) { 281 this.getProcessResult(ticket); 282 return new DefaultTicket(-1); 283 } 284 } 285 286 return ticket; 287 } 288 289 296 private ProcessWrapper getProcessWrapper(AsyncTicket ticketId) { 297 ProcessWrapper wrapper = (ProcessWrapper) allProcesses.get(ticketId); 298 if (wrapper == null) { 299 return null; 300 } 301 302 return wrapper; 303 } 304 305 310 LinkedList getQueue() { 311 return waitQueue; 312 } 313 314 318 330 331 337 class ProcessWrapper { 338 private AsyncProcess wrappedObject; 339 340 DefaultTicket objectId; 341 342 DefaultProcessResult result; 343 344 long queueTime; 345 346 long startTime; 347 348 long completedTime; 349 350 private String defaultDataContext; 351 352 private User defaultUser; 353 354 public ProcessWrapper(AsyncProcess objectToWrap, 355 DefaultTicket newObjectId) { 356 queueTime = System.currentTimeMillis(); 357 wrappedObject = objectToWrap; 358 objectId = newObjectId; 359 result = new DefaultProcessResult(); 360 result.setOriginalProcess(objectToWrap); 361 362 defaultDataContext = RequestRegistry.getDataContext(); 365 defaultUser = RequestRegistry.getUser(); 366 } 367 368 public AsyncProcess getWrappedObject() { 369 return wrappedObject; 370 } 371 372 public DefaultProcessResult getResult() { 373 return result; 374 } 375 376 377 381 public void process() { 382 new MutableRequestRegistry(defaultDataContext, defaultUser); 385 386 startTime = System.currentTimeMillis(); 387 this.getResult().setStatusCode(AsyncProcessResult.STATUS_RUNNING); 388 try { 389 wrappedObject.process(); 390 } catch (Throwable ex) { 391 this.getResult().setException(ex); 392 this.getResult().setStatusCode(AsyncProcessResult.STATUS_FAULT); 393 } 394 completedTime = System.currentTimeMillis(); 395 this.getResult().setStatusCode(AsyncProcessResult.STATUS_COMPLETE); 396 397 synchronized (wrappedObject) { 402 wrappedObject.notify(); 403 } 404 405 System.gc(); 411 } 412 413 public long getQueueTime() { 414 return queueTime; 415 } 416 417 public long getCompletedTime() { 418 return completedTime; 419 } 420 421 public DefaultTicket getObjectId() { 422 return objectId; 423 } 424 425 } 426 427 428 434 public AsyncProcessResult getProcessResult(AsyncTicket ticketId) { 435 synchronized (allProcesses) { 436 ProcessWrapper wrapper = getProcessWrapper(ticketId); 437 438 if (wrapper == null) { 439 log.warn("Error getting process wrapper for ticket: " + 440 ticketId.toString()); 441 return null; 442 } 443 444 if (wrapper.getResult().getStatusCode() == 445 AsyncProcessResult.STATUS_COMPLETE) { 446 allProcesses.remove(ticketId); 447 } 448 449 return wrapper.getResult(); 450 } 451 } 452 453 454 463 public AsyncProcessResult getProcessResult(AsyncTicket ticketId, 464 long waitTimeout) { 465 ProcessWrapper wrapper; 466 467 synchronized (allProcesses) { 468 wrapper = getProcessWrapper(ticketId); 469 470 if (wrapper == null) { 471 log.warn("Didn't get a process warpper for ticket: " + 472 ticketId.toString()); 473 return null; 474 } 475 476 if (wrapper.getResult().getStatusCode() == 477 AsyncProcessResult.STATUS_COMPLETE) { 478 allProcesses.remove(ticketId); 479 return wrapper.getResult(); 480 } 481 } 482 483 AsyncProcess process = wrapper.getWrappedObject(); 484 synchronized (process) { 485 try { 486 process.wait(waitTimeout); 487 } catch (InterruptedException ex) { 488 log.info("Interrupted while waiting for process", ex); 489 } 490 } 491 492 if (wrapper.getResult().getStatusCode() 493 == AsyncProcessResult.STATUS_COMPLETE) { 494 allProcesses.remove(ticketId); 495 } 496 497 return wrapper.getResult(); 498 } 499 500 } 501 | Popular Tags |