1 package org.jacorb.notification.servant; 2 3 23 24 import java.util.Collections ; 25 import java.util.Comparator ; 26 import java.util.Iterator ; 27 import java.util.List ; 28 import java.util.Map ; 29 30 import org.apache.avalon.framework.configuration.Configuration; 31 import org.jacorb.notification.MessageFactory; 32 import org.jacorb.notification.OfferManager; 33 import org.jacorb.notification.SubscriptionManager; 34 import org.jacorb.notification.container.CORBAObjectComponentAdapter; 35 import org.jacorb.notification.interfaces.Disposable; 36 import org.jacorb.notification.interfaces.FilterStage; 37 import org.jacorb.notification.interfaces.MessageConsumer; 38 import org.jacorb.notification.interfaces.ProxyEvent; 39 import org.jacorb.notification.interfaces.ProxyEventListener; 40 import org.omg.CORBA.BAD_PARAM ; 41 import org.omg.CORBA.IntHolder ; 42 import org.omg.CORBA.ORB ; 43 import org.omg.CORBA.UNKNOWN ; 44 import org.omg.CosEventChannelAdmin.ProxyPullSupplier; 45 import org.omg.CosEventChannelAdmin.ProxyPushSupplier; 46 import org.omg.CosNotification.EventType; 47 import org.omg.CosNotification.UnsupportedQoS; 48 import org.omg.CosNotifyChannelAdmin.AdminLimitExceeded; 49 import org.omg.CosNotifyChannelAdmin.ClientType; 50 import org.omg.CosNotifyChannelAdmin.ConsumerAdmin; 51 import org.omg.CosNotifyChannelAdmin.ConsumerAdminHelper; 52 import org.omg.CosNotifyChannelAdmin.ConsumerAdminOperations; 53 import org.omg.CosNotifyChannelAdmin.ConsumerAdminPOATie; 54 import org.omg.CosNotifyChannelAdmin.ProxyNotFound; 55 import org.omg.CosNotifyChannelAdmin.ProxySupplier; 56 import org.omg.CosNotifyChannelAdmin.ProxySupplierHelper; 57 import org.omg.CosNotifyComm.InvalidEventType; 58 import org.omg.CosNotifyFilter.MappingFilter; 59 import org.omg.CosNotifyFilter.MappingFilterHelper; 60 import org.omg.PortableServer.POA ; 61 import org.omg.PortableServer.Servant ; 62 import org.picocontainer.MutablePicoContainer; 63 import org.picocontainer.defaults.CachingComponentAdapter; 64 65 69 70 public class ConsumerAdminImpl extends AbstractAdmin implements ConsumerAdminOperations, 71 Disposable, ProxyEventListener 72 { 73 private final static class FilterstageWithMessageConsumerComparator implements Comparator 74 { 75 78 public int compare(Object l, Object r) 79 { 80 FilterStage left = (FilterStage) l; 81 FilterStage right = (FilterStage) r; 82 83 return left.getMessageConsumer().compareTo(right.getMessageConsumer()); 84 } 85 } 86 87 static final FilterstageWithMessageConsumerComparator FILTERSTAGE_COMPARATOR = new FilterstageWithMessageConsumerComparator(); 88 89 private final ConsumerAdmin thisRef_; 90 91 protected final Servant thisServant_; 92 93 private final FilterStageListManager listManager_; 94 95 private MappingFilter priorityFilter_; 96 97 private MappingFilter lifetimeFilter_; 98 99 101 public ConsumerAdminImpl(IEventChannel channelServant, ORB orb, POA poa, Configuration config, 102 MessageFactory messageFactory, OfferManager offerManager, 103 SubscriptionManager subscriptionManager) 104 { 105 super(channelServant, orb, poa, config, messageFactory, offerManager, subscriptionManager); 106 107 109 listManager_ = new FilterStageListManager() 110 { 111 public void fetchListData(FilterStageListManager.List listProxy) 112 { 113 Iterator i = pullServants_.entrySet().iterator(); 114 115 while (i.hasNext()) 116 { 117 listProxy.add((FilterStage) ((Map.Entry ) i.next()).getValue()); 118 } 119 120 i = pushServants_.entrySet().iterator(); 121 122 while (i.hasNext()) 123 { 124 listProxy.add((FilterStage) ((Map.Entry ) i.next()).getValue()); 125 } 126 } 127 128 protected void sortCheckedList(java.util.List list) 129 { 130 Collections.sort(list, FILTERSTAGE_COMPARATOR); 131 } 132 }; 133 134 lifetimeFilter_ = MappingFilterHelper.unchecked_narrow(getORB().string_to_object( 135 getORB().object_to_string(null))); 136 137 priorityFilter_ = MappingFilterHelper.unchecked_narrow(getORB().string_to_object( 138 getORB().object_to_string(null))); 139 140 addProxyEventListener(this); 141 142 thisServant_ = createServant(); 143 144 thisRef_ = ConsumerAdminHelper.narrow(getServant()._this_object(getORB())); 145 146 container_.registerComponent(new CachingComponentAdapter(new CORBAObjectComponentAdapter( 147 ConsumerAdmin.class, thisRef_))); 148 149 addDisposeHook(new Disposable() 150 { 151 public void dispose() 152 { 153 container_.unregisterComponent(ConsumerAdmin.class); 154 } 155 }); 156 } 157 158 160 protected Servant createServant() 161 { 162 return new ConsumerAdminPOATie(this); 163 } 164 165 public final Servant getServant() 166 { 167 return thisServant_; 168 } 169 170 public org.omg.CORBA.Object activate() 171 { 172 return thisRef_; 173 } 174 175 public void subscription_change(EventType[] added, EventType[] removed) throws InvalidEventType 176 { 177 subscriptionManager_.subscription_change(added, removed); 178 } 179 180 public ProxySupplier get_proxy_supplier(int key) throws ProxyNotFound 181 { 182 return ProxySupplierHelper.narrow(getProxy(key).activate()); 183 } 184 185 public void lifetime_filter(MappingFilter lifetimeFilter) 186 { 187 lifetimeFilter_ = lifetimeFilter; 188 } 189 190 public MappingFilter lifetime_filter() 191 { 192 return lifetimeFilter_; 193 } 194 195 public MappingFilter priority_filter() 196 { 197 return priorityFilter_; 198 } 199 200 public void priority_filter(MappingFilter priorityFilter) 201 { 202 priorityFilter_ = priorityFilter; 203 } 204 205 public ProxySupplier obtain_notification_pull_supplier(ClientType clientType, 206 IntHolder intHolder) throws AdminLimitExceeded 207 { 208 fireCreateProxyRequestEvent(); 210 211 try 212 { 213 AbstractProxy _servant = obtain_notification_pull_supplier_servant(clientType); 214 215 intHolder.value = _servant.getID().intValue(); 216 217 return ProxySupplierHelper.narrow(_servant.activate()); 218 } catch (Exception e) 219 { 220 logger_.fatalError("obtain_notification_pull_supplier: unexpected error", e); 221 222 throw new UNKNOWN (); 223 } 224 } 225 226 protected void configureMappingFilters(AbstractProxySupplier servant) 227 { 228 if (lifetimeFilter_ != null) 229 { 230 servant.lifetime_filter(lifetimeFilter_); 231 } 232 233 if (priorityFilter_ != null) 234 { 235 servant.priority_filter(priorityFilter_); 236 } 237 } 238 239 private AbstractProxy obtain_notification_pull_supplier_servant(ClientType clientType) 240 throws UnsupportedQoS 241 { 242 AbstractProxySupplier _servant = newProxyPullSupplier(clientType); 243 244 configureMappingFilters(_servant); 245 246 configureQoS(_servant); 247 248 configureInterFilterGroupOperator(_servant); 249 250 addProxyToMap(_servant, pullServants_, modifyProxiesLock_); 251 252 return _servant; 253 } 254 255 public int[] pull_suppliers() 256 { 257 return get_all_notify_proxies(pullServants_, modifyProxiesLock_); 258 } 259 260 public int[] push_suppliers() 261 { 262 return get_all_notify_proxies(pushServants_, modifyProxiesLock_); 263 } 264 265 public ProxySupplier obtain_notification_push_supplier(ClientType clientType, 266 IntHolder intHolder) throws AdminLimitExceeded 267 { 268 fireCreateProxyRequestEvent(); 270 271 try 272 { 273 AbstractProxy _servant = obtain_notification_push_supplier_servant(clientType); 274 275 intHolder.value = _servant.getID().intValue(); 276 277 return ProxySupplierHelper.narrow(_servant.activate()); 278 } catch (Exception e) 279 { 280 logger_.fatalError("obtain_notification_push_supplier: unexpected error", e); 281 282 throw new UNKNOWN (); 283 } 284 } 285 286 private AbstractProxy obtain_notification_push_supplier_servant(ClientType clientType) 287 throws UnsupportedQoS 288 { 289 AbstractProxySupplier _servant = newProxyPushSupplier(clientType); 290 291 configureMappingFilters(_servant); 292 293 configureQoS(_servant); 294 295 configureInterFilterGroupOperator(_servant); 296 297 addProxyToMap(_servant, pushServants_, modifyProxiesLock_); 298 299 return _servant; 300 } 301 302 public ProxyPullSupplier obtain_pull_supplier() 303 { 304 try 305 { 306 MutablePicoContainer _container = newContainerForEventStyleProxy(); 307 308 _container.registerComponent(newComponentAdapter(ECProxyPullSupplierImpl.class, ECProxyPullSupplierImpl.class)); 309 310 ProxyPullSupplierImpl _servant = (ProxyPullSupplierImpl) _container 311 .getComponentInstance(ECProxyPullSupplierImpl.class); 312 313 configureQoS(_servant); 314 315 addProxyToMap(_servant, pullServants_, modifyProxiesLock_); 316 317 return org.omg.CosEventChannelAdmin.ProxyPullSupplierHelper.narrow(_servant.activate()); 318 } catch (Exception e) 319 { 320 logger_.fatalError("obtain_pull_supplier: exception", e); 321 322 throw new UNKNOWN (); 323 } 324 } 325 326 329 public ProxyPushSupplier obtain_push_supplier() 330 { 331 try 332 { 333 MutablePicoContainer _container = newContainerForEventStyleProxy(); 334 335 _container.registerComponent(newComponentAdapter(ECProxyPushSupplierImpl.class, ECProxyPushSupplierImpl.class)); 336 337 final ProxyPushSupplierImpl _servant = (ProxyPushSupplierImpl) _container 338 .getComponentInstance(ECProxyPushSupplierImpl.class); 339 340 configureQoS(_servant); 341 342 addProxyToMap(_servant, pushServants_, modifyProxiesLock_); 343 344 return org.omg.CosEventChannelAdmin.ProxyPushSupplierHelper.narrow(_servant.activate()); 345 } catch (Exception e) 346 { 347 logger_.fatalError("obtain_push_supplier: exception", e); 348 349 throw new UNKNOWN (); 350 } 351 } 352 353 public List getSubsequentFilterStages() 354 { 355 return listManager_.getList(); 356 } 357 358 361 public MessageConsumer getMessageConsumer() 362 { 363 return null; 364 } 365 366 369 public boolean hasMessageConsumer() 370 { 371 return false; 372 } 373 374 public void actionProxyCreationRequest(ProxyEvent event) 375 { 376 } 377 378 public void actionProxyDisposed(ProxyEvent event) 379 { 380 listManager_.actionSourceModified(); 381 } 382 383 public void actionProxyCreated(ProxyEvent event) 384 { 385 listManager_.actionSourceModified(); 386 } 387 388 391 AbstractProxySupplier newProxyPullSupplier(ClientType clientType) 392 { 393 final MutablePicoContainer _containerForProxy = newContainerForNotifyStyleProxy(); 394 final Class _proxyClass; 395 396 switch (clientType.value()) { 397 case ClientType._ANY_EVENT: 398 _proxyClass = ProxyPullSupplierImpl.class; 399 400 break; 401 case ClientType._STRUCTURED_EVENT: 402 _proxyClass = StructuredProxyPullSupplierImpl.class; 403 404 break; 405 case ClientType._SEQUENCE_EVENT: 406 _proxyClass = SequenceProxyPullSupplierImpl.class; 407 408 break; 409 default: 410 throw new BAD_PARAM (); 411 } 412 413 _containerForProxy 414 .registerComponent(newComponentAdapter(AbstractProxySupplier.class, _proxyClass)); 415 416 final AbstractProxySupplier _servant = (AbstractProxySupplier) _containerForProxy 417 .getComponentInstance(AbstractProxySupplier.class); 418 419 return _servant; 420 } 421 422 425 AbstractProxySupplier newProxyPushSupplier(ClientType clientType) 426 { 427 final Class _proxyClass; 428 429 switch (clientType.value()) { 430 431 case ClientType._ANY_EVENT: 432 _proxyClass = ProxyPushSupplierImpl.class; 433 break; 434 435 case ClientType._STRUCTURED_EVENT: 436 _proxyClass = StructuredProxyPushSupplierImpl.class; 437 break; 438 439 case ClientType._SEQUENCE_EVENT: 440 _proxyClass = SequenceProxyPushSupplierImpl.class; 441 break; 442 443 default: 444 throw new BAD_PARAM ("The ClientType: " + clientType.value() + " is unknown"); 445 } 446 447 final MutablePicoContainer _containerForProxy = newContainerForNotifyStyleProxy(); 448 449 _containerForProxy 450 .registerComponent(newComponentAdapter(AbstractProxySupplier.class, _proxyClass)); 451 452 final AbstractProxySupplier _servant = (AbstractProxySupplier) _containerForProxy 453 .getComponentInstance(AbstractProxySupplier.class); 454 455 return _servant; 456 } 457 } | Popular Tags |