1 23 24 30 31 package com.sun.enterprise.admin.monitor.callflow; 32 33 import java.util.List ; 34 import java.util.ArrayList ; 35 import java.util.logging.Level ; 36 import java.util.logging.Logger ; 37 import java.util.concurrent.TimeUnit ; 38 import java.util.concurrent.LinkedBlockingQueue ; 39 import java.util.concurrent.ConcurrentLinkedQueue ; 40 41 import com.sun.enterprise.admin.common.constant.AdminConstants; 42 43 55 class AsyncHandler { 56 57 58 59 private static final Logger logger = 60 Logger.getLogger(AdminConstants.kLoggerName); 61 private static final int WAIT_INTERVALS = 100; 62 private static final int MAX_BULK_SIZE = 10000; 63 private static final int BUFFER_COUNT = 6; 64 private static final String THREAD_NAME = "Callflow AsyncThread"; 65 66 67 68 private LinkedBlockingQueue <RequestStartTO> requestStartQ; 69 private LinkedBlockingQueue <RequestEndTO> requestEndQ; 70 private LinkedBlockingQueue <MethodStartTO> methodStartQ; 71 private LinkedBlockingQueue <MethodEndTO> methodEndQ; 72 private LinkedBlockingQueue <StartTimeTO> startTimeQ; 73 private LinkedBlockingQueue <EndTimeTO> endTimeQ; 74 75 private ConcurrentLinkedQueue <RequestStartTO> requestStartFreeQ; 76 private ConcurrentLinkedQueue <RequestEndTO> requestEndFreeQ; 77 private ConcurrentLinkedQueue <MethodStartTO> methodStartFreeQ; 78 private ConcurrentLinkedQueue <MethodEndTO> methodEndFreeQ; 79 private ConcurrentLinkedQueue <StartTimeTO> startTimeFreeQ; 80 private ConcurrentLinkedQueue <EndTimeTO> endTimeFreeQ; 81 82 private AsyncThread asyncThread; 83 84 private class AsyncThread extends Thread { 85 86 private boolean shutdown; 87 private int emptyBufferCount; 88 private DbAccessObject dbAccessObject; 89 90 AsyncThread() { 91 setDaemon(true); 92 setName(THREAD_NAME); 93 dbAccessObject = DbAccessObjectImpl.getInstance(); 94 } 95 96 void shutdown() { 97 shutdown = true; 98 while (emptyBufferCount < BUFFER_COUNT) { 99 try { 100 Thread.sleep(WAIT_INTERVALS); 101 } catch (InterruptedException e) {} 102 } 103 } 104 105 public void run() { 106 107 List <RequestStartTO> rsTransferObjects = 108 new ArrayList <RequestStartTO>(); 109 List <RequestEndTO> reTransferObjects = 110 new ArrayList <RequestEndTO>(); 111 List <MethodStartTO> msTransferObjects = 112 new ArrayList <MethodStartTO>(); 113 List <MethodEndTO> meTransferObjects = 114 new ArrayList <MethodEndTO>(); 115 List <StartTimeTO> stTransferObjects = 116 new ArrayList <StartTimeTO>(); 117 List <EndTimeTO> etTransferObjects = 118 new ArrayList <EndTimeTO>(); 119 120 while (emptyBufferCount < BUFFER_COUNT) { 121 122 124 for (int i=0; i<MAX_BULK_SIZE; i++) { 125 try { 126 RequestStartTO rsto = 127 requestStartQ.poll( 128 WAIT_INTERVALS, TimeUnit.MILLISECONDS); 129 if (rsto == null) { 130 break; 131 } 132 rsTransferObjects.add(rsto); 133 } catch (InterruptedException e) { 134 logger.log( 135 Level.FINE, 136 "callflow.async_thread_interrupted", 137 e); 138 break; 139 } 140 } 141 try { 142 if (rsTransferObjects.isEmpty()) { 143 if (shutdown) { 144 emptyBufferCount++; 145 } 146 } else { 147 dbAccessObject.insert( 148 rsTransferObjects.toArray(new TransferObject[0])); 149 } 150 } catch (Exception e) { 151 logger.log( 152 Level.WARNING, 153 "callflow.async_db_write_failed", 154 e); 155 } 156 requestStartFreeQ.addAll(rsTransferObjects); 157 rsTransferObjects.clear(); 158 159 161 for (int i=0; i<MAX_BULK_SIZE; i++) { 162 try { 163 RequestEndTO reto = 164 requestEndQ.poll( 165 WAIT_INTERVALS, TimeUnit.MILLISECONDS); 166 if (reto == null) { 167 break; 168 } 169 reTransferObjects.add(reto); 170 } catch (InterruptedException e) { 171 logger.log( 172 Level.FINE, 173 "callflow.async_thread_interrupted", 174 e); 175 break; 176 } 177 } 178 try { 179 if (reTransferObjects.isEmpty()) { 180 if (shutdown) { 181 emptyBufferCount++; 182 } 183 } else { 184 dbAccessObject.insert( 185 reTransferObjects.toArray(new TransferObject[0])); 186 } 187 } catch (Exception e) { 188 logger.log( 189 Level.WARNING, 190 "callflow.async_db_write_failed", 191 e); 192 } 193 requestEndFreeQ.addAll(reTransferObjects); 194 reTransferObjects.clear(); 195 196 198 for (int i=0; i<MAX_BULK_SIZE; i++) { 199 try { 200 MethodStartTO msto = 201 methodStartQ.poll( 202 WAIT_INTERVALS, TimeUnit.MILLISECONDS); 203 if (msto == null) { 204 break; 205 } 206 msTransferObjects.add(msto); 207 } catch (InterruptedException e) { 208 logger.log( 209 Level.FINE, 210 "callflow.async_thread_interrupted", 211 e); 212 break; 213 } 214 } 215 try { 216 if (msTransferObjects.isEmpty()) { 217 if (shutdown) { 218 emptyBufferCount++; 219 } 220 } else { 221 dbAccessObject.insert( 222 msTransferObjects.toArray(new TransferObject[0])); 223 } 224 } catch (Exception e) { 225 logger.log( 226 Level.WARNING, 227 "callflow.async_db_write_failed", 228 e); 229 } 230 methodStartFreeQ.addAll(msTransferObjects); 231 msTransferObjects.clear(); 232 233 235 for (int i=0; i<MAX_BULK_SIZE; i++) { 236 try { 237 MethodEndTO meto = 238 methodEndQ.poll( 239 WAIT_INTERVALS, TimeUnit.MILLISECONDS); 240 if (meto == null) { 241 break; 242 } 243 meTransferObjects.add(meto); 244 } catch (InterruptedException e) { 245 logger.log( 246 Level.FINE, 247 "callflow.async_thread_interrupted", 248 e); 249 break; 250 } 251 } 252 try { 253 if (meTransferObjects.isEmpty()) { 254 if (shutdown) { 255 emptyBufferCount++; 256 } 257 } else { 258 dbAccessObject.insert( 259 meTransferObjects.toArray(new TransferObject[0])); 260 } 261 } catch (Exception e) { 262 logger.log( 263 Level.WARNING, 264 "callflow.async_db_write_failed", 265 e); 266 } 267 methodEndFreeQ.addAll(meTransferObjects); 268 meTransferObjects.clear(); 269 270 272 for (int i=0; i<MAX_BULK_SIZE; i++) { 273 try { 274 StartTimeTO stto = 275 startTimeQ.poll( 276 WAIT_INTERVALS, TimeUnit.MILLISECONDS); 277 if (stto == null) { 278 break; 279 } 280 stTransferObjects.add(stto); 281 } catch (InterruptedException e) { 282 logger.log( 283 Level.FINE, 284 "callflow.async_thread_interrupted", 285 e); 286 break; 287 } 288 } 289 try { 290 if (stTransferObjects.isEmpty()) { 291 if (shutdown) { 292 emptyBufferCount++; 293 } 294 } else { 295 dbAccessObject.insert( 296 stTransferObjects.toArray(new TransferObject[0])); 297 } 298 } catch (Exception e) { 299 logger.log( 300 Level.WARNING, 301 "callflow.async_db_write_failed", 302 e); 303 } 304 startTimeFreeQ.addAll(stTransferObjects); 305 stTransferObjects.clear(); 306 307 309 for (int i=0; i<MAX_BULK_SIZE; i++) { 310 try { 311 EndTimeTO etto = 312 endTimeQ.poll( 313 WAIT_INTERVALS, TimeUnit.MILLISECONDS); 314 if (etto == null) { 315 break; 316 } 317 etTransferObjects.add(etto); 318 } catch (InterruptedException e) { 319 logger.log( 320 Level.FINE, 321 "callflow.async_thread_interrupted", 322 e); 323 break; 324 } 325 } 326 try { 327 if (etTransferObjects.isEmpty()) { 328 if (shutdown) { 329 emptyBufferCount++; 330 } 331 } else { 332 dbAccessObject.insert( 333 etTransferObjects.toArray(new TransferObject[0])); 334 } 335 } catch (Exception e) { 336 logger.log( 337 Level.WARNING, 338 "callflow.async_db_write_failed", 339 e); 340 } 341 endTimeFreeQ.addAll(etTransferObjects); 342 etTransferObjects.clear(); 343 } 344 } 345 } 346 347 AsyncHandler() { 348 349 requestStartQ = new LinkedBlockingQueue <RequestStartTO>(); 350 requestEndQ = new LinkedBlockingQueue <RequestEndTO>(); 351 methodStartQ = new LinkedBlockingQueue <MethodStartTO>(); 352 methodEndQ = new LinkedBlockingQueue <MethodEndTO>(); 353 startTimeQ = new LinkedBlockingQueue <StartTimeTO>(); 354 endTimeQ = new LinkedBlockingQueue <EndTimeTO>(); 355 356 requestStartFreeQ = new ConcurrentLinkedQueue <RequestStartTO>(); 357 requestEndFreeQ = new ConcurrentLinkedQueue <RequestEndTO>(); 358 methodStartFreeQ = new ConcurrentLinkedQueue <MethodStartTO>(); 359 methodEndFreeQ = new ConcurrentLinkedQueue <MethodEndTO>(); 360 startTimeFreeQ = new ConcurrentLinkedQueue <StartTimeTO>(); 361 endTimeFreeQ = new ConcurrentLinkedQueue <EndTimeTO>(); 362 } 363 364 synchronized void enable() { 365 asyncThread = new AsyncThread(); 366 asyncThread.start(); 367 } 368 369 synchronized void disable() { 370 asyncThread.shutdown(); 371 } 372 373 void handleRequestStart( 374 String requestId, long timeStamp, long timeStampMillis, 375 RequestType requestType, String callerIPAddress, 376 String remoteUser) { 377 RequestStartTO rsto = requestStartFreeQ.poll(); 378 if (rsto == null) { 379 rsto = new RequestStartTO(); 380 } 381 rsto.setRequestId(requestId); 382 rsto.setTimeStamp(timeStamp); 383 rsto.setTimeStampMillis(timeStampMillis); 384 rsto.setRequestType(requestType); 385 rsto.setIpAddress(callerIPAddress); 386 boolean success = false; 388 while (!success) { 389 try { 390 requestStartQ.put(rsto); 391 success = true; 392 } catch (InterruptedException e) { 393 logger.log( 394 Level.FINE, 395 "callflow.transfer_to_async_thread_interrupted", e); 396 } 397 } 398 } 399 400 void handleRequestEnd(String requestId, long timeStamp) { 401 RequestEndTO reto = requestEndFreeQ.poll(); 402 if (reto == null) { 403 reto = new RequestEndTO(); 404 } 405 reto.setRequestId(requestId); 406 reto.setTimeStamp(timeStamp); 407 408 boolean success = false; 409 while (!success) { 410 try { 411 requestEndQ.put(reto); 412 success = true; 413 } catch (InterruptedException e) { 414 logger.log( 415 Level.FINE, 416 "callflow.transfer_to_async_thread_interrupted", e); 417 } 418 } 419 } 420 421 void handleMethodStart( 422 String requestId, long timeStamp, String methodName, 423 ComponentType componentType, String applicationName, 424 String moduleName, String componentName, String threadId, 425 String transactionId, String securityId) { 426 MethodStartTO msto = methodStartFreeQ.poll(); 427 if (msto == null) { 428 msto = new MethodStartTO(); 429 } 430 msto.setRequestId(requestId); 431 msto.setTimeStamp(timeStamp); 432 msto.setMethodName(methodName); 433 msto.setComponentType(componentType); 434 msto.setAppName(applicationName); 435 msto.setModuleName(moduleName); 436 msto.setComponentName(componentName); 437 msto.setThreadId(threadId); 438 msto.setTransactionId(transactionId); 439 msto.setSecurityId(securityId); 440 441 boolean success = false; 442 while (!success) { 443 try { 444 methodStartQ.put(msto); 445 success = true; 446 } catch (InterruptedException e) { 447 logger.log( 448 Level.FINE, 449 "callflow.transfer_to_async_thread_interrupted", e); 450 } 451 } 452 } 453 454 void handleMethodEnd( 455 String requestId, long timeStamp, Throwable exception) { 456 MethodEndTO meto = methodEndFreeQ.poll(); 457 if (meto == null) { 458 meto = new MethodEndTO(); 459 } 460 meto.setRequestId(requestId); 461 meto.setTimeStamp(timeStamp); 462 meto.setException(((exception == null) ? null : exception.toString())); 463 464 boolean success = false; 465 while (!success) { 466 try { 467 methodEndQ.put(meto); 468 success = true; 469 } catch (InterruptedException e) { 470 logger.log( 471 Level.FINE, 472 "callflow.transfer_to_async_thread_interrupted", e); 473 } 474 } 475 } 476 477 void handleStartTime( 478 String requestId, long timeStamp, 479 ContainerTypeOrApplicationType type) { 480 StartTimeTO stto = startTimeFreeQ.poll(); 481 if (stto == null) { 482 stto = new StartTimeTO(); 483 } 484 stto.setRequestId(requestId); 485 stto.setTimeStamp(timeStamp); 486 stto.setContainerTypeOrApplicationType(type); 487 boolean success = false; 488 while (!success) { 489 try { 490 startTimeQ.put(stto); 491 success = true; 492 } catch (InterruptedException e) { 493 logger.log( 494 Level.FINE, 495 "callflow.transfer_to_async_thread_interrupted", e); 496 } 497 } 498 } 499 500 void handleEndTime( 501 String requestId, long timeStamp, 502 ContainerTypeOrApplicationType type) { 503 EndTimeTO etto = endTimeFreeQ.poll(); 504 if (etto == null) { 505 etto = new EndTimeTO(); 506 } 507 etto.setRequestId(requestId); 508 etto.setTimeStamp(timeStamp); 509 etto.setContainerTypeOrApplicationType(type); 510 boolean success = false; 511 while (!success) { 512 try { 513 endTimeQ.put(etto); 514 success = true; 515 } catch (InterruptedException e) { 516 logger.log( 517 Level.FINE, 518 "callflow.transfer_to_async_thread_interrupted", e); 519 } 520 } 521 } 522 } 523 | Popular Tags |