KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > notification > engine > DefaultTaskProcessor


1 package org.jacorb.notification.engine;
2
3 /*
4  * JacORB - a free Java ORB
5  *
6  * Copyright (C) 1999-2003 Gerald Brose
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the Free
20  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */

23
24 import java.util.Date JavaDoc;
25
26 import org.apache.avalon.framework.configuration.Configuration;
27 import org.apache.avalon.framework.logger.Logger;
28 import org.jacorb.notification.conf.Attributes;
29 import org.jacorb.notification.conf.Default;
30 import org.jacorb.notification.interfaces.Disposable;
31 import org.jacorb.notification.interfaces.IProxyPushSupplier;
32 import org.jacorb.notification.interfaces.Message;
33 import org.jacorb.notification.interfaces.MessageSupplier;
34 import org.omg.CORBA.Any JavaDoc;
35 import org.omg.CosNotification.StructuredEvent;
36
37 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
38 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
39
40 /**
41  * @author Alphonse Bendt
42  * @version $Id: DefaultTaskProcessor.java,v 1.10 2005/05/01 21:10:09 alphonse.bendt Exp $
43  */

44
45 public class DefaultTaskProcessor implements TaskProcessor, Disposable
46 {
47     private class TimeoutTask implements Runnable JavaDoc, Message.MessageStateListener
48     {
49         Object JavaDoc timerRegistration_;
50
51         final Message message_;
52
53         public TimeoutTask(Message message)
54         {
55             message_ = message;
56             message_.setMessageStateListener(this);
57             timerRegistration_ = executeTaskAfterDelay(message.getTimeout(), this);
58         }
59
60         public void actionLifetimeChanged(long timeout)
61         {
62             ClockDaemon.cancel(timerRegistration_);
63             timerRegistration_ = executeTaskAfterDelay(message_.getTimeout(), this);
64         }
65
66         public void run()
67         {
68             logger_.debug("run Timeout");
69
70             message_.removeMessageStateListener();
71
72             message_.actionTimeout();
73         }
74     }
75
76     // //////////////////
77

78     private class DeferedStopTask implements Runnable JavaDoc
79     {
80         final Message message_;
81
82         public DeferedStopTask(Message message)
83         {
84             message_ = message;
85
86             executeTaskAt(message.getStopTime(), this);
87         }
88
89         public void run()
90         {
91             message_.actionTimeout();
92         }
93     }
94
95     // //////////////////
96

97     class DeferedStartTask implements Runnable JavaDoc
98     {
99         final Message message_;
100
101         DeferedStartTask(Message m)
102         {
103             if (logger_.isDebugEnabled())
104             {
105                 logger_.debug("Message with Option StartTime=" + m.getStartTime()
106                         + " will be defered until then");
107             }
108
109             message_ = m;
110
111             executeTaskAt(message_.getStartTime(), this);
112         }
113
114         public void run()
115         {
116             if (logger_.isDebugEnabled())
117             {
118                 logger_.debug("Defered Message " + message_ + " will be processed now");
119             }
120
121             processMessageInternal(message_);
122         }
123     }
124
125     // //////////////////
126

127     final Logger logger_;
128
129     /**
130      * TaskExecutor used to invoke match-Operation on filters
131      */

132     private TaskExecutor matchTaskExecutor_;
133
134     /**
135      * TaskExecutor used to invoke pull-Operation on PullSuppliers.
136      */

137     private TaskExecutor pullTaskExecutor_;
138
139     /**
140      * ClockDaemon to schedule Operation that must be run at a specific time.
141      */

142     private ClockDaemon clockDaemon_;
143
144     /**
145      * TaskFactory that is used to create new Tasks.
146      */

147     private DefaultTaskFactory taskFactory_;
148
149     // //////////////////////////////////////
150

151     /**
152      * Start ClockDaemon Set up TaskExecutors Set up TaskFactory
153      */

154     public DefaultTaskProcessor(Configuration config)
155     {
156         clockDaemon_ = new ClockDaemon();
157
158         clockDaemon_.setThreadFactory(new ThreadFactory()
159         {
160             public Thread JavaDoc newThread(Runnable JavaDoc command)
161             {
162                 Thread JavaDoc _t = new Thread JavaDoc(command);
163                 _t.setName("ClockDaemonThread");
164                 return _t;
165             }
166         });
167
168         logger_ = ((org.jacorb.config.Configuration) config).getNamedLogger(getClass().getName());
169
170         logger_.info("create TaskProcessor");
171
172         // create pull worker pool. allow pull workers to die
173
int _pullPoolSize = config.getAttributeAsInteger(Attributes.PULL_POOL_WORKERS,
174                 Default.DEFAULT_PULL_POOL_SIZE);
175
176         pullTaskExecutor_ = new DefaultTaskExecutor("PullThread", _pullPoolSize, true);
177
178         int _filterPoolSize = config.getAttributeAsInteger(Attributes.FILTER_POOL_WORKERS,
179                 Default.DEFAULT_FILTER_POOL_SIZE);
180
181         matchTaskExecutor_ = new DefaultTaskExecutor("FilterThread", _filterPoolSize);
182
183         taskFactory_ = new DefaultTaskFactory(this);
184
185         taskFactory_.configure(config);
186     }
187
188     public TaskFactory getTaskFactory()
189     {
190         return taskFactory_;
191     }
192
193     public TaskExecutor getFilterTaskExecutor()
194     {
195         return matchTaskExecutor_;
196     }
197
198     /**
199      * shutdown this TaskProcessor. The TaskExecutors will be shutdown, the running Threads
200      * interrupted and all allocated ressources will be freed. As the active Threads will be
201      * interrupted pending Events will be discarded.
202      */

203     public void dispose()
204     {
205         logger_.info("shutdown TaskProcessor");
206
207         clockDaemon_.shutDown();
208
209         matchTaskExecutor_.dispose();
210
211         pullTaskExecutor_.dispose();
212
213         taskFactory_.dispose();
214
215         logger_.debug("shutdown complete");
216     }
217
218     /**
219      * process a Message. the various settings for the Message (timeout, starttime, stoptime) are
220      * checked and applied.
221      */

222     public void processMessage(Message mesg)
223     {
224         if (mesg.hasStopTime())
225         {
226             logger_.debug("Message has StopTime");
227             if (mesg.getStopTime() <= System.currentTimeMillis())
228             {
229                 fireEventDiscarded(mesg);
230                 mesg.dispose();
231                 logger_.debug("Message Stoptime is passed already");
232
233                 return;
234             }
235
236             new DeferedStopTask(mesg);
237         }
238
239         if (mesg.hasTimeout())
240         {
241             logger_.debug("Message has TimeOut");
242             new TimeoutTask(mesg);
243         }
244
245         if (mesg.hasStartTime() && (mesg.getStartTime() > System.currentTimeMillis()))
246         {
247             new DeferedStartTask(mesg);
248         }
249         else
250         {
251             processMessageInternal(mesg);
252         }
253     }
254
255     /**
256      * process a Message. create FilterTask and schedule it.
257      */

258     protected void processMessageInternal(Message event)
259     {
260         logger_.debug("processMessageInternal");
261
262         Schedulable _task = taskFactory_.newFilterProxyConsumerTask(event);
263
264         try
265         {
266             _task.schedule();
267         } catch (InterruptedException JavaDoc ie)
268         {
269             logger_.info("Interrupt while scheduling FilterTask", ie);
270         }
271     }
272
273     /**
274      * Schedule ProxyPullConsumer for pull-Operation. If a Supplier connects to a ProxyPullConsumer
275      * the ProxyPullConsumer needs to regularely poll the Supplier. This method queues a Task to run
276      * runPullEvent on the specified TimerEventSupplier
277      */

278     public void scheduleTimedPullTask(MessageSupplier messageSupplier) throws InterruptedException JavaDoc
279     {
280         PullFromSupplierTask _task = new PullFromSupplierTask(pullTaskExecutor_);
281
282         _task.setTarget(messageSupplier);
283
284         _task.schedule();
285     }
286
287     /**
288      * Schedule MessageConsumer for a deliver-Operation. Some MessageConsumers (namely
289      * SequenceProxyPushSuppliers) need to push Messages regularely to its connected Consumer.
290      * Schedule a Task to call deliverPendingEvents on the specified MessageConsumer. Also used
291      * after a disabled MessageConsumer is enabled again to push the pending Messages.
292      */

293     public void schedulePushOperation(IProxyPushSupplier pushSupplier) throws InterruptedException JavaDoc
294     {
295         throw new UnsupportedOperationException JavaDoc();
296     }
297
298     // //////////////////////////////////////
299
// Timer Operations
300
// //////////////////////////////////////
301

302     /**
303      * access the Clock Daemon instance.
304      */

305     private ClockDaemon getClockDaemon()
306     {
307         return clockDaemon_;
308     }
309
310     public Object JavaDoc executeTaskPeriodically(long intervall, Runnable JavaDoc task, boolean startImmediately)
311     {
312         logger_.debug("executeTaskPeriodically");
313
314         return getClockDaemon().executePeriodically(intervall, task, startImmediately);
315     }
316
317     public void cancelTask(Object JavaDoc id)
318     {
319         ClockDaemon.cancel(id);
320     }
321
322     public Object JavaDoc executeTaskAfterDelay(long delay, Runnable JavaDoc task)
323     {
324         return clockDaemon_.executeAfterDelay(delay, task);
325     }
326
327     Object JavaDoc executeTaskAt(long startTime, Runnable JavaDoc task)
328     {
329         return executeTaskAt(new Date JavaDoc(startTime), task);
330     }
331
332     Object JavaDoc executeTaskAt(Date JavaDoc startTime, Runnable JavaDoc task)
333     {
334         return clockDaemon_.executeAt(startTime, task);
335     }
336
337     // //////////////////////////////////////
338

339     private void fireEventDiscarded(Message event)
340     {
341         switch (event.getType()) {
342         case Message.TYPE_ANY:
343             fireEventDiscarded(event.toAny());
344             break;
345
346         case Message.TYPE_STRUCTURED:
347             fireEventDiscarded(event.toStructuredEvent());
348             break;
349
350         default:
351             throw new RuntimeException JavaDoc();
352         }
353     }
354
355     private void fireEventDiscarded(Any JavaDoc a)
356     {
357         if (logger_.isDebugEnabled())
358         {
359             logger_.debug("Any: " + a + " has been discarded");
360         }
361     }
362
363     private void fireEventDiscarded(StructuredEvent e)
364     {
365         if (logger_.isDebugEnabled())
366         {
367             logger_.debug("StructuredEvent: " + e + " has been discarded");
368         }
369     }
370 }
Popular Tags