1 package org.jacorb.notification.servant; 2 3 22 23 import org.apache.avalon.framework.configuration.Configuration; 24 import org.apache.avalon.framework.configuration.ConfigurationException; 25 import org.jacorb.notification.OfferManager; 26 import org.jacorb.notification.SubscriptionManager; 27 import org.jacorb.notification.conf.Attributes; 28 import org.jacorb.notification.conf.Default; 29 import org.jacorb.notification.engine.TaskProcessor; 30 import org.jacorb.notification.interfaces.Message; 31 import org.jacorb.notification.interfaces.MessageConsumer; 32 import org.jacorb.notification.queue.EventQueueFactory; 33 import org.jacorb.notification.queue.MessageQueueAdapter; 34 import org.jacorb.notification.queue.RWLockEventQueueDecorator; 35 import org.jacorb.notification.util.PropertySet; 36 import org.jacorb.notification.util.PropertySetAdapter; 37 import org.omg.CORBA.NO_IMPLEMENT ; 38 import org.omg.CORBA.ORB ; 39 import org.omg.CosNotification.DiscardPolicy; 40 import org.omg.CosNotification.EventType; 41 import org.omg.CosNotification.OrderPolicy; 42 import org.omg.CosNotification.UnsupportedQoS; 43 import org.omg.CosNotifyChannelAdmin.ConsumerAdmin; 44 import org.omg.CosNotifyChannelAdmin.ObtainInfoMode; 45 import org.omg.CosNotifyComm.InvalidEventType; 46 import org.omg.CosNotifyComm.NotifyPublish; 47 import org.omg.CosNotifyComm.NotifyPublishHelper; 48 import org.omg.CosNotifyComm.NotifyPublishOperations; 49 import org.omg.CosNotifyComm.NotifySubscribeOperations; 50 import org.omg.PortableServer.POA ; 51 52 63 64 public abstract class AbstractProxySupplier extends AbstractProxy implements MessageConsumer, 65 NotifySubscribeOperations 66 { 67 private static final Runnable EMPTY_RUNNABLE = new Runnable () 68 { 69 public void run() 70 { 71 } 73 }; 74 75 private static final EventType[] EMPTY_EVENT_TYPE_ARRAY = new EventType[0]; 76 77 private static final Message[] EMPTY_MESSAGE = new Message[0]; 78 79 81 private final RWLockEventQueueDecorator pendingMessages_; 82 83 private final int errorThreshold_; 84 85 private final ConsumerAdmin consumerAdmin_; 86 87 private final EventQueueFactory eventQueueFactory_; 88 89 private NotifyPublishOperations proxyOfferListener_; 90 91 private NotifyPublish offerListener_; 92 93 95 protected AbstractProxySupplier(IAdmin admin, ORB orb, POA poa, Configuration conf, 96 TaskProcessor taskProcessor, OfferManager offerManager, SubscriptionManager subscriptionManager, 97 ConsumerAdmin consumerAdmin) 98 throws ConfigurationException 99 { 100 super(admin, orb, poa, conf, taskProcessor, offerManager, subscriptionManager); 101 102 consumerAdmin_ = consumerAdmin; 103 104 eventQueueFactory_ = new EventQueueFactory(conf); 105 106 errorThreshold_ = conf.getAttributeAsInteger(Attributes.EVENTCONSUMER_ERROR_THRESHOLD, 107 Default.DEFAULT_EVENTCONSUMER_ERROR_THRESHOLD); 108 109 if (logger_.isInfoEnabled()) 110 { 111 logger_.info("set Error Threshold to : " + errorThreshold_); 112 } 113 114 qosSettings_.addPropertySetListener( 115 new String [] { OrderPolicy.value, DiscardPolicy.value }, 116 eventQueueConfigurationChangedCB); 117 118 try 119 { 120 MessageQueueAdapter initialEventQueue = getMessageQueueFactory().newMessageQueue( 121 qosSettings_); 122 123 pendingMessages_ = new RWLockEventQueueDecorator(initialEventQueue); 124 } catch (InterruptedException e) 125 { 126 throw new RuntimeException (); 127 } 128 } 129 130 132 protected EventQueueFactory getMessageQueueFactory() 133 { 134 return eventQueueFactory_; 135 } 136 137 141 private final void configureEventQueue() { 143 MessageQueueAdapter _newQueue = getMessageQueueFactory().newMessageQueue(qosSettings_); 144 145 try 146 { 147 pendingMessages_.replaceDelegate(_newQueue); 148 } catch (InterruptedException e) 149 { 150 } 152 } 153 154 private PropertySetAdapter eventQueueConfigurationChangedCB = new PropertySetAdapter() 155 { 156 public void actionPropertySetChanged(PropertySet source) throws UnsupportedQoS 157 { 158 configureEventQueue(); 159 } 160 }; 161 162 163 164 public int getPendingMessagesCount() 165 { 166 try 167 { 168 return pendingMessages_.getPendingMessagesCount(); 169 } catch (InterruptedException e) 170 { 171 return -1; 172 } 173 } 174 175 public boolean hasPendingData() 176 { 177 try 178 { 179 return pendingMessages_.hasPendingMessages(); 180 } catch (InterruptedException e) 181 { 182 return false; 183 } 184 } 185 186 192 protected void enqueue(Message message) 193 { 194 Message _copy = (Message) message.clone(); 195 196 try 197 { 198 pendingMessages_.enqeue(message); 199 200 if (logger_.isDebugEnabled()) 201 { 202 logger_.debug("added " + message + " to pending Messages."); 203 } 204 } catch (InterruptedException e) 205 { 206 _copy.dispose(); 207 logger_.info("enqueue was interrupted", e); 208 } 209 } 210 211 public Message getMessageBlocking() throws InterruptedException 212 { 213 return pendingMessages_.getMessageBlocking(); 214 } 215 216 protected Message getMessageNoBlock() 217 { 218 try 219 { 220 return pendingMessages_.getMessageNoBlock(); 221 } catch (InterruptedException e) 222 { 223 Thread.currentThread().interrupt(); 224 225 return null; 226 } 227 } 228 229 protected Message[] getAllMessages() 230 { 231 try 232 { 233 return pendingMessages_.getAllMessages(); 234 } catch (InterruptedException e) 235 { 236 Thread.currentThread().interrupt(); 237 238 return EMPTY_MESSAGE; 239 } 240 } 241 242 public void deliverMessage(final Message message) 243 { 244 if (logger_.isDebugEnabled()) 245 { 246 logger_.debug("deliverMessage() connected=" + isConnected() + " suspended=" 247 + isSuspended()); 248 } 249 250 if (isConnected()) 251 { 252 enqueue(message); 253 254 messageDelivered(); 255 } 256 } 257 258 261 protected void messageDelivered() 262 { 263 } 265 266 271 protected Message[] getUpToMessages(int max) 272 { 273 try 274 { 275 return pendingMessages_.getUpToMessages(max); 276 } catch (InterruptedException e) 277 { 278 Thread.currentThread().interrupt(); 279 280 return EMPTY_MESSAGE; 281 } 282 } 283 284 289 protected Message[] getAtLeastMessages(int min) 290 { 291 try 292 { 293 return pendingMessages_.getAtLeastMessages(min); 294 } catch (InterruptedException e) 295 { 296 Thread.currentThread().interrupt(); 297 298 return EMPTY_MESSAGE; 299 } 300 } 301 302 public int getErrorThreshold() 303 { 304 return errorThreshold_; 305 } 306 307 public final void dispose() 308 { 309 super.dispose(); 310 311 pendingMessages_.clear(); 312 313 getTaskProcessor().executeTaskAfterDelay(1000, EMPTY_RUNNABLE); 316 } 317 318 public final ConsumerAdmin MyAdmin() 319 { 320 return consumerAdmin_; 321 } 322 323 public final void subscription_change(EventType[] added, EventType[] removed) 324 throws InvalidEventType 325 { 326 subscriptionManager_.subscription_change(added, removed); 327 } 328 329 public final EventType[] obtain_offered_types(ObtainInfoMode obtainInfoMode) 330 { 331 EventType[] _offeredTypes = EMPTY_EVENT_TYPE_ARRAY; 332 333 switch (obtainInfoMode.value()) { 334 case ObtainInfoMode._ALL_NOW_UPDATES_ON: 335 registerListener(); 336 _offeredTypes = offerManager_.obtain_offered_types(); 337 break; 338 case ObtainInfoMode._ALL_NOW_UPDATES_OFF: 339 _offeredTypes = offerManager_.obtain_offered_types(); 340 removeListener(); 341 break; 342 case ObtainInfoMode._NONE_NOW_UPDATES_ON: 343 registerListener(); 344 break; 345 case ObtainInfoMode._NONE_NOW_UPDATES_OFF: 346 removeListener(); 347 break; 348 default: 349 throw new IllegalArgumentException ("Illegal ObtainInfoMode"); 350 } 351 352 return _offeredTypes; 353 } 354 355 private void registerListener() 356 { 357 if (proxyOfferListener_ == null) 358 { 359 final NotifyPublishOperations _listener = getOfferListener(); 360 361 if (_listener != null) 362 { 363 proxyOfferListener_ = new NotifyPublishOperations() 364 { 365 public void offer_change(EventType[] added, EventType[] removed) 366 { 367 try 368 { 369 _listener.offer_change(added, removed); 370 } catch (NO_IMPLEMENT e) 371 { 372 logger_.info("disable offer_change for connected Consumer.", e); 373 374 removeListener(); 375 } catch (InvalidEventType e) 376 { 377 logger_.error("invalid event type", e); 378 } catch (Exception e) 379 { 380 logger_.error("offer_change failed", e); 381 } 382 } 383 }; 384 385 offerManager_.addListener(proxyOfferListener_); 386 } 387 } 388 } 389 390 protected void removeListener() 391 { 392 if (proxyOfferListener_ != null) 393 { 394 offerManager_.removeListener(proxyOfferListener_); 395 proxyOfferListener_ = null; 396 } 397 } 398 399 final NotifyPublishOperations getOfferListener() 400 { 401 return offerListener_; 402 } 403 404 public void connectClient(org.omg.CORBA.Object client) 405 { 406 super.connectClient(client); 407 408 try 409 { 410 offerListener_ = NotifyPublishHelper.narrow(client); 411 412 logger_.debug("successfully narrowed connecting Client to IF NotifyPublish"); 413 } catch (Throwable t) 414 { 415 logger_.info("disable offer_change for connecting Consumer"); 416 } 417 } 418 419 public boolean isRetryAllowed() 420 { 421 return !isDisposed() && getErrorCounter() < getErrorThreshold(); 422 } 423 424 protected abstract long getCost(); 425 426 public int compareTo(Object o) 427 { 428 AbstractProxySupplier other = (AbstractProxySupplier) o; 429 430 return (int) (getCost() - other.getCost()); 431 } 432 433 public final boolean hasMessageConsumer() 434 { 435 return true; 436 } 437 } | Popular Tags |