1 package org.jacorb.notification.servant; 2 3 22 23 import java.util.List ; 24 25 import org.apache.avalon.framework.configuration.Configuration; 26 import org.jacorb.notification.EventTypeWrapper; 27 import org.jacorb.notification.MessageFactory; 28 import org.jacorb.notification.OfferManager; 29 import org.jacorb.notification.SubscriptionManager; 30 import org.jacorb.notification.conf.Default; 31 import org.jacorb.notification.engine.TaskProcessor; 32 import org.jacorb.notification.interfaces.FilterStage; 33 import org.jacorb.notification.interfaces.Message; 34 import org.jacorb.notification.interfaces.MessageConsumer; 35 import org.jacorb.notification.interfaces.MessageSupplier; 36 import org.jacorb.notification.util.PropertySet; 37 import org.jacorb.notification.util.PropertySetAdapter; 38 import org.omg.CORBA.NO_IMPLEMENT ; 39 import org.omg.CORBA.ORB ; 40 import org.omg.CosNotification.EventType; 41 import org.omg.CosNotification.Priority; 42 import org.omg.CosNotification.StartTimeSupported; 43 import org.omg.CosNotification.StopTimeSupported; 44 import org.omg.CosNotification.Timeout; 45 import org.omg.CosNotifyChannelAdmin.ObtainInfoMode; 46 import org.omg.CosNotifyChannelAdmin.SupplierAdmin; 47 import org.omg.CosNotifyComm.InvalidEventType; 48 import org.omg.CosNotifyComm.NotifyPublishOperations; 49 import org.omg.CosNotifyComm.NotifySubscribe; 50 import org.omg.CosNotifyComm.NotifySubscribeHelper; 51 import org.omg.CosNotifyComm.NotifySubscribeOperations; 52 import org.omg.PortableServer.POA ; 53 54 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 55 56 60 61 abstract class AbstractProxyConsumer extends AbstractProxy implements AbstractProxyConsumerI, 62 NotifyPublishOperations 63 { 64 private final static EventType[] EMPTY_EVENT_TYPE_ARRAY = new EventType[0]; 65 66 68 private final MessageFactory messageFactory_; 69 70 private final SynchronizedBoolean isStartTimeSupported_ = new SynchronizedBoolean(true); 71 72 private final SynchronizedBoolean isStopTimeSupported_ = new SynchronizedBoolean(true); 73 74 private List subsequentDestinations_; 75 76 private NotifySubscribeOperations proxySubscriptionListener_; 77 78 private NotifySubscribe subscriptionListener_; 79 80 protected final SupplierAdmin supplierAdmin_; 81 82 84 protected AbstractProxyConsumer(IAdmin admin, ORB orb, POA poa, Configuration conf, 85 TaskProcessor taskProcessor, MessageFactory messageFactory, 86 SupplierAdmin supplierAdmin, OfferManager offerManager, 87 SubscriptionManager subscriptionManager) 88 { 89 super(admin, orb, poa, conf, taskProcessor, offerManager, subscriptionManager); 90 91 supplierAdmin_ = supplierAdmin; 92 messageFactory_ = messageFactory; 93 94 configureStartTimeSupported(); 95 96 configureStopTimeSupported(); 97 98 qosSettings_.addPropertySetListener(new String [] { Priority.value, Timeout.value, 99 StartTimeSupported.value, StopTimeSupported.value }, reconfigureQoS_); 100 } 101 102 protected MessageFactory getMessageFactory() 103 { 104 return messageFactory_; 105 } 106 107 public final List getSubsequentFilterStages() 108 { 109 return subsequentDestinations_; 110 } 111 112 public void setSubsequentDestinations(List list) 113 { 114 subsequentDestinations_ = list; 115 } 116 117 private PropertySetAdapter reconfigureQoS_ = new PropertySetAdapter() 118 { 119 public void actionPropertySetChanged(PropertySet source) 120 { 121 configureStartTimeSupported(); 122 123 configureStopTimeSupported(); 124 } 125 }; 126 127 private void configureStartTimeSupported() 128 { 129 try 130 { 131 isStartTimeSupported_.set(qosSettings_.get(StartTimeSupported.value).extract_boolean()); 132 } catch (Exception e) 133 { 134 isStartTimeSupported_.set(Default.DEFAULT_START_TIME_SUPPORTED.equals("on")); 135 } 136 137 if (logger_.isInfoEnabled()) 138 { 139 logger_.info("set QoS: StartTimeSupported=" + isStartTimeSupported_); 140 } 141 } 142 143 private void configureStopTimeSupported() 144 { 145 try 146 { 147 isStopTimeSupported_.set(qosSettings_.get(StopTimeSupported.value).extract_boolean()); 148 } catch (Exception e) 149 { 150 isStopTimeSupported_.set(Default.DEFAULT_STOP_TIME_SUPPORTED.equals("on")); 151 } 152 153 if (logger_.isInfoEnabled()) 154 { 155 logger_.info("set QoS: StopTimeSupported=" + isStopTimeSupported_); 156 } 157 } 158 159 protected void schedulePullTask(MessageSupplier target) 160 { 161 try 162 { 163 getTaskProcessor().scheduleTimedPullTask(target); 164 } catch (InterruptedException e) 165 { 166 logger_.info("interrupt during schedule pull for MessageSupplier", e); 167 } 168 } 169 170 173 protected void checkMessageProperties(Message m) 174 { 175 } 178 179 public FilterStage getFirstStage() 180 { 181 return this; 182 } 183 184 public boolean isTimeOutSupported() 185 { 186 return isStopTimeSupported_.get(); 187 } 188 189 public boolean isStartTimeSupported() 190 { 191 return isStartTimeSupported_.get(); 192 } 193 194 public final SupplierAdmin MyAdmin() 195 { 196 return supplierAdmin_; 197 } 198 199 public final MessageConsumer getMessageConsumer() 200 { 201 throw new UnsupportedOperationException (); 202 } 203 204 public final boolean hasMessageConsumer() 205 { 206 return false; 207 } 208 209 public void offer_change(EventType[] added, EventType[] removed) throws InvalidEventType 210 { 211 offerManager_.offer_change(added, removed); 212 } 213 214 public final EventType[] obtain_subscription_types(ObtainInfoMode obtainInfoMode) 215 { 216 final EventType[] _subscriptionTypes; 217 218 switch (obtainInfoMode.value()) { 219 case ObtainInfoMode._ALL_NOW_UPDATES_ON: 220 224 registerListener(); 225 226 _subscriptionTypes = subscriptionManager_.obtain_subscription_types(); 227 break; 228 case ObtainInfoMode._ALL_NOW_UPDATES_OFF: 229 _subscriptionTypes = subscriptionManager_.obtain_subscription_types(); 230 231 removeListener(); 232 break; 233 case ObtainInfoMode._NONE_NOW_UPDATES_ON: 234 _subscriptionTypes = EMPTY_EVENT_TYPE_ARRAY; 235 236 registerListener(); 237 break; 238 case ObtainInfoMode._NONE_NOW_UPDATES_OFF: 239 _subscriptionTypes = EMPTY_EVENT_TYPE_ARRAY; 240 241 removeListener(); 242 break; 243 default: 244 throw new IllegalArgumentException ("Illegal ObtainInfoMode: ObtainInfoMode." 245 + obtainInfoMode.value()); 246 } 247 248 return _subscriptionTypes; 249 } 250 251 private void registerListener() 252 { 253 if (proxySubscriptionListener_ == null) 254 { 255 final NotifySubscribeOperations _listener = getSubscriptionListener(); 256 257 if (_listener != null) 258 { 259 proxySubscriptionListener_ = new NotifySubscribeOperations() 260 { 261 public void subscription_change(EventType[] added, EventType[] removed) 262 { 263 try 264 { 265 _listener.subscription_change(added, removed); 266 } catch (NO_IMPLEMENT e) 267 { 268 logger_.info("disable subscription_change for Supplier", e); 269 270 removeListener(); 271 } catch (InvalidEventType e) 272 { 273 if (logger_.isDebugEnabled()) 274 { 275 logger_.debug("subscription_change(" 276 + EventTypeWrapper.toString(added) + ", " 277 + EventTypeWrapper.toString(removed) + ") failed", e); 278 } 279 else 280 { 281 logger_.error("invalid event type", e); 282 } 283 } catch (Exception e) 284 { 285 logger_.error("subscription change failed", e); 286 } 287 } 288 }; 289 subscriptionManager_.addListener(proxySubscriptionListener_); 290 } 291 } 292 } 293 294 297 protected void removeListener() 298 { 299 if (proxySubscriptionListener_ != null) 300 { 301 subscriptionManager_.removeListener(proxySubscriptionListener_); 302 303 proxySubscriptionListener_ = null; 304 } 305 } 306 307 protected void connectClient(org.omg.CORBA.Object client) 308 { 309 super.connectClient(client); 310 311 try 312 { 313 subscriptionListener_ = NotifySubscribeHelper.narrow(client); 314 315 logger_.debug("successfully narrowed connecting Supplier to NotifySubscribe"); 316 } catch (Throwable t) 317 { 318 logger_.info("connecting Supplier does not support subscription_change"); 319 } 320 } 321 322 final NotifySubscribeOperations getSubscriptionListener() 323 { 324 return subscriptionListener_; 325 } 326 327 protected void processMessage(Message mesg) 328 { 329 getTaskProcessor().processMessage(mesg); 330 } 331 } | Popular Tags |