1 package org.jacorb.notification.servant; 2 3 23 24 import java.lang.ref.WeakReference ; 25 import java.util.ArrayList ; 26 import java.util.HashMap ; 27 import java.util.Iterator ; 28 import java.util.List ; 29 import java.util.Map ; 30 31 import org.apache.avalon.framework.configuration.Configuration; 32 import org.apache.avalon.framework.logger.Logger; 33 import org.jacorb.notification.FilterManager; 34 import org.jacorb.notification.IContainer; 35 import org.jacorb.notification.MessageFactory; 36 import org.jacorb.notification.OfferManager; 37 import org.jacorb.notification.SubscriptionManager; 38 import org.jacorb.notification.container.PicoContainerFactory; 39 import org.jacorb.notification.interfaces.Disposable; 40 import org.jacorb.notification.interfaces.FilterStage; 41 import org.jacorb.notification.interfaces.ProxyEvent; 42 import org.jacorb.notification.interfaces.ProxyEventListener; 43 import org.jacorb.notification.util.DisposableManager; 44 import org.jacorb.notification.util.QoSPropertySet; 45 import org.omg.CORBA.OBJECT_NOT_EXIST ; 46 import org.omg.CORBA.ORB ; 47 import org.omg.CosNotification.NamedPropertyRangeSeqHolder; 48 import org.omg.CosNotification.Property; 49 import org.omg.CosNotification.QoSAdminOperations; 50 import org.omg.CosNotification.UnsupportedQoS; 51 import org.omg.CosNotifyChannelAdmin.AdminLimitExceeded; 52 import org.omg.CosNotifyChannelAdmin.EventChannel; 53 import org.omg.CosNotifyChannelAdmin.InterFilterGroupOperator; 54 import org.omg.CosNotifyChannelAdmin.ProxyNotFound; 55 import org.omg.CosNotifyFilter.Filter; 56 import org.omg.CosNotifyFilter.FilterAdminOperations; 57 import org.omg.CosNotifyFilter.FilterNotFound; 58 import org.omg.CosNotifyFilter.MappingFilter; 59 import org.omg.PortableServer.POA ; 60 import org.omg.PortableServer.Servant ; 61 import org.picocontainer.ComponentAdapter; 62 import org.picocontainer.MutablePicoContainer; 63 import org.picocontainer.defaults.CachingComponentAdapter; 64 import org.picocontainer.defaults.ComponentAdapterFactory; 65 66 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 67 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 68 69 75 76 public abstract class AbstractAdmin implements QoSAdminOperations, 77 FilterAdminOperations, FilterStage, ManageableServant 78 { 79 private static final class ITypedAdminImpl implements ITypedAdmin 80 { 81 private final IAdmin admin_; 82 83 private final MutablePicoContainer container_; 84 85 private final String supportedInterface_; 86 87 private ITypedAdminImpl(IAdmin admin, MutablePicoContainer container, 88 String supportedInterface) 89 { 90 super(); 91 92 admin_ = admin; 93 container_ = container; 94 supportedInterface_ = supportedInterface; 95 } 96 97 public String getSupportedInterface() 98 { 99 return supportedInterface_; 100 } 101 102 public int getProxyID() 103 { 104 return admin_.getProxyID(); 105 } 106 107 public boolean isIDPublic() 108 { 109 return admin_.isIDPublic(); 110 } 111 112 public MutablePicoContainer getContainer() 113 { 114 return admin_.getContainer(); 115 } 116 117 public void destroy() 118 { 119 container_.unregisterComponent(ITypedAdmin.class); 120 admin_.destroy(); 121 } 122 } 123 124 127 protected static final InterFilterGroupOperator DEFAULT_FILTER_GROUP_OPERATOR = InterFilterGroupOperator.AND_OP; 128 129 131 private final DisposableManager disposables_ = new DisposableManager(); 132 133 private final Integer id_; 134 135 private InterFilterGroupOperator filterGroupOperator_; 136 137 protected final MutablePicoContainer container_; 138 139 private final FilterManager filterManager_; 140 141 private final WeakReference eventChannelReference_; 142 143 private final QoSPropertySet qosSettings_; 144 145 private final MessageFactory messageFactory_; 146 147 protected final OfferManager offerManager_; 148 149 protected final SubscriptionManager subscriptionManager_; 150 151 protected final Logger logger_; 152 153 private final ORB orb_; 154 155 private final POA poa_; 156 157 protected final Object modifyProxiesLock_ = new Object (); 158 159 protected final Map pullServants_ = new HashMap (); 160 161 protected final Map pushServants_ = new HashMap (); 162 163 private final SynchronizedInt proxyIdPool_ = new SynchronizedInt(0); 164 165 private final SynchronizedBoolean disposed_ = new SynchronizedBoolean(false); 166 167 private final List proxyEventListener_ = new ArrayList (); 168 169 public final int channelID_; 170 171 private final ComponentAdapterFactory componentAdapterFactory_; 172 173 175 protected AbstractAdmin(IEventChannel channel, ORB orb, POA poa, Configuration config, 176 MessageFactory messageFactory, OfferManager offerManager, 177 SubscriptionManager subscriptionManager) 178 { 179 container_ = channel.getContainer(); 180 181 id_ = new Integer (channel.getAdminID()); 182 183 orb_ = orb; 184 poa_ = poa; 185 messageFactory_ = messageFactory; 186 filterManager_ = new FilterManager(); 187 188 eventChannelReference_ = new WeakReference (channel.getEventChannel()); 189 190 channelID_ = channel.getChannelID(); 191 192 logger_ = ((org.jacorb.config.Configuration) config).getNamedLogger(getClass().getName()); 193 194 qosSettings_ = new QoSPropertySet(config, QoSPropertySet.ADMIN_QOS); 195 196 offerManager_ = offerManager; 197 198 subscriptionManager_ = subscriptionManager; 199 200 componentAdapterFactory_ = (ComponentAdapterFactory) container_ 201 .getComponentInstance(ComponentAdapterFactory.class); 202 } 203 204 public final void addDisposeHook(Disposable d) 205 { 206 disposables_.addDisposable(d); 207 } 208 209 public void setInterFilterGroupOperator(InterFilterGroupOperator op) 210 { 211 filterGroupOperator_ = op; 212 } 213 214 protected POA getPOA() 215 { 216 return poa_; 217 } 218 219 protected ORB getORB() 220 { 221 return orb_; 222 } 223 224 protected MessageFactory getMessageFactory() 225 { 226 return messageFactory_; 227 } 228 229 public POA _default_POA() 230 { 231 return getPOA(); 232 } 233 234 int getProxyID() 235 { 236 return proxyIdPool_.increment(); 237 } 238 239 public List getFilters() 240 { 241 return filterManager_.getFilters(); 242 } 243 244 public int add_filter(Filter aFilter) 245 { 246 return filterManager_.add_filter(aFilter); 247 } 248 249 public void remove_filter(int aFilterId) throws FilterNotFound 250 { 251 filterManager_.remove_filter(aFilterId); 252 } 253 254 public Filter get_filter(int aFilterId) throws FilterNotFound 255 { 256 return filterManager_.get_filter(aFilterId); 257 } 258 259 public int[] get_all_filters() 260 { 261 return filterManager_.get_all_filters(); 262 } 263 264 public void remove_all_filters() 265 { 266 filterManager_.remove_all_filters(); 267 } 268 269 public final InterFilterGroupOperator MyOperator() 270 { 271 return filterGroupOperator_; 272 } 273 274 public final EventChannel MyChannel() 275 { 276 return (EventChannel) eventChannelReference_.get(); 277 } 278 279 public final int MyID() 280 { 281 return getID().intValue(); 282 } 283 284 public Property[] get_qos() 285 { 286 return qosSettings_.get_qos(); 287 } 288 289 public void set_qos(Property[] props) throws UnsupportedQoS 290 { 291 qosSettings_.validate_qos(props, new NamedPropertyRangeSeqHolder()); 292 293 qosSettings_.set_qos(props); 294 } 295 296 public void validate_qos(Property[] props, NamedPropertyRangeSeqHolder propertyRangeSeqHolder) 297 throws UnsupportedQoS 298 { 299 qosSettings_.validate_qos(props, propertyRangeSeqHolder); 300 } 301 302 public void destroy() 303 { 304 checkDestroyStatus(); 305 306 container_.dispose(); 307 308 List list = container_.getComponentInstancesOfType(IContainer.class); 309 310 for (Iterator i = list.iterator(); i.hasNext();) 311 { 312 IContainer element = (IContainer) i.next(); 313 element.destroy(); 314 } 315 } 316 317 private void checkDestroyStatus() throws OBJECT_NOT_EXIST 318 { 319 if (!disposed_.commit(false, true)) 320 { 321 throw new OBJECT_NOT_EXIST (); 322 } 323 } 324 325 public void dispose() 326 { 327 logger_.info("destroy Admin " + MyID()); 328 329 331 deactivate(); 332 333 335 remove_all_filters(); 336 337 339 disposables_.dispose(); 340 341 proxyEventListener_.clear(); 342 } 343 344 public void deactivate() 345 { 346 if (logger_.isDebugEnabled()) 347 { 348 logger_.debug("deactivate Admin: " + getID()); 349 } 350 351 try 352 { 353 byte[] _oid = getPOA().servant_to_id(getServant()); 354 getPOA().deactivate_object(_oid); 355 } catch (Exception e) 356 { 357 logger_.error("Couldn't deactivate Admin", e); 358 } 359 } 360 361 public abstract Servant getServant(); 362 363 public Integer getID() 364 { 365 return id_; 366 } 367 368 public boolean isDisposed() 369 { 370 return disposed_.get(); 371 } 372 373 protected void fireCreateProxyRequestEvent() throws AdminLimitExceeded 374 { 375 synchronized (proxyEventListener_) 376 { 377 ProxyEvent _event = new ProxyEvent(this); 378 379 Iterator _i = proxyEventListener_.iterator(); 380 381 while (_i.hasNext()) 382 { 383 ProxyEventListener _listener; 384 _listener = (ProxyEventListener) _i.next(); 385 _listener.actionProxyCreationRequest(_event); 386 } 387 } 388 } 389 390 393 public boolean hasLifetimeFilter() 394 { 395 return false; 396 } 397 398 401 public boolean hasPriorityFilter() 402 { 403 return false; 404 } 405 406 409 public MappingFilter getLifetimeFilter() 410 { 411 throw new UnsupportedOperationException (); 412 } 413 414 417 public MappingFilter getPriorityFilter() 418 { 419 throw new UnsupportedOperationException (); 420 } 421 422 public boolean hasInterFilterGroupOperatorOR() 423 { 424 return (filterGroupOperator_ != null && (filterGroupOperator_.value() == InterFilterGroupOperator._OR_OP)); 425 } 426 427 431 protected AbstractProxy getProxy(int id) throws ProxyNotFound 432 { 433 Integer _id = new Integer (id); 434 435 AbstractProxy _servant = null; 436 437 synchronized (modifyProxiesLock_) 438 { 439 _servant = (AbstractProxy) pullServants_.get(_id); 440 441 if (_servant == null) 442 { 443 _servant = (AbstractProxy) pushServants_.get(_id); 444 } 445 } 446 447 if (_servant == null) 448 { 449 throw new ProxyNotFound("The proxy with ID=" + id + " does not exist"); 450 } 451 452 if (!_servant.isIDPublic()) 453 { 454 throw new ProxyNotFound("The proxy with ID=" + id 455 + " is a EventStyle proxy and therefor not accessible by ID"); 456 } 457 458 return _servant; 459 } 460 461 464 protected int[] get_all_notify_proxies(Map map, Object lock) 465 { 466 List _allIDsList = new ArrayList (); 467 468 synchronized (lock) 469 { 470 Iterator _i = map.entrySet().iterator(); 471 472 while (_i.hasNext()) 473 { 474 Map.Entry _entry = (Map.Entry ) _i.next(); 475 476 if (((AbstractProxy) _entry.getValue()).isIDPublic()) 477 { 478 _allIDsList.add(_entry.getKey()); 479 } 480 } 481 } 482 483 int[] _allIDsArray = new int[_allIDsList.size()]; 484 485 for (int x = 0; x < _allIDsArray.length; ++x) 486 { 487 _allIDsArray[x] = ((Integer ) _allIDsList.get(x)).intValue(); 488 } 489 490 return _allIDsArray; 491 } 492 493 496 protected void configureQoS(AbstractProxy proxy) throws UnsupportedQoS 497 { 498 proxy.set_qos(qosSettings_.get_qos()); 499 } 500 501 504 protected void configureInterFilterGroupOperator(AbstractProxy proxy) 505 { 506 if (filterGroupOperator_ != null 507 && (filterGroupOperator_.value() == InterFilterGroupOperator._OR_OP)) 508 { 509 proxy.setInterFilterGroupOperatorOR(true); 510 } 511 } 512 513 public void addProxyEventListener(ProxyEventListener l) 514 { 515 synchronized (proxyEventListener_) 516 { 517 proxyEventListener_.add(l); 518 } 519 } 520 521 public void removeProxyEventListener(ProxyEventListener listener) 522 { 523 synchronized (proxyEventListener_) 524 { 525 proxyEventListener_.remove(listener); 526 } 527 } 528 529 void fireProxyRemoved(AbstractProxy proxy) 530 { 531 synchronized (proxyEventListener_) 532 { 533 Iterator i = proxyEventListener_.iterator(); 534 ProxyEvent e = new ProxyEvent(proxy); 535 536 while (i.hasNext()) 537 { 538 ((ProxyEventListener) i.next()).actionProxyDisposed(e); 539 } 540 } 541 } 542 543 private void fireProxyCreated(AbstractProxy proxy) 544 { 545 synchronized (proxyEventListener_) 546 { 547 Iterator i = proxyEventListener_.iterator(); 548 ProxyEvent e = new ProxyEvent(proxy); 549 550 while (i.hasNext()) 551 { 552 ((ProxyEventListener) i.next()).actionProxyCreated(e); 553 } 554 } 555 } 556 557 protected void addProxyToMap(final AbstractProxy proxy, final Map map, final Object lock) 558 { 559 synchronized (lock) 560 { 561 map.put(proxy.getID(), proxy); 562 fireProxyCreated(proxy); 563 } 564 565 proxy.addDisposeHook(new Disposable() 568 { 569 public void dispose() 570 { 571 synchronized (lock) 572 { 573 map.remove(proxy.getID()); 574 575 fireProxyRemoved(proxy); 576 } 577 } 578 }); 579 } 580 581 public final List getProxies() 582 { 583 List _list = new ArrayList (); 584 585 synchronized (modifyProxiesLock_) 586 { 587 _list.addAll(pullServants_.values()); 588 _list.addAll(pushServants_.values()); 589 } 590 591 return _list; 592 } 593 594 protected MutablePicoContainer newContainerForNotifyStyleProxy() 595 { 596 return newContainerForProxy(true); 597 } 598 599 protected MutablePicoContainer newContainerForEventStyleProxy() 600 { 601 return newContainerForProxy(false); 602 } 603 604 protected MutablePicoContainer newContainerForTypedProxy(final String supportedInterface) 605 { 606 final MutablePicoContainer _container = newContainerForNotifyStyleProxy(); 607 608 final IAdmin _admin = (IAdmin) _container.getComponentInstance(IAdmin.class); 609 610 ITypedAdmin _typedAdmin = new ITypedAdminImpl(_admin, _container, supportedInterface); 611 612 _container.registerComponentInstance(ITypedAdmin.class, _typedAdmin); 613 614 return _container; 615 } 616 617 private MutablePicoContainer newContainerForProxy(boolean isIDPublic) 618 { 619 int proxyID = getProxyID(); 620 621 return newContainerForProxy(proxyID, isIDPublic); 622 } 623 624 private MutablePicoContainer newContainerForProxy(final int proxyID, final boolean isIDPublic) 625 { 626 final MutablePicoContainer _containerForProxy = PicoContainerFactory 627 .createChildContainer(container_); 628 629 final IAdmin _admin = new IAdmin() 630 { 631 public MutablePicoContainer getContainer() 632 { 633 return _containerForProxy; 634 } 635 636 public int getProxyID() 637 { 638 return proxyID; 639 } 640 641 public boolean isIDPublic() 642 { 643 return isIDPublic; 644 } 645 646 public void destroy() 647 { 648 container_.removeChildContainer(_containerForProxy); 649 } 650 }; 651 652 _containerForProxy.registerComponentInstance(IAdmin.class, _admin); 653 654 return _containerForProxy; 655 } 656 657 protected ComponentAdapter newComponentAdapter(Object key, Class implementation) 658 { 659 return new CachingComponentAdapter(componentAdapterFactory_.createComponentAdapter(key, 660 implementation, null)); 661 } 662 } | Popular Tags |