KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > notification > AbstractChannelFactory


1 package org.jacorb.notification;
2
3 /*
4  * JacORB - a free Java ORB
5  *
6  * Copyright (C) 1997-2003 Gerald Brose.
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the Free
20  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */

22
23 import java.io.FileWriter JavaDoc;
24 import java.io.IOException JavaDoc;
25 import java.io.PrintWriter JavaDoc;
26 import java.util.ArrayList JavaDoc;
27 import java.util.Iterator JavaDoc;
28 import java.util.List JavaDoc;
29 import java.util.Map JavaDoc;
30 import java.util.Properties JavaDoc;
31
32 import org.apache.avalon.framework.configuration.Configuration;
33 import org.apache.avalon.framework.configuration.ConfigurationException;
34 import org.apache.avalon.framework.logger.Logger;
35 import org.jacorb.notification.conf.Attributes;
36 import org.jacorb.notification.container.BiDirGiopPOAComponentAdapter;
37 import org.jacorb.notification.container.PicoContainerFactory;
38 import org.jacorb.notification.interfaces.Disposable;
39 import org.jacorb.notification.servant.ManageableServant;
40 import org.jacorb.notification.util.AdminPropertySet;
41 import org.jacorb.notification.util.PropertySet;
42 import org.jacorb.notification.util.QoSPropertySet;
43 import org.omg.CORBA.Any JavaDoc;
44 import org.omg.CORBA.IntHolder JavaDoc;
45 import org.omg.CORBA.ORB JavaDoc;
46 import org.omg.CORBA.UserException JavaDoc;
47 import org.omg.CosNaming.NameComponent JavaDoc;
48 import org.omg.CosNaming.NamingContext JavaDoc;
49 import org.omg.CosNaming.NamingContextHelper JavaDoc;
50 import org.omg.CosNotification.BestEffort;
51 import org.omg.CosNotification.ConnectionReliability;
52 import org.omg.CosNotification.EventReliability;
53 import org.omg.CosNotification.Persistent;
54 import org.omg.CosNotification.Property;
55 import org.omg.CosNotification.PropertyError;
56 import org.omg.CosNotification.PropertyRange;
57 import org.omg.CosNotification.QoSError_code;
58 import org.omg.CosNotification.UnsupportedAdmin;
59 import org.omg.CosNotification.UnsupportedQoS;
60 import org.omg.CosNotifyChannelAdmin.ChannelNotFound;
61 import org.omg.PortableServer.IdAssignmentPolicyValue JavaDoc;
62 import org.omg.PortableServer.POA JavaDoc;
63 import org.omg.PortableServer.Servant JavaDoc;
64 import org.picocontainer.MutablePicoContainer;
65 import org.picocontainer.PicoContainer;
66 import org.picocontainer.defaults.ComponentAdapterFactory;
67
68 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
69
70 /**
71  * @author Alphonse Bendt
72  * @version $Id: AbstractChannelFactory.java,v 1.12 2005/05/04 13:58:47 alphonse.bendt Exp $
73  */

74
75 public abstract class AbstractChannelFactory implements ManageableServant, Disposable
76 {
77     interface ShutdownCallback
78     {
79         void needTime(int time);
80
81         void shutdownComplete();
82     }
83
84     ////////////////////////////////////////
85

86     private static final String JavaDoc STANDARD_IMPL_NAME = "JacORB-NotificationService";
87
88     private static final long SHUTDOWN_INTERVAL = 1000;
89
90     private static final String JavaDoc EVENTCHANNEL_FACTORY_POA_NAME = "EventChannelFactoryPOA";
91
92     ////////////////////////////////////////
93

94     private NameComponent JavaDoc[] registeredName_ = null;
95
96     private NamingContext JavaDoc namingContext_;
97
98     /**
99      * the method that is executed when destroy is invoked.
100      */

101     private Runnable JavaDoc destroyMethod_ = new Runnable JavaDoc()
102     {
103         public void run()
104         {
105             dispose();
106         }
107     };
108
109     /////////
110

111     protected final MutablePicoContainer container_;
112
113     protected final ComponentAdapterFactory componentAdapterFactory_;
114
115     protected final Configuration config_;
116
117     protected final org.omg.CORBA.Object JavaDoc thisRef_;
118
119     protected final Logger logger_;
120
121     private final String JavaDoc ior_;
122
123     private final String JavaDoc corbaLoc_;
124
125     private final POA JavaDoc eventChannelFactoryPOA_;
126
127     private final ChannelManager channelManager_ = new ChannelManager();
128
129     private final SynchronizedInt eventChannelIDPool_ = new SynchronizedInt(-1);
130
131     ////////////////////////////////////////
132

133     protected AbstractChannelFactory(PicoContainer container, final ORB JavaDoc orb) throws UserException JavaDoc
134     {
135         container_ = PicoContainerFactory.createRootContainer(container, (org.jacorb.orb.ORB) orb);
136
137         config_ = (Configuration) container_.getComponentInstance(Configuration.class);
138
139         logger_ = ((org.jacorb.config.Configuration) config_).getNamedLogger(getClass().getName());
140
141         componentAdapterFactory_ = (ComponentAdapterFactory) container_
142                 .getComponentInstance(ComponentAdapterFactory.class);
143
144         POA JavaDoc _rootPOA = (POA JavaDoc) container_.getComponentInstance(POA JavaDoc.class);
145
146         List JavaDoc _ps = new ArrayList JavaDoc();
147         
148         _ps.add(_rootPOA
149                 .create_id_assignment_policy(IdAssignmentPolicyValue.USER_ID));
150         
151         BiDirGiopPOAComponentAdapter.addBiDirGiopPolicy(_ps, orb, config_);
152         
153         org.omg.CORBA.Policy JavaDoc[] _policies = (org.omg.CORBA.Policy JavaDoc[]) _ps.toArray(new org.omg.CORBA.Policy JavaDoc[_ps.size()]);
154         
155         eventChannelFactoryPOA_ = _rootPOA.create_POA(EVENTCHANNEL_FACTORY_POA_NAME, _rootPOA
156                 .the_POAManager(), _policies);
157
158         for (int x = 0; x < _policies.length; ++x)
159         {
160             _policies[x].destroy();
161         }
162
163         _rootPOA.the_POAManager().activate();
164
165         byte[] oid = (getObjectName().getBytes());
166
167         eventChannelFactoryPOA_.activate_object_with_id(oid, getServant());
168
169         thisRef_ = eventChannelFactoryPOA_.id_to_reference(oid);
170
171         if (logger_.isDebugEnabled())
172         {
173             logger_.debug("activated EventChannelFactory with OID '" + new String JavaDoc(oid) + "' on '"
174                     + eventChannelFactoryPOA_.the_name() + "'");
175         }
176
177         ior_ = orb.object_to_string(eventChannelFactoryPOA_.id_to_reference(oid));
178
179         corbaLoc_ = createCorbaLoc();
180
181         ((org.jacorb.orb.ORB) orb).addObjectKey(getShortcut(), ior_);
182     }
183
184     ////////////////////////////////////////
185

186     protected abstract AbstractEventChannel newEventChannel() throws ConfigurationException;
187
188     protected abstract org.omg.CORBA.Object JavaDoc create_abstract_channel(Property[] admin,
189             Property[] qos, IntHolder JavaDoc id) throws UnsupportedAdmin, UnsupportedQoS;
190
191     protected abstract String JavaDoc getObjectName();
192
193     protected abstract String JavaDoc getShortcut();
194
195     protected abstract Servant getServant();
196
197     ////////////////////////////////////////
198

199     protected int getLocalPort()
200     {
201         org.jacorb.orb.ORB jorb = (org.jacorb.orb.ORB) getORB();
202
203         return jorb.getBasicAdapter().getPort();
204     }
205
206     protected String JavaDoc getLocalAddress()
207     {
208         org.jacorb.orb.ORB jorb = (org.jacorb.orb.ORB) getORB();
209
210         return jorb.getBasicAdapter().getAddress();
211     }
212
213     private String JavaDoc createCorbaLoc()
214     {
215         StringBuffer JavaDoc _corbaLoc = new StringBuffer JavaDoc("corbaloc::");
216
217         _corbaLoc.append(getLocalAddress());
218         _corbaLoc.append(":");
219         _corbaLoc.append(getLocalPort());
220         _corbaLoc.append("/");
221         _corbaLoc.append(getShortcut());
222
223         return _corbaLoc.toString();
224     }
225
226     public synchronized org.omg.CORBA.Object JavaDoc activate()
227     {
228         return thisRef_;
229     }
230
231     public void setDestroyMethod(Runnable JavaDoc destroyMethod)
232     {
233         destroyMethod_ = destroyMethod;
234     }
235
236     protected ORB JavaDoc getORB()
237     {
238         return (ORB JavaDoc) container_.getComponentInstance(ORB JavaDoc.class);
239     }
240
241     public final void deactivate()
242     {
243         try
244         {
245             eventChannelFactoryPOA_.deactivate_object(eventChannelFactoryPOA_
246                     .servant_to_id(getServant()));
247         } catch (Exception JavaDoc e)
248         {
249             logger_.fatalError("unable to deactivate object", e);
250
251             throw new RuntimeException JavaDoc();
252         }
253     }
254
255
256     protected Configuration getConfiguration()
257     {
258         return config_;
259     }
260
261     public void dispose()
262     {
263         try
264         {
265             unregisterName();
266         } catch (Exception JavaDoc e)
267         {
268             logger_.error("unable to unregister NameService registration", e);
269         }
270
271         channelManager_.dispose();
272
273         container_.dispose();
274
275         getORB().shutdown(true);
276     }
277
278     protected void addToChannels(int id, AbstractEventChannel channel)
279     {
280         channelManager_.add_channel(id, channel);
281     }
282
283     protected int[] getAllChannels()
284     {
285         return channelManager_.get_all_channels();
286     }
287
288     protected AbstractEventChannel get_event_channel_servant(int id) throws ChannelNotFound
289     {
290         return channelManager_.get_channel_servant(id);
291     }
292
293     protected Iterator JavaDoc getChannelIterator()
294     {
295         return channelManager_.getChannelIterator();
296     }
297
298     protected AbstractEventChannel create_channel_servant(IntHolder JavaDoc id, Property[] qosProps,
299             Property[] adminProps) throws UnsupportedAdmin, UnsupportedQoS, ConfigurationException
300     {
301         // check QoS and Admin Settings
302

303         AdminPropertySet _adminSettings = new AdminPropertySet(config_);
304
305         _adminSettings.set_admin(adminProps);
306
307         QoSPropertySet _qosSettings = new QoSPropertySet(config_, QoSPropertySet.CHANNEL_QOS);
308
309         _qosSettings.set_qos(qosProps);
310
311         if (logger_.isDebugEnabled())
312         {
313             logger_.debug("uniqueQoSProps: " + _qosSettings);
314             logger_.debug("uniqueAdminProps: " + _adminSettings);
315         }
316
317         checkQoSSettings(_qosSettings);
318
319         AbstractEventChannel _eventChannelServant = newEventChannel();
320
321         id.value = _eventChannelServant.getID();
322
323         _eventChannelServant.set_qos(_qosSettings.toArray());
324         _eventChannelServant.set_admin(_adminSettings.toArray());
325
326         if (logger_.isDebugEnabled())
327         {
328             logger_.debug("created channel_servant id=" + id.value);
329         }
330
331         return _eventChannelServant;
332     }
333
334     private int createChannelIdentifier()
335     {
336         return eventChannelIDPool_.increment();
337     }
338
339     private void checkQoSSettings(PropertySet uniqueQoSProperties) throws UnsupportedQoS
340     {
341         if (uniqueQoSProperties.containsKey(EventReliability.value))
342         {
343             short _eventReliabilty = uniqueQoSProperties.get(EventReliability.value)
344                     .extract_short();
345
346             switch (_eventReliabilty) {
347             case BestEffort.value:
348                 logger_.info("EventReliability=BestEffort");
349                 break;
350
351             case Persistent.value:
352                 throwPersistentNotSupported(EventReliability.value);
353
354             // fallthrough
355
default:
356                 throwBadValue(EventReliability.value);
357             }
358         }
359
360         short _connectionReliability = BestEffort.value;
361
362         if (uniqueQoSProperties.containsKey(ConnectionReliability.value))
363         {
364             _connectionReliability = uniqueQoSProperties.get(ConnectionReliability.value)
365                     .extract_short();
366
367             switch (_connectionReliability) {
368             case BestEffort.value:
369                 logger_.info("ConnectionReliability=BestEffort");
370                 break;
371
372             case Persistent.value:
373                 throwPersistentNotSupported(ConnectionReliability.value);
374
375                 break; // to satisfy compiler
376
default:
377                 throwBadValue(ConnectionReliability.value);
378             }
379         }
380     }
381
382     private void throwPersistentNotSupported(String JavaDoc property) throws UnsupportedQoS
383     {
384         Any JavaDoc _lowVal = getORB().create_any();
385         Any JavaDoc _highVal = getORB().create_any();
386
387         _lowVal.insert_short(BestEffort.value);
388         _highVal.insert_short(BestEffort.value);
389
390         UnsupportedQoS _e = new UnsupportedQoS(new PropertyError[] { new PropertyError(
391                 QoSError_code.UNSUPPORTED_VALUE, property, new PropertyRange(_lowVal, _highVal)) });
392
393         throw _e;
394     }
395
396     private void throwBadValue(String JavaDoc property) throws UnsupportedQoS
397     {
398         Any JavaDoc _lowVal = getORB().create_any();
399         Any JavaDoc _highVal = getORB().create_any();
400
401         _lowVal.insert_short(BestEffort.value);
402         _highVal.insert_short(BestEffort.value);
403
404         UnsupportedQoS _e = new UnsupportedQoS("The specified Property Value is not supported",
405                 new PropertyError[] { new PropertyError(QoSError_code.BAD_VALUE, property,
406                         new PropertyRange(_lowVal, _highVal)) });
407         throw _e;
408     }
409
410     public void destroy()
411     {
412         // start extra thread to
413
// shut down the Notification Service.
414
// otherwise ORB.shutdown() would be called inside
415
// a remote invocation which causes an exception.
416
Thread JavaDoc _shutdown = new Thread JavaDoc()
417         {
418             public void run()
419             {
420                 try
421                 {
422                     logger_.info("Notification Service is going down in " + SHUTDOWN_INTERVAL
423                             + " ms");
424
425                     Thread.sleep(SHUTDOWN_INTERVAL);
426                 } catch (InterruptedException JavaDoc e)
427                 {
428                     // ignore
429
}
430
431                 destroyMethod_.run();
432             }
433         };
434         
435         _shutdown.start();
436     }
437
438     /**
439      * shutdown is called by the Java Wrapper
440      */

441     public void shutdown(ShutdownCallback cb)
442     {
443         // estimate shutdown time.
444
// during shutdown disconnect must be called on every
445
// connected client. in worst case the client is not
446
// acccessible anymore and disconnect raises TRANSIENT. as
447
// this could take some time request some more time from the
448
// WrapperManager who is initiating the shutdown.
449

450         int _numberOfClients = 0;
451
452         Iterator JavaDoc i = getChannelIterator();
453
454         while (i.hasNext())
455         {
456             AbstractEventChannel _channel = (AbstractEventChannel) ((Map.Entry JavaDoc) i.next())
457                     .getValue();
458
459             _numberOfClients += _channel.getNumberOfConnectedClients();
460         }
461
462         // TODO fetch this from somewhere?
463
int _connectionTimeout = 4000;
464
465         int _estimatedShutdowntime = _numberOfClients * _connectionTimeout;
466
467         if (logger_.isInfoEnabled())
468         {
469             logger_.info("Connected Clients: " + _numberOfClients);
470             logger_.info("Connection Timeout: " + _connectionTimeout + " ms");
471             logger_.info("Estimated Shutdowntime: " + _estimatedShutdowntime + " ms");
472         }
473
474         // estimate 4000ms shutdowntime per channel
475
cb.needTime(_estimatedShutdowntime);
476
477         logger_.info("NotificationService is going down");
478
479         dispose();
480
481         logger_.info("NotificationService down");
482
483         cb.shutdownComplete();
484     }
485
486     public String JavaDoc getIOR()
487     {
488         return ior_;
489     }
490
491     public String JavaDoc getCorbaLoc()
492     {
493         return corbaLoc_;
494     }
495
496     private static AbstractChannelFactory newChannelFactory(PicoContainer container, ORB JavaDoc orb,
497             boolean typed) throws UserException JavaDoc
498     {
499         if (typed)
500         {
501             return new TypedEventChannelFactoryImpl(container, orb);
502         }
503
504         return new EventChannelFactoryImpl(container, orb);
505     }
506
507     public static AbstractChannelFactory newFactory(PicoContainer container, final ORB JavaDoc orb,
508             boolean startThread, Properties JavaDoc props) throws Exception JavaDoc
509     {
510         AbstractChannelFactory _factory = newChannelFactory(container, orb, "on".equals(props
511                 .get(Attributes.ENABLE_TYPED_CHANNEL)));
512
513         // force activation
514
_factory.activate();
515
516         _factory.printIOR(props);
517
518         _factory.printCorbaLoc(props);
519
520         _factory.writeFile(props);
521
522         _factory.registerName(props);
523
524         _factory.startChannels(props);
525
526         if (startThread)
527         {
528             Thread JavaDoc _orbThread = new Thread JavaDoc(new Runnable JavaDoc()
529             {
530                 public void run()
531                 {
532                     orb.run();
533                 }
534             });
535
536             _orbThread.setName("Notification ORB Runner Thread");
537
538             _orbThread.setDaemon(false);
539
540             _orbThread.start();
541         }
542
543         return _factory;
544     }
545
546     public static AbstractChannelFactory newFactory(final ORB JavaDoc orb, boolean startThread,
547             Properties JavaDoc props) throws Exception JavaDoc
548     {
549         return newFactory(null, orb, startThread, props);
550     }
551
552     public static AbstractChannelFactory newFactory(PicoContainer container, Properties JavaDoc props)
553             throws Exception JavaDoc
554     {
555         props.put("jacorb.implname", STANDARD_IMPL_NAME);
556
557         ORB JavaDoc _orb = ORB.init(new String JavaDoc[] {}, props);
558
559         AbstractChannelFactory factory = newFactory(container, _orb, true, props);
560
561         // factory.startChannels(1);
562

563         return factory;
564     }
565
566     public static AbstractChannelFactory newFactory(Properties JavaDoc props) throws Exception JavaDoc
567     {
568         return newFactory(null, props);
569     }
570
571     private void registerName(Properties JavaDoc props) throws Exception JavaDoc
572     {
573         registerName(props.getProperty(Attributes.REGISTER_NAME_ID), props.getProperty(
574                 Attributes.REGISTER_NAME_KIND, ""));
575     }
576
577     private synchronized void registerName(String JavaDoc nameId, String JavaDoc nameKind) throws Exception JavaDoc
578     {
579         if (nameId == null)
580         {
581             return;
582         }
583
584         namingContext_ = NamingContextHelper.narrow(getORB().resolve_initial_references(
585                 "NameService"));
586
587         if (namingContext_ == null)
588         {
589             throw new ConfigurationException("could not resolve initial reference 'NameService'");
590         }
591
592         NameComponent JavaDoc[] _name = new NameComponent JavaDoc[] { new NameComponent JavaDoc(nameId, nameKind) };
593
594         if (logger_.isInfoEnabled())
595         {
596             logger_.info("namingContext.rebind(" + nameId
597                     + ((nameKind != null && nameKind.length() > 0) ? ("." + nameKind) : "")
598                     + " => " + getCorbaLoc() + ")");
599         }
600
601         namingContext_.rebind(_name, thisRef_);
602
603         registeredName_ = _name;
604     }
605
606     private synchronized void unregisterName() throws Exception JavaDoc
607     {
608         if (namingContext_ != null)
609         {
610             if (registeredName_ != null)
611             {
612                 namingContext_.unbind(registeredName_);
613
614                 registeredName_ = null;
615             }
616         }
617     }
618
619     private void startChannels(Properties JavaDoc props) throws UnsupportedQoS, UnsupportedAdmin
620     {
621         if (props.containsKey(Attributes.START_CHANNELS))
622         {
623             startChannels(Integer.parseInt((String JavaDoc) props.get(Attributes.START_CHANNELS)));
624         }
625     }
626
627     private void startChannels(int channels) throws UnsupportedQoS, UnsupportedAdmin
628     {
629         for (int i = 0; i < channels; i++)
630         {
631             IntHolder JavaDoc ih = new IntHolder JavaDoc();
632             create_abstract_channel(new Property[0], new Property[0], ih);
633         }
634     }
635
636     private void printIOR(Properties JavaDoc props)
637     {
638         if ("on".equals(props.get(Attributes.PRINT_IOR)))
639         {
640             System.out.println(getIOR());
641         }
642     }
643
644     private void printCorbaLoc(Properties JavaDoc props)
645     {
646         if ("on".equals(props.get(Attributes.PRINT_CORBALOC)))
647         {
648             System.out.println(getCorbaLoc());
649         }
650     }
651
652     private void writeFile(Properties JavaDoc props)
653     {
654         String JavaDoc _iorFileName = (String JavaDoc) props.get(Attributes.IOR_FILE);
655
656         if (_iorFileName != null)
657         {
658             try
659             {
660                 PrintWriter JavaDoc out = new PrintWriter JavaDoc(new FileWriter JavaDoc(_iorFileName));
661                 try
662                 {
663                     out.println(getIOR());
664                     out.flush();
665                 } finally {
666                     out.close();
667                 }
668             } catch (IOException JavaDoc e)
669             {
670                 e.printStackTrace();
671             }
672         }
673     }
674
675     public POA JavaDoc _default_POA()
676     {
677         return eventChannelFactoryPOA_;
678     }
679
680     protected MutablePicoContainer newContainerForChannel()
681     {
682         final MutablePicoContainer _channelContainer = PicoContainerFactory
683                 .createChildContainer(container_);
684     
685         // create identifier
686
final int _channelID = createChannelIdentifier();
687         IFactory _factory = new IFactory()
688         {
689             public MutablePicoContainer getContainer()
690             {
691                 return _channelContainer;
692             }
693     
694             public int getChannelID()
695             {
696                 return _channelID;
697             }
698     
699             public void destroy()
700             {
701                 container_.removeChildContainer(_channelContainer);
702             }
703         };
704         
705         _channelContainer.registerComponentInstance(IFactory.class, _factory);
706         return _channelContainer;
707     }
708 }
Popular Tags