1 package org.jacorb.notification.servant; 2 3 23 24 import org.apache.avalon.framework.configuration.Configuration; 25 import org.apache.avalon.framework.configuration.ConfigurationException; 26 import org.jacorb.notification.OfferManager; 27 import org.jacorb.notification.SubscriptionManager; 28 import org.jacorb.notification.engine.PushOperation; 29 import org.jacorb.notification.engine.PushTaskExecutorFactory; 30 import org.jacorb.notification.engine.TaskProcessor; 31 import org.jacorb.notification.interfaces.Message; 32 import org.jacorb.notification.util.PropertySet; 33 import org.jacorb.notification.util.PropertySetAdapter; 34 import org.omg.CORBA.ORB ; 35 import org.omg.CosEventChannelAdmin.AlreadyConnected; 36 import org.omg.CosEventChannelAdmin.TypeError; 37 import org.omg.CosEventComm.Disconnected; 38 import org.omg.CosNotification.MaximumBatchSize; 39 import org.omg.CosNotification.PacingInterval; 40 import org.omg.CosNotification.StructuredEvent; 41 import org.omg.CosNotification.UnsupportedQoS; 42 import org.omg.CosNotifyChannelAdmin.ConsumerAdmin; 43 import org.omg.CosNotifyChannelAdmin.ProxyType; 44 import org.omg.CosNotifyChannelAdmin.SequenceProxyPushSupplierOperations; 45 import org.omg.CosNotifyChannelAdmin.SequenceProxyPushSupplierPOATie; 46 import org.omg.CosNotifyComm.SequencePushConsumer; 47 import org.omg.PortableServer.POA ; 48 import org.omg.PortableServer.Servant ; 49 import org.omg.TimeBase.TimeTHelper; 50 51 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 52 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; 53 54 58 59 public class SequenceProxyPushSupplierImpl extends StructuredProxyPushSupplierImpl implements 60 SequenceProxyPushSupplierOperations 61 { 62 private class PushSequenceOperation implements PushOperation 63 { 64 private final StructuredEvent[] structuredEvents_; 65 66 public PushSequenceOperation(StructuredEvent[] structuredEvents) 67 { 68 structuredEvents_ = structuredEvents; 69 } 70 71 public void invokePush() throws Disconnected 72 { 73 deliverPendingMessagesInternal(structuredEvents_); 74 } 75 76 public void dispose() 77 { 78 } 80 } 81 82 public SequenceProxyPushSupplierImpl(IAdmin admin, ORB orb, POA poa, Configuration config, 83 TaskProcessor taskProcessor, PushTaskExecutorFactory pushTaskExecutorFactory, OfferManager offerManager, 84 SubscriptionManager subscriptionManager, ConsumerAdmin consumerAdmin) 85 throws ConfigurationException 86 { 87 super(admin, orb, poa, config, taskProcessor, pushTaskExecutorFactory, offerManager, 88 subscriptionManager, consumerAdmin); 89 90 configureMaxBatchSize(); 91 92 configurePacingInterval(); 93 94 schedulePushOperation_ = new Runnable () 95 { 96 public void run() 97 { 98 schedulePush(); 99 } 100 }; 101 102 qosSettings_.addPropertySetListener(MaximumBatchSize.value, new PropertySetAdapter() 103 { 104 public void actionPropertySetChanged(PropertySet source) throws UnsupportedQoS 105 { 106 configureMaxBatchSize(); 107 } 108 }); 109 110 qosSettings_.addPropertySetListener(PacingInterval.value, new PropertySetAdapter() 111 { 112 public void actionPropertySetChanged(PropertySet source) throws UnsupportedQoS 113 { 114 configurePacingInterval(); 115 } 116 }); 117 } 118 119 private final Runnable schedulePushOperation_; 120 121 124 private SequencePushConsumer sequencePushConsumer_; 125 126 129 private Object taskId_; 130 131 134 private final SynchronizedInt maxBatchSize_ = new SynchronizedInt(1); 135 136 139 private final SynchronizedLong pacingInterval_ = new SynchronizedLong(0); 140 141 private long timeSpent_ = 0; 142 143 149 public ProxyType MyType() 152 { 153 return ProxyType.PUSH_SEQUENCE; 154 } 155 156 159 public void pushPendingData() 160 { 161 deliverPendingMessages(false); 162 } 163 164 private void deliverPendingMessages(boolean flush) 165 { 166 final Message[] _messages; 167 168 if (flush) 169 { 170 _messages = getAllMessages(); 171 } 172 else 173 { 174 _messages = getAtLeastMessages(maxBatchSize_.get()); 175 } 176 177 if (_messages != null && _messages.length > 0) 178 { 179 final StructuredEvent[] _structuredEvents = new StructuredEvent[_messages.length]; 180 181 for (int x = 0; x < _messages.length; ++x) 182 { 183 _structuredEvents[x] = _messages[x].toStructuredEvent(); 184 185 _messages[x].dispose(); 186 } 187 188 try 189 { 190 deliverPendingMessagesInternal(_structuredEvents); 191 } catch (Throwable e) 192 { 193 PushSequenceOperation _failedOperation = new PushSequenceOperation( 194 _structuredEvents); 195 196 handleFailedPushOperation(_failedOperation, e); 197 } 198 } 199 } 200 201 void deliverPendingMessagesInternal(final StructuredEvent[] structuredEvents) 202 throws Disconnected 203 { 204 long now = System.currentTimeMillis(); 205 sequencePushConsumer_.push_structured_events(structuredEvents); 206 timeSpent_ += (System.currentTimeMillis() - now); 207 resetErrorCounter(); 208 } 209 210 public void connect_sequence_push_consumer(SequencePushConsumer consumer) 211 throws AlreadyConnected, TypeError 212 { 213 logger_.debug("connect_sequence_push_consumer"); 214 215 checkIsNotConnected(); 216 217 sequencePushConsumer_ = consumer; 218 219 connectClient(consumer); 220 221 startCronJob(); 222 } 223 224 protected void connectionResumed() 225 { 226 schedulePush(); 227 228 startCronJob(); 229 } 230 231 protected void connectionSuspended() 232 { 233 stopCronJob(); 234 } 235 236 public void disconnect_sequence_push_supplier() 237 { 238 destroy(); 239 } 240 241 protected void disconnectClient() 242 { 243 stopCronJob(); 244 245 sequencePushConsumer_.disconnect_sequence_push_consumer(); 246 sequencePushConsumer_ = null; 247 } 248 249 private void startCronJob() 250 { 251 if (pacingInterval_.get() > 0 && taskId_ != null) 252 { 253 taskId_ = getTaskProcessor().executeTaskPeriodically(pacingInterval_.get(), 254 schedulePushOperation_, true); 255 } 256 } 257 258 synchronized private void stopCronJob() 259 { 260 if (taskId_ != null) 261 { 262 getTaskProcessor().cancelTask(taskId_); 263 taskId_ = null; 264 } 265 } 266 267 private void checkCronJob() 268 { 269 if (pacingInterval_.get() > 0) 270 { 271 startCronJob(); 272 } 273 else 274 { 275 stopCronJob(); 276 } 277 } 278 279 private boolean configurePacingInterval() 280 { 281 if (qosSettings_.containsKey(PacingInterval.value)) 282 { 283 long _pacingInterval = TimeTHelper.extract(qosSettings_.get(PacingInterval.value)); 284 285 if (pacingInterval_.get() != _pacingInterval) 286 { 287 if (logger_.isInfoEnabled()) 288 { 289 logger_.info("set PacingInterval=" + _pacingInterval); 290 } 291 pacingInterval_.set(_pacingInterval); 292 293 checkCronJob(); 294 295 return true; 296 } 297 } 298 return false; 299 } 300 301 private boolean configureMaxBatchSize() 302 { 303 if (qosSettings_.containsKey(MaximumBatchSize.value)) 304 { 305 int _maxBatchSize = qosSettings_.get(MaximumBatchSize.value).extract_long(); 306 307 if (maxBatchSize_.get() != _maxBatchSize) 308 { 309 if (logger_.isInfoEnabled()) 310 { 311 logger_.info("set MaxBatchSize=" + _maxBatchSize); 312 } 313 314 maxBatchSize_.set(_maxBatchSize); 315 316 return true; 317 } 318 } 319 320 return false; 321 } 322 323 public synchronized Servant getServant() 324 { 325 if (thisServant_ == null) 326 { 327 thisServant_ = new SequenceProxyPushSupplierPOATie(this); 328 } 329 330 return thisServant_; 331 } 332 333 protected long getCost() 334 { 335 return timeSpent_; 336 } 337 } | Popular Tags |