1 package org.jacorb.notification; 2 3 22 23 import java.lang.ref.WeakReference ; 24 import java.util.HashMap ; 25 import java.util.Iterator ; 26 import java.util.List ; 27 import java.util.Map ; 28 29 import org.apache.avalon.framework.configuration.Configuration; 30 import org.apache.avalon.framework.logger.Logger; 31 import org.jacorb.notification.interfaces.Disposable; 32 import org.jacorb.notification.interfaces.FilterStage; 33 import org.jacorb.notification.interfaces.FilterStageSource; 34 import org.jacorb.notification.interfaces.ProxyEvent; 35 import org.jacorb.notification.interfaces.ProxyEventListener; 36 import org.jacorb.notification.servant.AbstractAdmin; 37 import org.jacorb.notification.servant.AbstractSupplierAdmin; 38 import org.jacorb.notification.servant.FilterStageListManager; 39 import org.jacorb.notification.servant.ManageableServant; 40 import org.jacorb.notification.util.AdminPropertySet; 41 import org.jacorb.notification.util.DisposableManager; 42 import org.jacorb.notification.util.PropertySet; 43 import org.jacorb.notification.util.QoSPropertySet; 44 import org.omg.CORBA.Any ; 45 import org.omg.CORBA.IntHolder ; 46 import org.omg.CORBA.OBJECT_NOT_EXIST ; 47 import org.omg.CORBA.ORB ; 48 import org.omg.CosNotification.EventReliability; 49 import org.omg.CosNotification.MaxConsumers; 50 import org.omg.CosNotification.MaxSuppliers; 51 import org.omg.CosNotification.NamedPropertyRangeSeqHolder; 52 import org.omg.CosNotification.Property; 53 import org.omg.CosNotification.UnsupportedAdmin; 54 import org.omg.CosNotification.UnsupportedQoS; 55 import org.omg.CosNotifyChannelAdmin.AdminLimit; 56 import org.omg.CosNotifyChannelAdmin.AdminLimitExceeded; 57 import org.omg.CosNotifyChannelAdmin.AdminNotFound; 58 import org.omg.CosNotifyChannelAdmin.InterFilterGroupOperator; 59 import org.omg.CosNotifyFilter.FilterFactory; 60 import org.omg.PortableServer.POA ; 61 import org.omg.PortableServer.Servant ; 62 import org.picocontainer.MutablePicoContainer; 63 import org.picocontainer.defaults.CachingComponentAdapter; 64 import org.picocontainer.defaults.ConstructorInjectionComponentAdapter; 65 66 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 67 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 68 69 73 74 public abstract class AbstractEventChannel implements Disposable, ManageableServant 75 { 76 79 private static final Integer DEFAULT_ADMIN_KEY = new Integer (0); 80 81 private final DisposableManager disposables_ = new DisposableManager(); 82 83 protected final Logger logger_; 84 85 protected final ORB orb_; 86 87 protected final POA poa_; 88 89 protected final Configuration configuration_; 90 91 94 private final SynchronizedInt maxNumberOfSuppliers_ = new SynchronizedInt(0); 95 96 99 private final SynchronizedInt maxNumberOfConsumers_ = new SynchronizedInt(0); 100 101 private final AdminPropertySet adminSettings_; 102 103 private final QoSPropertySet qosSettings_; 104 105 private final FilterStageListManager listManager_; 106 107 private final FilterFactory defaultFilterFactory_; 108 109 112 private final Object modifyConsumerAdminsLock_ = new Object (); 113 114 117 private final Object modifySupplierAdminsLock_ = new Object (); 118 119 122 private final Map consumerAdminServants_ = new HashMap (); 123 124 127 private final Map supplierAdminServants_ = new HashMap (); 128 129 135 private final SynchronizedInt adminIdPool_ = new SynchronizedInt(1); 136 137 140 private final SynchronizedInt numberOfConsumers_ = new SynchronizedInt(0); 141 142 145 private final SynchronizedInt numberOfSuppliers_ = new SynchronizedInt(0); 146 147 protected boolean duringConstruction_ = true; 148 149 private final ProxyEventListener proxyConsumerEventListener_ = new ProxyEventListener() 150 { 151 public void actionProxyCreationRequest(ProxyEvent event) throws AdminLimitExceeded 152 { 153 addConsumer(); 154 } 155 156 public void actionProxyCreated(ProxyEvent event) 157 { 158 } 160 161 public void actionProxyDisposed(ProxyEvent event) 162 { 163 removeConsumer(); 164 } 165 }; 166 167 private final ProxyEventListener proxySupplierEventListener_ = new ProxyEventListener() 168 { 169 public void actionProxyCreationRequest(ProxyEvent event) throws AdminLimitExceeded 170 { 171 addSupplier(); 172 } 173 174 public void actionProxyCreated(ProxyEvent event) 175 { 176 } 178 179 public void actionProxyDisposed(ProxyEvent event) 180 { 181 removeSupplier(); 182 } 183 }; 184 185 protected final MutablePicoContainer container_; 186 187 private final int id_; 188 189 private final SynchronizedBoolean destroyed_ = new SynchronizedBoolean(false); 190 191 193 public AbstractEventChannel(IFactory factory, ORB orb, POA poa, Configuration config, 194 FilterFactory filterFactory) 195 { 196 super(); 197 198 id_ = factory.getChannelID(); 199 200 orb_ = orb; 201 poa_ = poa; 202 configuration_ = config; 203 defaultFilterFactory_ = filterFactory; 204 container_ = factory.getContainer(); 205 206 logger_ = ((org.jacorb.config.Configuration) config).getNamedLogger(getClass().getName()); 207 208 container_.registerComponent(new CachingComponentAdapter( 209 new ConstructorInjectionComponentAdapter(SubscriptionManager.class, 210 SubscriptionManager.class))); 211 212 container_.registerComponent(new CachingComponentAdapter( 213 new ConstructorInjectionComponentAdapter(OfferManager.class, OfferManager.class))); 214 215 adminSettings_ = new AdminPropertySet(configuration_); 216 217 qosSettings_ = new QoSPropertySet(configuration_, QoSPropertySet.CHANNEL_QOS); 218 219 listManager_ = new FilterStageListManager() 220 { 221 public void fetchListData(FilterStageListManager.List list) 222 { 223 synchronized (modifyConsumerAdminsLock_) 224 { 225 Iterator i = consumerAdminServants_.keySet().iterator(); 226 227 while (i.hasNext()) 228 { 229 Integer _key = (Integer ) i.next(); 230 list.add((FilterStage) consumerAdminServants_.get(_key)); 231 } 232 } 233 } 234 }; 235 } 236 237 239 245 private void addConsumer() throws AdminLimitExceeded 246 { 247 if ((maxNumberOfConsumers_.get() == 0) 248 || (numberOfConsumers_.compareTo(maxNumberOfConsumers_) < 0)) 249 { 250 numberOfConsumers_.increment(); 251 } 252 else 253 { 254 Any _any = orb_.create_any(); 255 _any.insert_long(maxNumberOfConsumers_.get()); 256 257 AdminLimit _limit = new AdminLimit("consumer limit", _any); 258 259 throw new AdminLimitExceeded("Consumer creation request exceeds AdminLimit.", _limit); 260 } 261 } 262 263 private void removeConsumer() 264 { 265 numberOfConsumers_.decrement(); 266 } 267 268 274 private void addSupplier() throws AdminLimitExceeded 275 { 276 if ((maxNumberOfSuppliers_.get() == 0) 277 || (numberOfSuppliers_.compareTo(maxNumberOfSuppliers_) < 0)) 278 { 279 numberOfSuppliers_.increment(); 280 } 281 else 282 { 283 Any _any = orb_.create_any(); 284 _any.insert_long(maxNumberOfSuppliers_.get()); 285 286 AdminLimit _limit = new AdminLimit("suppliers limit", _any); 287 288 throw new AdminLimitExceeded("supplier creation request exceeds AdminLimit.", _limit); 289 } 290 } 291 292 private void removeSupplier() 293 { 294 numberOfSuppliers_.decrement(); 295 } 296 297 public final int getAdminID() 298 { 299 if (duringConstruction_) 300 { 301 return 0; 302 } 303 return adminIdPool_.increment(); 304 } 305 306 protected final boolean isDefaultConsumerAdminActive() 307 { 308 synchronized (modifyConsumerAdminsLock_) 309 { 310 return consumerAdminServants_.containsKey(DEFAULT_ADMIN_KEY); 311 } 312 } 313 314 protected final boolean isDefaultSupplierAdminActive() 315 { 316 synchronized (modifySupplierAdminsLock_) 317 { 318 return supplierAdminServants_.containsKey(DEFAULT_ADMIN_KEY); 319 } 320 } 321 322 328 public final FilterFactory default_filter_factory() 329 { 330 return defaultFilterFactory_; 331 } 332 333 public final int[] get_all_consumeradmins() 334 { 335 int[] _allKeys; 337 synchronized (modifyConsumerAdminsLock_) 338 { 339 _allKeys = new int[consumerAdminServants_.size()]; 341 Iterator i = consumerAdminServants_.keySet().iterator(); 342 int x = 0; 343 while (i.hasNext()) 344 { 345 _allKeys[x++] = ((Integer ) i.next()).intValue(); 346 } 347 } 348 349 return _allKeys; 350 } 351 352 public final int[] get_all_supplieradmins() 353 { 354 int[] _allKeys; 355 356 synchronized (modifySupplierAdminsLock_) 357 { 358 _allKeys = new int[supplierAdminServants_.size()]; 359 360 Iterator i = supplierAdminServants_.keySet().iterator(); 361 int x = 0; 362 while (i.hasNext()) 363 { 364 _allKeys[x++] = ((Integer ) i.next()).intValue(); 365 } 366 } 367 368 return _allKeys; 369 } 370 371 public final Property[] get_admin() 372 { 373 return adminSettings_.toArray(); 374 } 375 376 public final Property[] get_qos() 377 { 378 return qosSettings_.toArray(); 379 } 380 381 public final void set_qos(Property[] props) throws UnsupportedQoS 382 { 383 qosSettings_.validate_qos(props, new NamedPropertyRangeSeqHolder()); 384 385 qosSettings_.set_qos(props); 386 } 387 388 public final void validate_qos(Property[] props, 389 NamedPropertyRangeSeqHolder namedPropertySeqHolder) throws UnsupportedQoS 390 { 391 qosSettings_.validate_qos(props, namedPropertySeqHolder); 392 } 393 394 public final void set_admin(Property[] adminProps) throws UnsupportedAdmin 395 { 396 adminSettings_.validate_admin(adminProps); 397 398 adminSettings_.set_admin(adminProps); 399 400 configureAdminLimits(adminSettings_); 401 } 402 403 private void configureAdminLimits(PropertySet adminProperties) 404 { 405 Any _maxConsumers = adminProperties.get(MaxConsumers.value); 406 maxNumberOfConsumers_.set(_maxConsumers.extract_long()); 407 408 Any _maxSuppliers = adminProperties.get(MaxSuppliers.value); 409 maxNumberOfSuppliers_.set(_maxSuppliers.extract_long()); 410 411 if (logger_.isInfoEnabled()) 412 { 413 logger_.info("set MaxNumberOfConsumers=" + maxNumberOfConsumers_); 414 logger_.info("set MaxNumberOfSuppliers=" + maxNumberOfSuppliers_); 415 } 416 } 417 418 421 public final void destroy() 422 { 423 if (destroyed_.commit(false, true)) 424 { 425 container_.dispose(); 426 427 List list = container_.getComponentInstancesOfType(IContainer.class); 428 for (Iterator i = list.iterator(); i.hasNext();) 429 { 430 IContainer element = (IContainer) i.next(); 431 element.destroy(); 432 } 433 } 434 else 435 { 436 throw new OBJECT_NOT_EXIST (); 437 } 438 } 439 440 public final void dispose() 441 { 442 logger_.info("destroy channel " + id_); 443 444 deactivate(); 445 446 disposables_.dispose(); 447 } 448 449 454 public final POA _default_POA() 455 { 456 return poa_; 457 } 458 459 public boolean isPersistent() 460 { 461 return false; 462 } 463 464 468 public final int getNumberOfConnectedClients() 469 { 470 return numberOfConsumers_.get() + numberOfSuppliers_.get(); 471 } 472 473 public final int getMaxNumberOfSuppliers() 474 { 475 return maxNumberOfSuppliers_.get(); 476 } 477 478 public final int getMaxNumberOfConsumers() 479 { 480 return maxNumberOfConsumers_.get(); 481 } 482 483 public final void deactivate() 484 { 485 try 486 { 487 poa_.deactivate_object(poa_.servant_to_id(getServant())); 488 } catch (Exception e) 489 { 490 logger_.error("Unable to deactivate EventChannel Object", e); 491 492 throw new RuntimeException (); 493 } 494 } 495 496 abstract protected Servant getServant(); 497 498 private Property[] createQoSPropertiesForAdmin() 499 { 500 Map _copy = new HashMap (qosSettings_.toMap()); 501 502 _copy.remove(EventReliability.value); 503 504 return PropertySet.map2Props(_copy); 505 } 506 507 protected AbstractAdmin get_consumeradmin_internal(int identifier) throws AdminNotFound 508 { 509 synchronized (modifyConsumerAdminsLock_) 510 { 511 Integer _key = new Integer (identifier); 512 513 if (consumerAdminServants_.containsKey(_key)) 514 { 515 AbstractAdmin _admin = (AbstractAdmin) consumerAdminServants_.get(_key); 516 517 return _admin; 518 } 519 520 throw new AdminNotFound("ID " + identifier + " does not exist."); 521 } 522 } 523 524 protected AbstractAdmin get_supplieradmin_internal(int identifier) throws AdminNotFound 525 { 526 synchronized (modifySupplierAdminsLock_) 527 { 528 Integer _key = new Integer (identifier); 529 530 if (supplierAdminServants_.containsKey(_key)) 531 { 532 AbstractAdmin _admin = (AbstractAdmin) supplierAdminServants_.get(_key); 533 534 return _admin; 535 } 536 537 throw new AdminNotFound("ID " + identifier + " does not exist."); 538 } 539 } 540 541 544 List getAllConsumerAdmins() 545 { 546 return listManager_.getList(); 547 } 548 549 protected AbstractAdmin getDefaultConsumerAdminServant() 550 { 551 AbstractAdmin _admin; 552 553 synchronized (modifyConsumerAdminsLock_) 554 { 555 _admin = (AbstractAdmin) consumerAdminServants_.get(DEFAULT_ADMIN_KEY); 556 557 if (_admin == null) 558 { 559 _admin = newConsumerAdminServant(DEFAULT_ADMIN_KEY.intValue()); 560 561 try 562 { 563 _admin.set_qos(createQoSPropertiesForAdmin()); 564 } catch (UnsupportedQoS e) 565 { 566 logger_.fatalError("unable to set qos", e); 567 } 568 569 addToConsumerAdmins(_admin); 570 } 571 } 572 573 return _admin; 574 } 575 576 private void addToConsumerAdmins(AbstractAdmin admin) 577 { 578 final Integer _key = admin.getID(); 579 580 admin.addDisposeHook(new Disposable() 581 { 582 public void dispose() 583 { 584 synchronized (modifyConsumerAdminsLock_) 585 { 586 consumerAdminServants_.remove(_key); 587 listManager_.actionSourceModified(); 588 } 589 } 590 }); 591 592 synchronized (modifyConsumerAdminsLock_) 593 { 594 consumerAdminServants_.put(_key, admin); 595 596 listManager_.actionSourceModified(); 597 } 598 } 599 600 protected AbstractAdmin new_for_consumers_servant(InterFilterGroupOperator filterGroupOperator, 601 IntHolder intHolder) 602 { 603 AbstractAdmin _admin = newConsumerAdminServant(createAdminID()); 604 605 _admin.setInterFilterGroupOperator(filterGroupOperator); 606 607 intHolder.value = _admin.getID().intValue(); 608 609 try 610 { 611 _admin.set_qos(createQoSPropertiesForAdmin()); 612 } catch (UnsupportedQoS e) 613 { 614 logger_.error("unable to set QoS", e); 615 } 616 617 _admin.addProxyEventListener(proxySupplierEventListener_); 618 619 addToConsumerAdmins(_admin); 620 621 return _admin; 622 } 623 624 private int createAdminID() 625 { 626 return adminIdPool_.increment(); 627 } 628 629 private void addToSupplierAdmins(AbstractAdmin admin) 630 { 631 final Integer _key = admin.getID(); 632 633 admin.addDisposeHook(new Disposable() 634 { 635 public void dispose() 636 { 637 synchronized (modifySupplierAdminsLock_) 638 { 639 supplierAdminServants_.remove(_key); 640 } 641 } 642 }); 643 644 synchronized (modifySupplierAdminsLock_) 645 { 646 supplierAdminServants_.put(_key, admin); 647 } 648 } 649 650 protected AbstractAdmin new_for_suppliers_servant(InterFilterGroupOperator filterGroupOperator, 651 IntHolder intHolder) 652 { 653 AbstractAdmin _admin = newSupplierAdminServant(createAdminID()); 654 655 intHolder.value = _admin.getID().intValue(); 656 657 _admin.setInterFilterGroupOperator(filterGroupOperator); 658 659 try 660 { 661 _admin.set_qos(createQoSPropertiesForAdmin()); 662 } catch (UnsupportedQoS e) 663 { 664 logger_.fatalError("error setting qos", e); 665 } 666 667 _admin.addProxyEventListener(proxyConsumerEventListener_); 668 669 addToSupplierAdmins(_admin); 670 671 return _admin; 672 } 673 674 protected AbstractAdmin getDefaultSupplierAdminServant() 675 { 676 AbstractAdmin _admin; 677 678 synchronized (modifySupplierAdminsLock_) 679 { 680 _admin = (AbstractAdmin) supplierAdminServants_.get(DEFAULT_ADMIN_KEY); 681 682 if (_admin == null) 683 { 684 _admin = newSupplierAdminServant(DEFAULT_ADMIN_KEY.intValue()); 685 686 try 687 { 688 _admin.set_qos(createQoSPropertiesForAdmin()); 689 } catch (UnsupportedQoS e) 690 { 691 logger_.fatalError("unable to set qos", e); 692 } 693 694 addToSupplierAdmins(_admin); 695 } 696 } 697 698 return _admin; 699 } 700 701 703 private AbstractAdmin newConsumerAdminServant(int id) 704 { 705 AbstractAdmin _admin = newConsumerAdmin(id); 706 707 return _admin; 708 } 709 710 protected abstract AbstractAdmin newConsumerAdmin(int id); 711 712 714 private static class FilterStageSourceAdapter implements FilterStageSource 715 { 716 final WeakReference channelRef_; 717 718 FilterStageSourceAdapter(AbstractEventChannel channel) 719 { 720 channelRef_ = new WeakReference (channel); 721 } 722 723 public List getSubsequentFilterStages() 724 { 725 return ((AbstractEventChannel) channelRef_.get()).getAllConsumerAdmins(); 726 } 727 } 728 729 private AbstractAdmin newSupplierAdminServant(int id) 730 { 731 AbstractSupplierAdmin _admin = newSupplierAdmin(id); 732 733 _admin.setSubsequentFilterStageSource(new FilterStageSourceAdapter(this)); 734 735 return _admin; 736 } 737 738 protected abstract AbstractSupplierAdmin newSupplierAdmin(int id); 739 740 public int getID() 741 { 742 return id_; 743 } 744 745 public final void addDisposeHook(Disposable d) 746 { 747 disposables_.addDisposable(d); 748 } 749 } 750 751 | Popular Tags |