1 package org.jacorb.notification; 2 3 22 23 import java.io.FileWriter ; 24 import java.io.IOException ; 25 import java.io.PrintWriter ; 26 import java.util.ArrayList ; 27 import java.util.Iterator ; 28 import java.util.List ; 29 import java.util.Map ; 30 import java.util.Properties ; 31 32 import org.apache.avalon.framework.configuration.Configuration; 33 import org.apache.avalon.framework.configuration.ConfigurationException; 34 import org.apache.avalon.framework.logger.Logger; 35 import org.jacorb.notification.conf.Attributes; 36 import org.jacorb.notification.container.BiDirGiopPOAComponentAdapter; 37 import org.jacorb.notification.container.PicoContainerFactory; 38 import org.jacorb.notification.interfaces.Disposable; 39 import org.jacorb.notification.servant.ManageableServant; 40 import org.jacorb.notification.util.AdminPropertySet; 41 import org.jacorb.notification.util.PropertySet; 42 import org.jacorb.notification.util.QoSPropertySet; 43 import org.omg.CORBA.Any ; 44 import org.omg.CORBA.IntHolder ; 45 import org.omg.CORBA.ORB ; 46 import org.omg.CORBA.UserException ; 47 import org.omg.CosNaming.NameComponent ; 48 import org.omg.CosNaming.NamingContext ; 49 import org.omg.CosNaming.NamingContextHelper ; 50 import org.omg.CosNotification.BestEffort; 51 import org.omg.CosNotification.ConnectionReliability; 52 import org.omg.CosNotification.EventReliability; 53 import org.omg.CosNotification.Persistent; 54 import org.omg.CosNotification.Property; 55 import org.omg.CosNotification.PropertyError; 56 import org.omg.CosNotification.PropertyRange; 57 import org.omg.CosNotification.QoSError_code; 58 import org.omg.CosNotification.UnsupportedAdmin; 59 import org.omg.CosNotification.UnsupportedQoS; 60 import org.omg.CosNotifyChannelAdmin.ChannelNotFound; 61 import org.omg.PortableServer.IdAssignmentPolicyValue ; 62 import org.omg.PortableServer.POA ; 63 import org.omg.PortableServer.Servant ; 64 import org.picocontainer.MutablePicoContainer; 65 import org.picocontainer.PicoContainer; 66 import org.picocontainer.defaults.ComponentAdapterFactory; 67 68 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 69 70 74 75 public abstract class AbstractChannelFactory implements ManageableServant, Disposable 76 { 77 interface ShutdownCallback 78 { 79 void needTime(int time); 80 81 void shutdownComplete(); 82 } 83 84 86 private static final String STANDARD_IMPL_NAME = "JacORB-NotificationService"; 87 88 private static final long SHUTDOWN_INTERVAL = 1000; 89 90 private static final String EVENTCHANNEL_FACTORY_POA_NAME = "EventChannelFactoryPOA"; 91 92 94 private NameComponent [] registeredName_ = null; 95 96 private NamingContext namingContext_; 97 98 101 private Runnable destroyMethod_ = new Runnable () 102 { 103 public void run() 104 { 105 dispose(); 106 } 107 }; 108 109 111 protected final MutablePicoContainer container_; 112 113 protected final ComponentAdapterFactory componentAdapterFactory_; 114 115 protected final Configuration config_; 116 117 protected final org.omg.CORBA.Object thisRef_; 118 119 protected final Logger logger_; 120 121 private final String ior_; 122 123 private final String corbaLoc_; 124 125 private final POA eventChannelFactoryPOA_; 126 127 private final ChannelManager channelManager_ = new ChannelManager(); 128 129 private final SynchronizedInt eventChannelIDPool_ = new SynchronizedInt(-1); 130 131 133 protected AbstractChannelFactory(PicoContainer container, final ORB orb) throws UserException 134 { 135 container_ = PicoContainerFactory.createRootContainer(container, (org.jacorb.orb.ORB) orb); 136 137 config_ = (Configuration) container_.getComponentInstance(Configuration.class); 138 139 logger_ = ((org.jacorb.config.Configuration) config_).getNamedLogger(getClass().getName()); 140 141 componentAdapterFactory_ = (ComponentAdapterFactory) container_ 142 .getComponentInstance(ComponentAdapterFactory.class); 143 144 POA _rootPOA = (POA ) container_.getComponentInstance(POA .class); 145 146 List _ps = new ArrayList (); 147 148 _ps.add(_rootPOA 149 .create_id_assignment_policy(IdAssignmentPolicyValue.USER_ID)); 150 151 BiDirGiopPOAComponentAdapter.addBiDirGiopPolicy(_ps, orb, config_); 152 153 org.omg.CORBA.Policy [] _policies = (org.omg.CORBA.Policy []) _ps.toArray(new org.omg.CORBA.Policy [_ps.size()]); 154 155 eventChannelFactoryPOA_ = _rootPOA.create_POA(EVENTCHANNEL_FACTORY_POA_NAME, _rootPOA 156 .the_POAManager(), _policies); 157 158 for (int x = 0; x < _policies.length; ++x) 159 { 160 _policies[x].destroy(); 161 } 162 163 _rootPOA.the_POAManager().activate(); 164 165 byte[] oid = (getObjectName().getBytes()); 166 167 eventChannelFactoryPOA_.activate_object_with_id(oid, getServant()); 168 169 thisRef_ = eventChannelFactoryPOA_.id_to_reference(oid); 170 171 if (logger_.isDebugEnabled()) 172 { 173 logger_.debug("activated EventChannelFactory with OID '" + new String (oid) + "' on '" 174 + eventChannelFactoryPOA_.the_name() + "'"); 175 } 176 177 ior_ = orb.object_to_string(eventChannelFactoryPOA_.id_to_reference(oid)); 178 179 corbaLoc_ = createCorbaLoc(); 180 181 ((org.jacorb.orb.ORB) orb).addObjectKey(getShortcut(), ior_); 182 } 183 184 186 protected abstract AbstractEventChannel newEventChannel() throws ConfigurationException; 187 188 protected abstract org.omg.CORBA.Object create_abstract_channel(Property[] admin, 189 Property[] qos, IntHolder id) throws UnsupportedAdmin, UnsupportedQoS; 190 191 protected abstract String getObjectName(); 192 193 protected abstract String getShortcut(); 194 195 protected abstract Servant getServant(); 196 197 199 protected int getLocalPort() 200 { 201 org.jacorb.orb.ORB jorb = (org.jacorb.orb.ORB) getORB(); 202 203 return jorb.getBasicAdapter().getPort(); 204 } 205 206 protected String getLocalAddress() 207 { 208 org.jacorb.orb.ORB jorb = (org.jacorb.orb.ORB) getORB(); 209 210 return jorb.getBasicAdapter().getAddress(); 211 } 212 213 private String createCorbaLoc() 214 { 215 StringBuffer _corbaLoc = new StringBuffer ("corbaloc::"); 216 217 _corbaLoc.append(getLocalAddress()); 218 _corbaLoc.append(":"); 219 _corbaLoc.append(getLocalPort()); 220 _corbaLoc.append("/"); 221 _corbaLoc.append(getShortcut()); 222 223 return _corbaLoc.toString(); 224 } 225 226 public synchronized org.omg.CORBA.Object activate() 227 { 228 return thisRef_; 229 } 230 231 public void setDestroyMethod(Runnable destroyMethod) 232 { 233 destroyMethod_ = destroyMethod; 234 } 235 236 protected ORB getORB() 237 { 238 return (ORB ) container_.getComponentInstance(ORB .class); 239 } 240 241 public final void deactivate() 242 { 243 try 244 { 245 eventChannelFactoryPOA_.deactivate_object(eventChannelFactoryPOA_ 246 .servant_to_id(getServant())); 247 } catch (Exception e) 248 { 249 logger_.fatalError("unable to deactivate object", e); 250 251 throw new RuntimeException (); 252 } 253 } 254 255 256 protected Configuration getConfiguration() 257 { 258 return config_; 259 } 260 261 public void dispose() 262 { 263 try 264 { 265 unregisterName(); 266 } catch (Exception e) 267 { 268 logger_.error("unable to unregister NameService registration", e); 269 } 270 271 channelManager_.dispose(); 272 273 container_.dispose(); 274 275 getORB().shutdown(true); 276 } 277 278 protected void addToChannels(int id, AbstractEventChannel channel) 279 { 280 channelManager_.add_channel(id, channel); 281 } 282 283 protected int[] getAllChannels() 284 { 285 return channelManager_.get_all_channels(); 286 } 287 288 protected AbstractEventChannel get_event_channel_servant(int id) throws ChannelNotFound 289 { 290 return channelManager_.get_channel_servant(id); 291 } 292 293 protected Iterator getChannelIterator() 294 { 295 return channelManager_.getChannelIterator(); 296 } 297 298 protected AbstractEventChannel create_channel_servant(IntHolder id, Property[] qosProps, 299 Property[] adminProps) throws UnsupportedAdmin, UnsupportedQoS, ConfigurationException 300 { 301 303 AdminPropertySet _adminSettings = new AdminPropertySet(config_); 304 305 _adminSettings.set_admin(adminProps); 306 307 QoSPropertySet _qosSettings = new QoSPropertySet(config_, QoSPropertySet.CHANNEL_QOS); 308 309 _qosSettings.set_qos(qosProps); 310 311 if (logger_.isDebugEnabled()) 312 { 313 logger_.debug("uniqueQoSProps: " + _qosSettings); 314 logger_.debug("uniqueAdminProps: " + _adminSettings); 315 } 316 317 checkQoSSettings(_qosSettings); 318 319 AbstractEventChannel _eventChannelServant = newEventChannel(); 320 321 id.value = _eventChannelServant.getID(); 322 323 _eventChannelServant.set_qos(_qosSettings.toArray()); 324 _eventChannelServant.set_admin(_adminSettings.toArray()); 325 326 if (logger_.isDebugEnabled()) 327 { 328 logger_.debug("created channel_servant id=" + id.value); 329 } 330 331 return _eventChannelServant; 332 } 333 334 private int createChannelIdentifier() 335 { 336 return eventChannelIDPool_.increment(); 337 } 338 339 private void checkQoSSettings(PropertySet uniqueQoSProperties) throws UnsupportedQoS 340 { 341 if (uniqueQoSProperties.containsKey(EventReliability.value)) 342 { 343 short _eventReliabilty = uniqueQoSProperties.get(EventReliability.value) 344 .extract_short(); 345 346 switch (_eventReliabilty) { 347 case BestEffort.value: 348 logger_.info("EventReliability=BestEffort"); 349 break; 350 351 case Persistent.value: 352 throwPersistentNotSupported(EventReliability.value); 353 354 default: 356 throwBadValue(EventReliability.value); 357 } 358 } 359 360 short _connectionReliability = BestEffort.value; 361 362 if (uniqueQoSProperties.containsKey(ConnectionReliability.value)) 363 { 364 _connectionReliability = uniqueQoSProperties.get(ConnectionReliability.value) 365 .extract_short(); 366 367 switch (_connectionReliability) { 368 case BestEffort.value: 369 logger_.info("ConnectionReliability=BestEffort"); 370 break; 371 372 case Persistent.value: 373 throwPersistentNotSupported(ConnectionReliability.value); 374 375 break; default: 377 throwBadValue(ConnectionReliability.value); 378 } 379 } 380 } 381 382 private void throwPersistentNotSupported(String property) throws UnsupportedQoS 383 { 384 Any _lowVal = getORB().create_any(); 385 Any _highVal = getORB().create_any(); 386 387 _lowVal.insert_short(BestEffort.value); 388 _highVal.insert_short(BestEffort.value); 389 390 UnsupportedQoS _e = new UnsupportedQoS(new PropertyError[] { new PropertyError( 391 QoSError_code.UNSUPPORTED_VALUE, property, new PropertyRange(_lowVal, _highVal)) }); 392 393 throw _e; 394 } 395 396 private void throwBadValue(String property) throws UnsupportedQoS 397 { 398 Any _lowVal = getORB().create_any(); 399 Any _highVal = getORB().create_any(); 400 401 _lowVal.insert_short(BestEffort.value); 402 _highVal.insert_short(BestEffort.value); 403 404 UnsupportedQoS _e = new UnsupportedQoS("The specified Property Value is not supported", 405 new PropertyError[] { new PropertyError(QoSError_code.BAD_VALUE, property, 406 new PropertyRange(_lowVal, _highVal)) }); 407 throw _e; 408 } 409 410 public void destroy() 411 { 412 Thread _shutdown = new Thread () 417 { 418 public void run() 419 { 420 try 421 { 422 logger_.info("Notification Service is going down in " + SHUTDOWN_INTERVAL 423 + " ms"); 424 425 Thread.sleep(SHUTDOWN_INTERVAL); 426 } catch (InterruptedException e) 427 { 428 } 430 431 destroyMethod_.run(); 432 } 433 }; 434 435 _shutdown.start(); 436 } 437 438 441 public void shutdown(ShutdownCallback cb) 442 { 443 450 int _numberOfClients = 0; 451 452 Iterator i = getChannelIterator(); 453 454 while (i.hasNext()) 455 { 456 AbstractEventChannel _channel = (AbstractEventChannel) ((Map.Entry ) i.next()) 457 .getValue(); 458 459 _numberOfClients += _channel.getNumberOfConnectedClients(); 460 } 461 462 int _connectionTimeout = 4000; 464 465 int _estimatedShutdowntime = _numberOfClients * _connectionTimeout; 466 467 if (logger_.isInfoEnabled()) 468 { 469 logger_.info("Connected Clients: " + _numberOfClients); 470 logger_.info("Connection Timeout: " + _connectionTimeout + " ms"); 471 logger_.info("Estimated Shutdowntime: " + _estimatedShutdowntime + " ms"); 472 } 473 474 cb.needTime(_estimatedShutdowntime); 476 477 logger_.info("NotificationService is going down"); 478 479 dispose(); 480 481 logger_.info("NotificationService down"); 482 483 cb.shutdownComplete(); 484 } 485 486 public String getIOR() 487 { 488 return ior_; 489 } 490 491 public String getCorbaLoc() 492 { 493 return corbaLoc_; 494 } 495 496 private static AbstractChannelFactory newChannelFactory(PicoContainer container, ORB orb, 497 boolean typed) throws UserException 498 { 499 if (typed) 500 { 501 return new TypedEventChannelFactoryImpl(container, orb); 502 } 503 504 return new EventChannelFactoryImpl(container, orb); 505 } 506 507 public static AbstractChannelFactory newFactory(PicoContainer container, final ORB orb, 508 boolean startThread, Properties props) throws Exception 509 { 510 AbstractChannelFactory _factory = newChannelFactory(container, orb, "on".equals(props 511 .get(Attributes.ENABLE_TYPED_CHANNEL))); 512 513 _factory.activate(); 515 516 _factory.printIOR(props); 517 518 _factory.printCorbaLoc(props); 519 520 _factory.writeFile(props); 521 522 _factory.registerName(props); 523 524 _factory.startChannels(props); 525 526 if (startThread) 527 { 528 Thread _orbThread = new Thread (new Runnable () 529 { 530 public void run() 531 { 532 orb.run(); 533 } 534 }); 535 536 _orbThread.setName("Notification ORB Runner Thread"); 537 538 _orbThread.setDaemon(false); 539 540 _orbThread.start(); 541 } 542 543 return _factory; 544 } 545 546 public static AbstractChannelFactory newFactory(final ORB orb, boolean startThread, 547 Properties props) throws Exception 548 { 549 return newFactory(null, orb, startThread, props); 550 } 551 552 public static AbstractChannelFactory newFactory(PicoContainer container, Properties props) 553 throws Exception 554 { 555 props.put("jacorb.implname", STANDARD_IMPL_NAME); 556 557 ORB _orb = ORB.init(new String [] {}, props); 558 559 AbstractChannelFactory factory = newFactory(container, _orb, true, props); 560 561 563 return factory; 564 } 565 566 public static AbstractChannelFactory newFactory(Properties props) throws Exception 567 { 568 return newFactory(null, props); 569 } 570 571 private void registerName(Properties props) throws Exception 572 { 573 registerName(props.getProperty(Attributes.REGISTER_NAME_ID), props.getProperty( 574 Attributes.REGISTER_NAME_KIND, "")); 575 } 576 577 private synchronized void registerName(String nameId, String nameKind) throws Exception 578 { 579 if (nameId == null) 580 { 581 return; 582 } 583 584 namingContext_ = NamingContextHelper.narrow(getORB().resolve_initial_references( 585 "NameService")); 586 587 if (namingContext_ == null) 588 { 589 throw new ConfigurationException("could not resolve initial reference 'NameService'"); 590 } 591 592 NameComponent [] _name = new NameComponent [] { new NameComponent (nameId, nameKind) }; 593 594 if (logger_.isInfoEnabled()) 595 { 596 logger_.info("namingContext.rebind(" + nameId 597 + ((nameKind != null && nameKind.length() > 0) ? ("." + nameKind) : "") 598 + " => " + getCorbaLoc() + ")"); 599 } 600 601 namingContext_.rebind(_name, thisRef_); 602 603 registeredName_ = _name; 604 } 605 606 private synchronized void unregisterName() throws Exception 607 { 608 if (namingContext_ != null) 609 { 610 if (registeredName_ != null) 611 { 612 namingContext_.unbind(registeredName_); 613 614 registeredName_ = null; 615 } 616 } 617 } 618 619 private void startChannels(Properties props) throws UnsupportedQoS, UnsupportedAdmin 620 { 621 if (props.containsKey(Attributes.START_CHANNELS)) 622 { 623 startChannels(Integer.parseInt((String ) props.get(Attributes.START_CHANNELS))); 624 } 625 } 626 627 private void startChannels(int channels) throws UnsupportedQoS, UnsupportedAdmin 628 { 629 for (int i = 0; i < channels; i++) 630 { 631 IntHolder ih = new IntHolder (); 632 create_abstract_channel(new Property[0], new Property[0], ih); 633 } 634 } 635 636 private void printIOR(Properties props) 637 { 638 if ("on".equals(props.get(Attributes.PRINT_IOR))) 639 { 640 System.out.println(getIOR()); 641 } 642 } 643 644 private void printCorbaLoc(Properties props) 645 { 646 if ("on".equals(props.get(Attributes.PRINT_CORBALOC))) 647 { 648 System.out.println(getCorbaLoc()); 649 } 650 } 651 652 private void writeFile(Properties props) 653 { 654 String _iorFileName = (String ) props.get(Attributes.IOR_FILE); 655 656 if (_iorFileName != null) 657 { 658 try 659 { 660 PrintWriter out = new PrintWriter (new FileWriter (_iorFileName)); 661 try 662 { 663 out.println(getIOR()); 664 out.flush(); 665 } finally { 666 out.close(); 667 } 668 } catch (IOException e) 669 { 670 e.printStackTrace(); 671 } 672 } 673 } 674 675 public POA _default_POA() 676 { 677 return eventChannelFactoryPOA_; 678 } 679 680 protected MutablePicoContainer newContainerForChannel() 681 { 682 final MutablePicoContainer _channelContainer = PicoContainerFactory 683 .createChildContainer(container_); 684 685 final int _channelID = createChannelIdentifier(); 687 IFactory _factory = new IFactory() 688 { 689 public MutablePicoContainer getContainer() 690 { 691 return _channelContainer; 692 } 693 694 public int getChannelID() 695 { 696 return _channelID; 697 } 698 699 public void destroy() 700 { 701 container_.removeChildContainer(_channelContainer); 702 } 703 }; 704 705 _channelContainer.registerComponentInstance(IFactory.class, _factory); 706 return _channelContainer; 707 } 708 } | Popular Tags |