KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > thread > PooledTaskRunner


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.thread;
19
20 import java.util.concurrent.Executor JavaDoc;
21
22 /**
23  *
24  * @version $Revision: 1.1 $
25  */

26 class PooledTaskRunner implements TaskRunner {
27
28     private final int maxIterationsPerRun;
29     private final Executor JavaDoc executor;
30     private final Task task;
31     private final Runnable JavaDoc runable;
32     private boolean queued;
33     private boolean shutdown;
34     private boolean iterating;
35     private Thread JavaDoc runningThread;
36     
37     public PooledTaskRunner(Executor JavaDoc executor, Task task, int maxIterationsPerRun) {
38         this.executor = executor;
39         this.maxIterationsPerRun = maxIterationsPerRun;
40         this.task = task;
41         runable = new Runnable JavaDoc() {
42             public void run() {
43                 runningThread = Thread.currentThread();
44                 runTask();
45                 runningThread = null;
46             }
47         };
48     }
49     
50
51
52     /**
53      * We Expect MANY wakeup calls on the same TaskRunner.
54      */

55     public void wakeup() throws InterruptedException JavaDoc {
56         synchronized( runable ) {
57             
58             // When we get in here, we make some assumptions of state:
59
// queued=false, iterating=false: wakeup() has not be called and therefore task is not executing.
60
// queued=true, iterating=false: wakeup() was called but, task execution has not started yet
61
// queued=false, iterating=true : wakeup() was called, which caused task execution to start.
62
// queued=true, iterating=true : wakeup() called after task execution was started.
63

64             if( queued || shutdown )
65                 return;
66             
67             queued=true;
68             
69             // The runTask() method will do this for me once we are done iterating.
70
if( !iterating ) {
71                 executor.execute(runable);
72             }
73         }
74     }
75
76     /**
77      * shut down the task
78      * @throws InterruptedException
79      */

80     public void shutdown(long timeout) throws InterruptedException JavaDoc{
81         synchronized(runable){
82             shutdown=true;
83             //the check on the thread is done
84
//because a call to iterate can result in
85
//shutDown() being called, which would wait forever
86
//waiting for iterating to finish
87
if(runningThread!=Thread.currentThread()){
88                 if(iterating==true){
89                     runable.wait(timeout);
90                 }
91             }
92         }
93     }
94     
95     
96     public void shutdown() throws InterruptedException JavaDoc {
97         shutdown(0);
98     }
99     private void runTask() {
100         
101         synchronized (runable) {
102             queued = false;
103             if( shutdown ) {
104                 iterating = false;
105                 runable.notifyAll();
106                 return;
107             }
108             iterating = true;
109         }
110         
111         // Don't synchronize while we are iterating so that
112
// multiple wakeup() calls can be executed concurrently.
113
boolean done=false;
114         for (int i = 0; i < maxIterationsPerRun; i++) {
115             if( !task.iterate() ) {
116                 done=true;
117                 break;
118             }
119         }
120         
121         synchronized (runable) {
122             iterating=false;
123             if( shutdown ) {
124                 queued=false;
125                 runable.notifyAll();
126                 return;
127             }
128             
129             // If we could not iterate all the items
130
// then we need to re-queue.
131
if( !done )
132                 queued = true;
133             
134             if( queued ) {
135                 executor.execute(runable);
136             }
137         }
138     }
139 }
140
Popular Tags