KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > derby > impl > services > daemon > BasicDaemon


1 /*
2
3    Derby - Class org.apache.derby.impl.services.daemon.BasicDaemon
4
5    Licensed to the Apache Software Foundation (ASF) under one or more
6    contributor license agreements. See the NOTICE file distributed with
7    this work for additional information regarding copyright ownership.
8    The ASF licenses this file to you under the Apache License, Version 2.0
9    (the "License"); you may not use this file except in compliance with
10    the License. You may obtain a copy of the License at
11
12       http://www.apache.org/licenses/LICENSE-2.0
13
14    Unless required by applicable law or agreed to in writing, software
15    distributed under the License is distributed on an "AS IS" BASIS,
16    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17    See the License for the specific language governing permissions and
18    limitations under the License.
19
20  */

21
22 package org.apache.derby.impl.services.daemon;
23
24 import org.apache.derby.iapi.services.context.ContextService;
25 import org.apache.derby.iapi.services.context.ContextManager;
26 import org.apache.derby.iapi.services.daemon.DaemonService;
27 import org.apache.derby.iapi.services.daemon.Serviceable;
28 import org.apache.derby.iapi.services.monitor.Monitor;
29 import org.apache.derby.iapi.services.monitor.ModuleFactory;
30 import org.apache.derby.iapi.services.sanity.SanityManager;
31
32 import org.apache.derby.iapi.error.StandardException;
33
34 import java.util.Vector JavaDoc;
35 import java.util.List JavaDoc;
36
37 /**
38     A BasicDaemon is a background worker thread which does asynchronous I/O and
39     general clean up. It should not be used as a general worker thread for
40     parallel execution.
41
42     One cannot count on the order of request or count on when the daemon will
43     wake up, even with serviceNow requests. Request are not persistent and not
44     recoverable, they are all lost when the system crashes or is shutdown.
45     System shutdown, even orderly ones, do not wait for daemons to finish its
46     work or empty its queue. Furthermore, any Serviceable subscriptions,
47     including onDemandOnly, must tolerate spurious services. The BasicDaemon
48     will setup a context manager with no context on it. The Serviceable
49     object's performWork must provide useful context on the context manager to
50     do its work. The BasicDaemon will wrap performWork call with try / catch
51     block and will use the ContextManager's error handling to clean up any
52     error. The BasicDaemon will guarentee serviceNow request will not be lost
53     as long as the jbms does not crash - however, if N serviceNow requests are
54     made by the same client, it may only be serviced once, not N times.
55
56     Many Serviceable object will subscribe to the same BasicDaemon. Their
57     performWork method should be well behaved - in other words, it should not
58     take too long or hog too many resources or deadlock with anyone else. And
59     it cannot (should not) error out.
60
61     The BasicDaemon implementation manages the DaemonService's data structure,
62     handles subscriptions and enqueues requests, and determine the service
63     schedule for its Serviceable objects. The BasicDaemon keeps an array
64     (Vector) of Serviceable subscriptions it also keeps 2 queues for clients
65     that uses it for one time service - the 1st queue is for a serviceNow
66     enqueue request, the 2nd queue is for non serviceNow enqueue request.
67
68     This BasicDaemon services its clients in the following order:
69     1. any subscribed client that have made a serviceNow request that has not
70                 been fulfilled
71     2. serviceable clients on the 1st queue
72     3. all subscribed clients that are not onDemandOnly
73     4. serviceable clients 2nd queue
74
75 */

76 public class BasicDaemon implements DaemonService, Runnable JavaDoc
77 {
78     private int numClients; // number of clients that needs services
79

80     private static final int OPTIMAL_QUEUE_SIZE = 100;
81
82     private final Vector JavaDoc subscription;
83
84     // the context this daemon should run with
85
protected final ContextService contextService;
86     protected final ContextManager contextMgr;
87
88     /**
89         Queues for the work to be done.
90         These are synchronized by this object.
91     */

92     private final List JavaDoc highPQ; // high priority queue
93
private final List JavaDoc normPQ; // normal priority queue
94

95     /**
96         which subscribed clients to service next?
97         only accessed by daemon thread
98     */

99     private int nextService;
100
101     /*
102     ** State for the sleep/wakeup routines.
103     */

104
105     private boolean awakened; // a wake up call has been issued
106
// MT - synchronized on this
107

108     /**
109         true if I'm waiting, if this is false then I am running and a notify is not required.
110     */

111     private boolean waiting;
112
113     private boolean inPause; // if true, don't do anything
114
private boolean running; // I am running now
115
private boolean stopRequested; // thread is requested to die
116
private boolean stopped; // we have stopped
117

118     private long lastServiceTime; // when did I last wake up on a timer
119
private int earlyWakeupCount; // if I am waken up a couple of times, check
120
// that lastServiceTime to make sure work
121
// scheduled on a timer gets done once in a
122
// while
123

124     /**
125         make a BasicDaemon
126     */

127     public BasicDaemon(ContextService contextService)
128     {
129         this.contextService = contextService;
130         this.contextMgr = contextService.newContextManager();
131
132         subscription = new Vector JavaDoc(1, 1);
133         highPQ = new java.util.LinkedList JavaDoc();
134         normPQ = new java.util.LinkedList JavaDoc();
135         
136         lastServiceTime = System.currentTimeMillis();
137     }
138
139     public int subscribe(Serviceable newClient, boolean onDemandOnly)
140     {
141         int clientNumber;
142
143         ServiceRecord clientRecord;
144
145         synchronized(this)
146         {
147             clientNumber = numClients++;
148
149             clientRecord = new ServiceRecord(newClient, onDemandOnly, true);
150             subscription.insertElementAt(clientRecord, clientNumber);
151         }
152
153
154         if (SanityManager.DEBUG)
155         {
156             if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
157                 SanityManager.DEBUG(DaemonService.DaemonTrace,
158                                 "subscribed client # " + clientNumber + " : " +
159                                 clientRecord);
160         }
161
162         return clientNumber;
163     }
164
165     /**
166      * Removes a client from the list of subscribed clients. The call does not
167      * wait for the daemon to finish the work it is currently performing.
168      * Therefore, the client must tolerate that its <code>performWork()</code>
169      * method could be invoked even after the call to
170      * <code>unsubscribe()</code> has returned (but not more than once).
171      *
172      * @param clientNumber client identifier
173      */

174     public void unsubscribe(int clientNumber)
175     {
176         if (clientNumber < 0 || clientNumber > subscription.size())
177             return;
178
179         // client number is never reused. Just null out the vector entry.
180
subscription.setElementAt(null, clientNumber);
181     }
182
183     public void serviceNow(int clientNumber)
184     {
185         if (clientNumber < 0 || clientNumber > subscription.size())
186             return;
187
188         ServiceRecord clientRecord = (ServiceRecord)subscription.elementAt(clientNumber);
189         if (clientRecord == null)
190             return;
191
192         clientRecord.called();
193         wakeUp();
194     }
195
196     public boolean enqueue(Serviceable newClient, boolean serviceNow)
197     {
198         ServiceRecord clientRecord = new ServiceRecord(newClient, false, false);
199
200         if (SanityManager.DEBUG)
201         {
202             if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
203                 SanityManager.DEBUG(DaemonService.DaemonTrace,
204                                     "enqueing work, urgent = " + serviceNow + ":" + newClient );
205         }
206
207
208         List JavaDoc queue = serviceNow ? highPQ : normPQ;
209
210         int highPQsize;
211         synchronized (this) {
212             queue.add(clientRecord);
213             highPQsize = highPQ.size();
214
215             if (SanityManager.DEBUG) {
216
217                 if (SanityManager.DEBUG_ON("memoryLeakTrace")) {
218
219                     if (highPQsize > (OPTIMAL_QUEUE_SIZE * 2))
220                         System.out.println("memoryLeakTrace:BasicDaemon " + highPQsize);
221                 }
222             }
223         }
224
225         if (serviceNow && !awakened)
226             wakeUp();
227
228         if (serviceNow) {
229             return highPQsize > OPTIMAL_QUEUE_SIZE;
230         }
231         return false;
232     }
233
234     /**
235         Get rid of all queued up Serviceable tasks.
236      */

237     public synchronized void clear()
238     {
239         normPQ.clear();
240         highPQ.clear();
241     }
242
243     /*
244      * class specific methods
245      */

246
247     protected ServiceRecord nextAssignment(boolean urgent)
248     {
249         // first goes thru the subscription list, then goes thru highPQ;
250
ServiceRecord clientRecord;
251
252         while (nextService < subscription.size())
253         {
254             clientRecord = (ServiceRecord)subscription.elementAt(nextService++);
255             if (clientRecord != null && (clientRecord.needImmediateService() || (!urgent && clientRecord.needService())))
256                 return clientRecord;
257         }
258
259         clientRecord = null;
260
261         synchronized(this)
262         {
263             if (!highPQ.isEmpty())
264                 clientRecord = (ServiceRecord) highPQ.remove(0);
265         }
266
267         if (urgent || clientRecord != null)
268         {
269             if (SanityManager.DEBUG)
270             {
271                 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
272                     SanityManager.DEBUG(DaemonService.DaemonTrace,
273                                     clientRecord == null ?
274                                     "No more urgent assignment " :
275                                     "Next urgent assignment : " + clientRecord);
276             }
277             
278             return clientRecord;
279         }
280
281         clientRecord = null;
282         synchronized(this)
283         {
284             if (!normPQ.isEmpty())
285             {
286                 clientRecord = (ServiceRecord)normPQ.remove(0);
287
288                 if (SanityManager.DEBUG)
289                 {
290                     if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
291                         SanityManager.DEBUG(DaemonService.DaemonTrace,
292                                         "Next normal enqueued : " + clientRecord);
293                 }
294             }
295
296             // else no more work
297
}
298
299         if (SanityManager.DEBUG)
300         {
301             if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
302             {
303                 if (clientRecord == null)
304                     SanityManager.DEBUG(DaemonService.DaemonTrace, "No more assignment");
305             }
306         }
307
308         return clientRecord;
309     }
310
311     protected void serviceClient(ServiceRecord clientRecord)
312     {
313         clientRecord.serviced();
314
315         Serviceable client = clientRecord.client;
316
317         // client may have unsubscribed while it had items queued
318
if (client == null)
319             return;
320
321         ContextManager cm = contextMgr;
322
323         if (SanityManager.DEBUG)
324         {
325             SanityManager.ASSERT(cm != null, "Context manager is null");
326             SanityManager.ASSERT(client != null, "client is null");
327         }
328
329         try
330         {
331             int status = client.performWork(cm);
332
333             if (clientRecord.subscriber)
334                 return;
335
336             if (status == Serviceable.REQUEUE)
337             {
338                 List JavaDoc queue = client.serviceASAP() ? highPQ : normPQ;
339                 synchronized (this) {
340                     queue.add(clientRecord);
341
342                     if (SanityManager.DEBUG) {
343
344                         if (SanityManager.DEBUG_ON("memoryLeakTrace")) {
345
346                             if (queue.size() > (OPTIMAL_QUEUE_SIZE * 2))
347                                 System.out.println("memoryLeakTrace:BasicDaemon " + queue.size());
348                         }
349                     }
350                 }
351             }
352
353             return;
354         }
355         catch (Throwable JavaDoc e)
356         {
357             if (SanityManager.DEBUG)
358                 SanityManager.showTrace(e);
359             cm.cleanupOnError(e);
360         }
361     }
362
363     /*
364      * Runnable methods
365      */

366     public void run()
367     {
368         contextService.setCurrentContextManager(contextMgr);
369
370         if (SanityManager.DEBUG)
371         {
372             if (SanityManager.DEBUG_ON(DaemonService.DaemonOff))
373             {
374                 SanityManager.DEBUG(DaemonService.DaemonTrace, "DaemonOff is set in properties, background Daemon not run");
375                 return;
376             }
377             SanityManager.DEBUG(DaemonService.DaemonTrace, "running");
378         }
379
380         // infinite loop of rest and work
381
while(true)
382         {
383             if (stopRequested())
384                 break;
385
386             // if someone wake me up, only service the urgent requests.
387
// if I wake up by my regular schedule, service all clients
388
boolean urgentOnly = rest();
389
390             if (stopRequested())
391                 break;
392
393             if (!inPause())
394                 work(urgentOnly);
395         }
396
397         synchronized(this)
398         {
399             running = false;
400             stopped = true;
401         }
402         contextMgr.cleanupOnError(StandardException.normalClose());
403         contextService.resetCurrentContextManager(contextMgr);
404     }
405
406     /*
407      * Daemon Service method
408      */

409
410     /*
411      * pause the daemon. Wait till it is no running before it returns
412      */

413     public void pause()
414     {
415         if (SanityManager.DEBUG)
416         {
417             if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
418                 SanityManager.DEBUG(DaemonService.DaemonTrace, "pausing daemon");
419         }
420
421         synchronized(this)
422         {
423             inPause = true;
424             while(running)
425             {
426                 if (SanityManager.DEBUG)
427                 {
428                     if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
429                         SanityManager.DEBUG(DaemonService.DaemonTrace,
430                                         "waiting for daemon run to finish");
431                 }
432
433                 try
434                 {
435                     wait();
436                 }
437                 catch (InterruptedException JavaDoc ie)
438                 {
439                     // someone interrrupt us, done running
440
}
441             }
442         }
443
444         if (SanityManager.DEBUG)
445         {
446             if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
447                 SanityManager.DEBUG(DaemonService.DaemonTrace,
448                                 "daemon paused");
449         }
450     }
451
452     public void resume()
453     {
454         synchronized(this)
455         {
456             inPause = false;
457         }
458
459         if (SanityManager.DEBUG)
460         {
461             if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
462                 SanityManager.DEBUG(DaemonService.DaemonTrace,
463                                 "daemon resumed");
464         }
465     }
466
467     /**
468         Finish what we are doing and at the next convenient moment, get rid of
469         the thread and make the daemon object goes away if possible.
470
471         remember we are calling from another thread
472      */

473     public void stop()
474     {
475         if (stopped) // already stopped
476
return;
477
478         synchronized(this)
479         {
480             stopRequested = true;
481             notifyAll(); // get sleeper to wake up and stop ASAP
482
}
483
484         pause(); // finish doing what we are doing first
485

486     }
487
488     /*
489     **Wait until the work in the high priority queue is done.
490     **Note: Used by tests only to make sure all the work
491     **assigned to the daemon is completed.
492     **/

493     public void waitUntilQueueIsEmpty()
494     {
495         while(true){
496             synchronized(this)
497             {
498                 boolean noSubscriptionRequests = true;
499                 for (int urgentServiced = 0; urgentServiced < subscription.size(); urgentServiced++)
500                 {
501                     ServiceRecord clientRecord = (ServiceRecord)subscription.elementAt(urgentServiced);
502                     if (clientRecord != null && clientRecord.needService())
503                     {
504                         noSubscriptionRequests = false;
505                         break;
506                     }
507                 }
508
509                 if (highPQ.isEmpty() && noSubscriptionRequests &&!running){
510                     return;
511                 }else{
512
513                     notifyAll(); //wake up the the daemon thread
514
//wait for the raw store daemon to wakeus up
515
//when it finihes work.
516
try{
517                         wait();
518                     }catch (InterruptedException JavaDoc ie)
519                     {
520                         // someone interrupt us, see what's going on
521
}
522                 }
523             }
524         }
525     }
526
527     private synchronized boolean stopRequested()
528     {
529         return stopRequested;
530     }
531
532     private synchronized boolean inPause()
533     {
534         return inPause;
535     }
536
537     /*
538      * BasicDaemon method
539      */

540     protected synchronized void wakeUp()
541     {
542         if (!awakened) {
543             awakened = true; // I am being awakened for urgent work.
544

545             if (waiting) {
546                 notifyAll();
547             }
548         }
549     }
550
551     /**
552         Returns true if awakened by some notification, false if wake up by timer
553     */

554     private boolean rest()
555     {
556         if (SanityManager.DEBUG)
557         {
558             if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
559                 SanityManager.DEBUG(DaemonService.DaemonTrace,
560                                 "going back to rest");
561         }
562
563         boolean urgentOnly;
564         boolean checkWallClock = false;
565         synchronized(this)
566         {
567             try
568             {
569                 if (!awakened) {
570                     waiting = true;
571                     wait(DaemonService.TIMER_DELAY);
572                     waiting = false;
573                 }
574             }
575             catch (InterruptedException JavaDoc ie)
576             {
577                 // someone interrupt us, see what's going on
578
}
579
580             nextService = 0;
581
582             urgentOnly = awakened;
583             if (urgentOnly) // check wall clock
584
{
585                 // take a guess that each early request is services every 500ms.
586
if (earlyWakeupCount++ > (DaemonService.TIMER_DELAY / 500)) {
587                     earlyWakeupCount = 0;
588                     checkWallClock = true;
589                 }
590             }
591             awakened = false; // reset this for next time
592
}
593
594         if (SanityManager.DEBUG)
595         {
596             if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
597                 SanityManager.DEBUG(DaemonService.DaemonTrace,
598                                 urgentOnly ?
599                                 "someone wakes me up" :
600                                 "wakes up by myself");
601         }
602
603         if (checkWallClock)
604         {
605             long currenttime = System.currentTimeMillis();
606             if ((currenttime - lastServiceTime) > DaemonService.TIMER_DELAY)
607             {
608                 lastServiceTime = currenttime;
609                 urgentOnly = false;
610
611                 if (SanityManager.DEBUG)
612                 {
613                     if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
614                         SanityManager.DEBUG(DaemonService.DaemonTrace,
615                                         "wall clock check says service all");
616                 }
617             }
618         }
619
620         return urgentOnly;
621     }
622
623     private void work(boolean urgentOnly)
624     {
625         if (SanityManager.DEBUG)
626         {
627             if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
628                 SanityManager.DEBUG(DaemonService.DaemonTrace,
629                                 "going back to work");
630         }
631
632         ServiceRecord work;
633
634
635         // while I am working, all serviceNow requests that comes in now will
636
// be taken care of when we get the next Assignment.
637
int serviceCount = 0;
638
639         int yieldFactor = 10;
640         if (urgentOnly && (highPQ.size() > OPTIMAL_QUEUE_SIZE))
641             yieldFactor = 2;
642
643         int yieldCount = OPTIMAL_QUEUE_SIZE / yieldFactor;
644
645
646         for (work = nextAssignment(urgentOnly);
647              work != null;
648              work = nextAssignment(urgentOnly))
649         {
650             if (SanityManager.DEBUG)
651             {
652                 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
653                     SanityManager.DEBUG(DaemonService.DaemonTrace,
654                                         "servicing " + work);
655             }
656
657
658             synchronized(this)
659             {
660                 if (inPause || stopRequested)
661                     break; // don't do anything more
662
running = true;
663             }
664
665             // do work
666
try
667             {
668                 serviceClient(work);
669                 serviceCount++;
670             }
671             finally
672             {
673                 // catch run time exceptions
674
synchronized(this)
675                 {
676                     running = false;
677                     notifyAll();
678                     if (inPause || stopRequested)
679                         break; // don't do anything more
680
}
681             }
682
683             if (SanityManager.DEBUG)
684             {
685                 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
686                     SanityManager.DEBUG(DaemonService.DaemonTrace,
687                                         "done " + work);
688             }
689
690             // ensure the subscribed clients get a look in once in a while
691
// when the queues are large.
692
if ((serviceCount % (OPTIMAL_QUEUE_SIZE / 2)) == 0) {
693                 nextService = 0;
694             }
695
696             if ((serviceCount % yieldCount) == 0) {
697
698                 yield();
699             }
700
701             if (SanityManager.DEBUG)
702             {
703                 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
704                     SanityManager.DEBUG(DaemonService.DaemonTrace,
705                                         "come back from yield");
706             }
707         }
708     }
709
710
711     /* let everybody else run first */
712     private void yield()
713     {
714         Thread JavaDoc currentThread = Thread.currentThread();
715         int oldPriority = currentThread.getPriority();
716
717         if (oldPriority <= Thread.MIN_PRIORITY)
718         {
719             currentThread.yield();
720         }
721         else
722         {
723             ModuleFactory mf = Monitor.getMonitor();
724             if (mf != null)
725                 mf.setThreadPriority(Thread.MIN_PRIORITY);
726             currentThread.yield();
727             if (mf != null)
728                 mf.setThreadPriority(oldPriority);
729         }
730     }
731 }
732
733
734
735
736
737
738
739
740
741
Popular Tags