1 package org.jacorb.notification.engine; 2 3 23 24 import java.util.Date ; 25 26 import org.apache.avalon.framework.configuration.Configuration; 27 import org.apache.avalon.framework.logger.Logger; 28 import org.jacorb.notification.conf.Attributes; 29 import org.jacorb.notification.conf.Default; 30 import org.jacorb.notification.interfaces.Disposable; 31 import org.jacorb.notification.interfaces.IProxyPushSupplier; 32 import org.jacorb.notification.interfaces.Message; 33 import org.jacorb.notification.interfaces.MessageSupplier; 34 import org.omg.CORBA.Any ; 35 import org.omg.CosNotification.StructuredEvent; 36 37 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon; 38 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; 39 40 44 45 public class DefaultTaskProcessor implements TaskProcessor, Disposable 46 { 47 private class TimeoutTask implements Runnable , Message.MessageStateListener 48 { 49 Object timerRegistration_; 50 51 final Message message_; 52 53 public TimeoutTask(Message message) 54 { 55 message_ = message; 56 message_.setMessageStateListener(this); 57 timerRegistration_ = executeTaskAfterDelay(message.getTimeout(), this); 58 } 59 60 public void actionLifetimeChanged(long timeout) 61 { 62 ClockDaemon.cancel(timerRegistration_); 63 timerRegistration_ = executeTaskAfterDelay(message_.getTimeout(), this); 64 } 65 66 public void run() 67 { 68 logger_.debug("run Timeout"); 69 70 message_.removeMessageStateListener(); 71 72 message_.actionTimeout(); 73 } 74 } 75 76 78 private class DeferedStopTask implements Runnable 79 { 80 final Message message_; 81 82 public DeferedStopTask(Message message) 83 { 84 message_ = message; 85 86 executeTaskAt(message.getStopTime(), this); 87 } 88 89 public void run() 90 { 91 message_.actionTimeout(); 92 } 93 } 94 95 97 class DeferedStartTask implements Runnable 98 { 99 final Message message_; 100 101 DeferedStartTask(Message m) 102 { 103 if (logger_.isDebugEnabled()) 104 { 105 logger_.debug("Message with Option StartTime=" + m.getStartTime() 106 + " will be defered until then"); 107 } 108 109 message_ = m; 110 111 executeTaskAt(message_.getStartTime(), this); 112 } 113 114 public void run() 115 { 116 if (logger_.isDebugEnabled()) 117 { 118 logger_.debug("Defered Message " + message_ + " will be processed now"); 119 } 120 121 processMessageInternal(message_); 122 } 123 } 124 125 127 final Logger logger_; 128 129 132 private TaskExecutor matchTaskExecutor_; 133 134 137 private TaskExecutor pullTaskExecutor_; 138 139 142 private ClockDaemon clockDaemon_; 143 144 147 private DefaultTaskFactory taskFactory_; 148 149 151 154 public DefaultTaskProcessor(Configuration config) 155 { 156 clockDaemon_ = new ClockDaemon(); 157 158 clockDaemon_.setThreadFactory(new ThreadFactory() 159 { 160 public Thread newThread(Runnable command) 161 { 162 Thread _t = new Thread (command); 163 _t.setName("ClockDaemonThread"); 164 return _t; 165 } 166 }); 167 168 logger_ = ((org.jacorb.config.Configuration) config).getNamedLogger(getClass().getName()); 169 170 logger_.info("create TaskProcessor"); 171 172 int _pullPoolSize = config.getAttributeAsInteger(Attributes.PULL_POOL_WORKERS, 174 Default.DEFAULT_PULL_POOL_SIZE); 175 176 pullTaskExecutor_ = new DefaultTaskExecutor("PullThread", _pullPoolSize, true); 177 178 int _filterPoolSize = config.getAttributeAsInteger(Attributes.FILTER_POOL_WORKERS, 179 Default.DEFAULT_FILTER_POOL_SIZE); 180 181 matchTaskExecutor_ = new DefaultTaskExecutor("FilterThread", _filterPoolSize); 182 183 taskFactory_ = new DefaultTaskFactory(this); 184 185 taskFactory_.configure(config); 186 } 187 188 public TaskFactory getTaskFactory() 189 { 190 return taskFactory_; 191 } 192 193 public TaskExecutor getFilterTaskExecutor() 194 { 195 return matchTaskExecutor_; 196 } 197 198 203 public void dispose() 204 { 205 logger_.info("shutdown TaskProcessor"); 206 207 clockDaemon_.shutDown(); 208 209 matchTaskExecutor_.dispose(); 210 211 pullTaskExecutor_.dispose(); 212 213 taskFactory_.dispose(); 214 215 logger_.debug("shutdown complete"); 216 } 217 218 222 public void processMessage(Message mesg) 223 { 224 if (mesg.hasStopTime()) 225 { 226 logger_.debug("Message has StopTime"); 227 if (mesg.getStopTime() <= System.currentTimeMillis()) 228 { 229 fireEventDiscarded(mesg); 230 mesg.dispose(); 231 logger_.debug("Message Stoptime is passed already"); 232 233 return; 234 } 235 236 new DeferedStopTask(mesg); 237 } 238 239 if (mesg.hasTimeout()) 240 { 241 logger_.debug("Message has TimeOut"); 242 new TimeoutTask(mesg); 243 } 244 245 if (mesg.hasStartTime() && (mesg.getStartTime() > System.currentTimeMillis())) 246 { 247 new DeferedStartTask(mesg); 248 } 249 else 250 { 251 processMessageInternal(mesg); 252 } 253 } 254 255 258 protected void processMessageInternal(Message event) 259 { 260 logger_.debug("processMessageInternal"); 261 262 Schedulable _task = taskFactory_.newFilterProxyConsumerTask(event); 263 264 try 265 { 266 _task.schedule(); 267 } catch (InterruptedException ie) 268 { 269 logger_.info("Interrupt while scheduling FilterTask", ie); 270 } 271 } 272 273 278 public void scheduleTimedPullTask(MessageSupplier messageSupplier) throws InterruptedException 279 { 280 PullFromSupplierTask _task = new PullFromSupplierTask(pullTaskExecutor_); 281 282 _task.setTarget(messageSupplier); 283 284 _task.schedule(); 285 } 286 287 293 public void schedulePushOperation(IProxyPushSupplier pushSupplier) throws InterruptedException 294 { 295 throw new UnsupportedOperationException (); 296 } 297 298 302 305 private ClockDaemon getClockDaemon() 306 { 307 return clockDaemon_; 308 } 309 310 public Object executeTaskPeriodically(long intervall, Runnable task, boolean startImmediately) 311 { 312 logger_.debug("executeTaskPeriodically"); 313 314 return getClockDaemon().executePeriodically(intervall, task, startImmediately); 315 } 316 317 public void cancelTask(Object id) 318 { 319 ClockDaemon.cancel(id); 320 } 321 322 public Object executeTaskAfterDelay(long delay, Runnable task) 323 { 324 return clockDaemon_.executeAfterDelay(delay, task); 325 } 326 327 Object executeTaskAt(long startTime, Runnable task) 328 { 329 return executeTaskAt(new Date (startTime), task); 330 } 331 332 Object executeTaskAt(Date startTime, Runnable task) 333 { 334 return clockDaemon_.executeAt(startTime, task); 335 } 336 337 339 private void fireEventDiscarded(Message event) 340 { 341 switch (event.getType()) { 342 case Message.TYPE_ANY: 343 fireEventDiscarded(event.toAny()); 344 break; 345 346 case Message.TYPE_STRUCTURED: 347 fireEventDiscarded(event.toStructuredEvent()); 348 break; 349 350 default: 351 throw new RuntimeException (); 352 } 353 } 354 355 private void fireEventDiscarded(Any a) 356 { 357 if (logger_.isDebugEnabled()) 358 { 359 logger_.debug("Any: " + a + " has been discarded"); 360 } 361 } 362 363 private void fireEventDiscarded(StructuredEvent e) 364 { 365 if (logger_.isDebugEnabled()) 366 { 367 logger_.debug("StructuredEvent: " + e + " has been discarded"); 368 } 369 } 370 } | Popular Tags |