1 21 22 package org.apache.derby.impl.services.daemon; 23 24 import org.apache.derby.iapi.services.context.ContextService; 25 import org.apache.derby.iapi.services.context.ContextManager; 26 import org.apache.derby.iapi.services.daemon.DaemonService; 27 import org.apache.derby.iapi.services.daemon.Serviceable; 28 import org.apache.derby.iapi.services.monitor.Monitor; 29 import org.apache.derby.iapi.services.monitor.ModuleFactory; 30 import org.apache.derby.iapi.services.sanity.SanityManager; 31 32 import org.apache.derby.iapi.error.StandardException; 33 34 import java.util.Vector ; 35 import java.util.List ; 36 37 76 public class BasicDaemon implements DaemonService, Runnable 77 { 78 private int numClients; 80 private static final int OPTIMAL_QUEUE_SIZE = 100; 81 82 private final Vector subscription; 83 84 protected final ContextService contextService; 86 protected final ContextManager contextMgr; 87 88 92 private final List highPQ; private final List normPQ; 95 99 private int nextService; 100 101 104 105 private boolean awakened; 108 111 private boolean waiting; 112 113 private boolean inPause; private boolean running; private boolean stopRequested; private boolean stopped; 118 private long lastServiceTime; private int earlyWakeupCount; 124 127 public BasicDaemon(ContextService contextService) 128 { 129 this.contextService = contextService; 130 this.contextMgr = contextService.newContextManager(); 131 132 subscription = new Vector (1, 1); 133 highPQ = new java.util.LinkedList (); 134 normPQ = new java.util.LinkedList (); 135 136 lastServiceTime = System.currentTimeMillis(); 137 } 138 139 public int subscribe(Serviceable newClient, boolean onDemandOnly) 140 { 141 int clientNumber; 142 143 ServiceRecord clientRecord; 144 145 synchronized(this) 146 { 147 clientNumber = numClients++; 148 149 clientRecord = new ServiceRecord(newClient, onDemandOnly, true); 150 subscription.insertElementAt(clientRecord, clientNumber); 151 } 152 153 154 if (SanityManager.DEBUG) 155 { 156 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 157 SanityManager.DEBUG(DaemonService.DaemonTrace, 158 "subscribed client # " + clientNumber + " : " + 159 clientRecord); 160 } 161 162 return clientNumber; 163 } 164 165 174 public void unsubscribe(int clientNumber) 175 { 176 if (clientNumber < 0 || clientNumber > subscription.size()) 177 return; 178 179 subscription.setElementAt(null, clientNumber); 181 } 182 183 public void serviceNow(int clientNumber) 184 { 185 if (clientNumber < 0 || clientNumber > subscription.size()) 186 return; 187 188 ServiceRecord clientRecord = (ServiceRecord)subscription.elementAt(clientNumber); 189 if (clientRecord == null) 190 return; 191 192 clientRecord.called(); 193 wakeUp(); 194 } 195 196 public boolean enqueue(Serviceable newClient, boolean serviceNow) 197 { 198 ServiceRecord clientRecord = new ServiceRecord(newClient, false, false); 199 200 if (SanityManager.DEBUG) 201 { 202 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 203 SanityManager.DEBUG(DaemonService.DaemonTrace, 204 "enqueing work, urgent = " + serviceNow + ":" + newClient ); 205 } 206 207 208 List queue = serviceNow ? highPQ : normPQ; 209 210 int highPQsize; 211 synchronized (this) { 212 queue.add(clientRecord); 213 highPQsize = highPQ.size(); 214 215 if (SanityManager.DEBUG) { 216 217 if (SanityManager.DEBUG_ON("memoryLeakTrace")) { 218 219 if (highPQsize > (OPTIMAL_QUEUE_SIZE * 2)) 220 System.out.println("memoryLeakTrace:BasicDaemon " + highPQsize); 221 } 222 } 223 } 224 225 if (serviceNow && !awakened) 226 wakeUp(); 227 228 if (serviceNow) { 229 return highPQsize > OPTIMAL_QUEUE_SIZE; 230 } 231 return false; 232 } 233 234 237 public synchronized void clear() 238 { 239 normPQ.clear(); 240 highPQ.clear(); 241 } 242 243 246 247 protected ServiceRecord nextAssignment(boolean urgent) 248 { 249 ServiceRecord clientRecord; 251 252 while (nextService < subscription.size()) 253 { 254 clientRecord = (ServiceRecord)subscription.elementAt(nextService++); 255 if (clientRecord != null && (clientRecord.needImmediateService() || (!urgent && clientRecord.needService()))) 256 return clientRecord; 257 } 258 259 clientRecord = null; 260 261 synchronized(this) 262 { 263 if (!highPQ.isEmpty()) 264 clientRecord = (ServiceRecord) highPQ.remove(0); 265 } 266 267 if (urgent || clientRecord != null) 268 { 269 if (SanityManager.DEBUG) 270 { 271 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 272 SanityManager.DEBUG(DaemonService.DaemonTrace, 273 clientRecord == null ? 274 "No more urgent assignment " : 275 "Next urgent assignment : " + clientRecord); 276 } 277 278 return clientRecord; 279 } 280 281 clientRecord = null; 282 synchronized(this) 283 { 284 if (!normPQ.isEmpty()) 285 { 286 clientRecord = (ServiceRecord)normPQ.remove(0); 287 288 if (SanityManager.DEBUG) 289 { 290 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 291 SanityManager.DEBUG(DaemonService.DaemonTrace, 292 "Next normal enqueued : " + clientRecord); 293 } 294 } 295 296 } 298 299 if (SanityManager.DEBUG) 300 { 301 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 302 { 303 if (clientRecord == null) 304 SanityManager.DEBUG(DaemonService.DaemonTrace, "No more assignment"); 305 } 306 } 307 308 return clientRecord; 309 } 310 311 protected void serviceClient(ServiceRecord clientRecord) 312 { 313 clientRecord.serviced(); 314 315 Serviceable client = clientRecord.client; 316 317 if (client == null) 319 return; 320 321 ContextManager cm = contextMgr; 322 323 if (SanityManager.DEBUG) 324 { 325 SanityManager.ASSERT(cm != null, "Context manager is null"); 326 SanityManager.ASSERT(client != null, "client is null"); 327 } 328 329 try 330 { 331 int status = client.performWork(cm); 332 333 if (clientRecord.subscriber) 334 return; 335 336 if (status == Serviceable.REQUEUE) 337 { 338 List queue = client.serviceASAP() ? highPQ : normPQ; 339 synchronized (this) { 340 queue.add(clientRecord); 341 342 if (SanityManager.DEBUG) { 343 344 if (SanityManager.DEBUG_ON("memoryLeakTrace")) { 345 346 if (queue.size() > (OPTIMAL_QUEUE_SIZE * 2)) 347 System.out.println("memoryLeakTrace:BasicDaemon " + queue.size()); 348 } 349 } 350 } 351 } 352 353 return; 354 } 355 catch (Throwable e) 356 { 357 if (SanityManager.DEBUG) 358 SanityManager.showTrace(e); 359 cm.cleanupOnError(e); 360 } 361 } 362 363 366 public void run() 367 { 368 contextService.setCurrentContextManager(contextMgr); 369 370 if (SanityManager.DEBUG) 371 { 372 if (SanityManager.DEBUG_ON(DaemonService.DaemonOff)) 373 { 374 SanityManager.DEBUG(DaemonService.DaemonTrace, "DaemonOff is set in properties, background Daemon not run"); 375 return; 376 } 377 SanityManager.DEBUG(DaemonService.DaemonTrace, "running"); 378 } 379 380 while(true) 382 { 383 if (stopRequested()) 384 break; 385 386 boolean urgentOnly = rest(); 389 390 if (stopRequested()) 391 break; 392 393 if (!inPause()) 394 work(urgentOnly); 395 } 396 397 synchronized(this) 398 { 399 running = false; 400 stopped = true; 401 } 402 contextMgr.cleanupOnError(StandardException.normalClose()); 403 contextService.resetCurrentContextManager(contextMgr); 404 } 405 406 409 410 413 public void pause() 414 { 415 if (SanityManager.DEBUG) 416 { 417 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 418 SanityManager.DEBUG(DaemonService.DaemonTrace, "pausing daemon"); 419 } 420 421 synchronized(this) 422 { 423 inPause = true; 424 while(running) 425 { 426 if (SanityManager.DEBUG) 427 { 428 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 429 SanityManager.DEBUG(DaemonService.DaemonTrace, 430 "waiting for daemon run to finish"); 431 } 432 433 try 434 { 435 wait(); 436 } 437 catch (InterruptedException ie) 438 { 439 } 441 } 442 } 443 444 if (SanityManager.DEBUG) 445 { 446 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 447 SanityManager.DEBUG(DaemonService.DaemonTrace, 448 "daemon paused"); 449 } 450 } 451 452 public void resume() 453 { 454 synchronized(this) 455 { 456 inPause = false; 457 } 458 459 if (SanityManager.DEBUG) 460 { 461 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 462 SanityManager.DEBUG(DaemonService.DaemonTrace, 463 "daemon resumed"); 464 } 465 } 466 467 473 public void stop() 474 { 475 if (stopped) return; 477 478 synchronized(this) 479 { 480 stopRequested = true; 481 notifyAll(); } 483 484 pause(); 486 } 487 488 493 public void waitUntilQueueIsEmpty() 494 { 495 while(true){ 496 synchronized(this) 497 { 498 boolean noSubscriptionRequests = true; 499 for (int urgentServiced = 0; urgentServiced < subscription.size(); urgentServiced++) 500 { 501 ServiceRecord clientRecord = (ServiceRecord)subscription.elementAt(urgentServiced); 502 if (clientRecord != null && clientRecord.needService()) 503 { 504 noSubscriptionRequests = false; 505 break; 506 } 507 } 508 509 if (highPQ.isEmpty() && noSubscriptionRequests &&!running){ 510 return; 511 }else{ 512 513 notifyAll(); try{ 517 wait(); 518 }catch (InterruptedException ie) 519 { 520 } 522 } 523 } 524 } 525 } 526 527 private synchronized boolean stopRequested() 528 { 529 return stopRequested; 530 } 531 532 private synchronized boolean inPause() 533 { 534 return inPause; 535 } 536 537 540 protected synchronized void wakeUp() 541 { 542 if (!awakened) { 543 awakened = true; 545 if (waiting) { 546 notifyAll(); 547 } 548 } 549 } 550 551 554 private boolean rest() 555 { 556 if (SanityManager.DEBUG) 557 { 558 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 559 SanityManager.DEBUG(DaemonService.DaemonTrace, 560 "going back to rest"); 561 } 562 563 boolean urgentOnly; 564 boolean checkWallClock = false; 565 synchronized(this) 566 { 567 try 568 { 569 if (!awakened) { 570 waiting = true; 571 wait(DaemonService.TIMER_DELAY); 572 waiting = false; 573 } 574 } 575 catch (InterruptedException ie) 576 { 577 } 579 580 nextService = 0; 581 582 urgentOnly = awakened; 583 if (urgentOnly) { 585 if (earlyWakeupCount++ > (DaemonService.TIMER_DELAY / 500)) { 587 earlyWakeupCount = 0; 588 checkWallClock = true; 589 } 590 } 591 awakened = false; } 593 594 if (SanityManager.DEBUG) 595 { 596 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 597 SanityManager.DEBUG(DaemonService.DaemonTrace, 598 urgentOnly ? 599 "someone wakes me up" : 600 "wakes up by myself"); 601 } 602 603 if (checkWallClock) 604 { 605 long currenttime = System.currentTimeMillis(); 606 if ((currenttime - lastServiceTime) > DaemonService.TIMER_DELAY) 607 { 608 lastServiceTime = currenttime; 609 urgentOnly = false; 610 611 if (SanityManager.DEBUG) 612 { 613 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 614 SanityManager.DEBUG(DaemonService.DaemonTrace, 615 "wall clock check says service all"); 616 } 617 } 618 } 619 620 return urgentOnly; 621 } 622 623 private void work(boolean urgentOnly) 624 { 625 if (SanityManager.DEBUG) 626 { 627 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 628 SanityManager.DEBUG(DaemonService.DaemonTrace, 629 "going back to work"); 630 } 631 632 ServiceRecord work; 633 634 635 int serviceCount = 0; 638 639 int yieldFactor = 10; 640 if (urgentOnly && (highPQ.size() > OPTIMAL_QUEUE_SIZE)) 641 yieldFactor = 2; 642 643 int yieldCount = OPTIMAL_QUEUE_SIZE / yieldFactor; 644 645 646 for (work = nextAssignment(urgentOnly); 647 work != null; 648 work = nextAssignment(urgentOnly)) 649 { 650 if (SanityManager.DEBUG) 651 { 652 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 653 SanityManager.DEBUG(DaemonService.DaemonTrace, 654 "servicing " + work); 655 } 656 657 658 synchronized(this) 659 { 660 if (inPause || stopRequested) 661 break; running = true; 663 } 664 665 try 667 { 668 serviceClient(work); 669 serviceCount++; 670 } 671 finally 672 { 673 synchronized(this) 675 { 676 running = false; 677 notifyAll(); 678 if (inPause || stopRequested) 679 break; } 681 } 682 683 if (SanityManager.DEBUG) 684 { 685 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 686 SanityManager.DEBUG(DaemonService.DaemonTrace, 687 "done " + work); 688 } 689 690 if ((serviceCount % (OPTIMAL_QUEUE_SIZE / 2)) == 0) { 693 nextService = 0; 694 } 695 696 if ((serviceCount % yieldCount) == 0) { 697 698 yield(); 699 } 700 701 if (SanityManager.DEBUG) 702 { 703 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 704 SanityManager.DEBUG(DaemonService.DaemonTrace, 705 "come back from yield"); 706 } 707 } 708 } 709 710 711 712 private void yield() 713 { 714 Thread currentThread = Thread.currentThread(); 715 int oldPriority = currentThread.getPriority(); 716 717 if (oldPriority <= Thread.MIN_PRIORITY) 718 { 719 currentThread.yield(); 720 } 721 else 722 { 723 ModuleFactory mf = Monitor.getMonitor(); 724 if (mf != null) 725 mf.setThreadPriority(Thread.MIN_PRIORITY); 726 currentThread.yield(); 727 if (mf != null) 728 mf.setThreadPriority(oldPriority); 729 } 730 } 731 } 732 733 734 735 736 737 738 739 740 741 | Popular Tags |