KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > notification > AbstractEventChannel


1 package org.jacorb.notification;
2
3 /*
4  * JacORB - a free Java ORB
5  *
6  * Copyright (C) 1997-2004 Gerald Brose.
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the Free
20  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */

22
23 import java.lang.ref.WeakReference JavaDoc;
24 import java.util.HashMap JavaDoc;
25 import java.util.Iterator JavaDoc;
26 import java.util.List JavaDoc;
27 import java.util.Map JavaDoc;
28
29 import org.apache.avalon.framework.configuration.Configuration;
30 import org.apache.avalon.framework.logger.Logger;
31 import org.jacorb.notification.interfaces.Disposable;
32 import org.jacorb.notification.interfaces.FilterStage;
33 import org.jacorb.notification.interfaces.FilterStageSource;
34 import org.jacorb.notification.interfaces.ProxyEvent;
35 import org.jacorb.notification.interfaces.ProxyEventListener;
36 import org.jacorb.notification.servant.AbstractAdmin;
37 import org.jacorb.notification.servant.AbstractSupplierAdmin;
38 import org.jacorb.notification.servant.FilterStageListManager;
39 import org.jacorb.notification.servant.ManageableServant;
40 import org.jacorb.notification.util.AdminPropertySet;
41 import org.jacorb.notification.util.DisposableManager;
42 import org.jacorb.notification.util.PropertySet;
43 import org.jacorb.notification.util.QoSPropertySet;
44 import org.omg.CORBA.Any JavaDoc;
45 import org.omg.CORBA.IntHolder JavaDoc;
46 import org.omg.CORBA.OBJECT_NOT_EXIST JavaDoc;
47 import org.omg.CORBA.ORB JavaDoc;
48 import org.omg.CosNotification.EventReliability;
49 import org.omg.CosNotification.MaxConsumers;
50 import org.omg.CosNotification.MaxSuppliers;
51 import org.omg.CosNotification.NamedPropertyRangeSeqHolder;
52 import org.omg.CosNotification.Property;
53 import org.omg.CosNotification.UnsupportedAdmin;
54 import org.omg.CosNotification.UnsupportedQoS;
55 import org.omg.CosNotifyChannelAdmin.AdminLimit;
56 import org.omg.CosNotifyChannelAdmin.AdminLimitExceeded;
57 import org.omg.CosNotifyChannelAdmin.AdminNotFound;
58 import org.omg.CosNotifyChannelAdmin.InterFilterGroupOperator;
59 import org.omg.CosNotifyFilter.FilterFactory;
60 import org.omg.PortableServer.POA JavaDoc;
61 import org.omg.PortableServer.Servant JavaDoc;
62 import org.picocontainer.MutablePicoContainer;
63 import org.picocontainer.defaults.CachingComponentAdapter;
64 import org.picocontainer.defaults.ConstructorInjectionComponentAdapter;
65
66 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
67 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
68
69 /**
70  * @author Alphonse Bendt
71  * @version $Id: AbstractEventChannel.java,v 1.5 2005/04/27 10:32:06 alphonse.bendt Exp $
72  */

73
74 public abstract class AbstractEventChannel implements Disposable, ManageableServant
75 {
76     /**
77      * This key is reserved for the default supplier admin and the default consumer admin.
78      */

79     private static final Integer JavaDoc DEFAULT_ADMIN_KEY = new Integer JavaDoc(0);
80
81     private final DisposableManager disposables_ = new DisposableManager();
82
83     protected final Logger logger_;
84
85     protected final ORB JavaDoc orb_;
86
87     protected final POA JavaDoc poa_;
88
89     protected final Configuration configuration_;
90
91     /**
92      * max number of Suppliers that may be connected at a time to this Channel
93      */

94     private final SynchronizedInt maxNumberOfSuppliers_ = new SynchronizedInt(0);
95
96     /**
97      * max number of Consumers that may be connected at a time to this Channel
98      */

99     private final SynchronizedInt maxNumberOfConsumers_ = new SynchronizedInt(0);
100
101     private final AdminPropertySet adminSettings_;
102
103     private final QoSPropertySet qosSettings_;
104
105     private final FilterStageListManager listManager_;
106
107     private final FilterFactory defaultFilterFactory_;
108
109     /**
110      * lock variable used to access allConsumerAdmins_ and consumerAdminServants_.
111      */

112     private final Object JavaDoc modifyConsumerAdminsLock_ = new Object JavaDoc();
113
114     /**
115      * lock variable used to access allConsumerAdmins_.
116      */

117     private final Object JavaDoc modifySupplierAdminsLock_ = new Object JavaDoc();
118
119     /**
120      * maps id's to ConsumerAdminServants (notify style).
121      */

122     private final Map JavaDoc consumerAdminServants_ = new HashMap JavaDoc();
123
124     /**
125      * maps id's to SupplierAdminServants (notify style).
126      */

127     private final Map JavaDoc supplierAdminServants_ = new HashMap JavaDoc();
128
129     /**
130      * pool of available ID's for Admin Objects. The Pool is used for Consumer and Supplier Admins.
131      * NOTE: The least available ID is 1 as the ID 0 has a special meaning.
132      *
133      * @see #DEFAULT_ADMIN_KEY DEFAULT_ADMIN_KEY.
134      */

135     private final SynchronizedInt adminIdPool_ = new SynchronizedInt(1);
136
137     /**
138      * number of Consumers that are connected to this Channel
139      */

140     private final SynchronizedInt numberOfConsumers_ = new SynchronizedInt(0);
141
142     /**
143      * number of Suppliers that are connected to this Channel
144      */

145     private final SynchronizedInt numberOfSuppliers_ = new SynchronizedInt(0);
146
147     protected boolean duringConstruction_ = true;
148
149     private final ProxyEventListener proxyConsumerEventListener_ = new ProxyEventListener()
150     {
151         public void actionProxyCreationRequest(ProxyEvent event) throws AdminLimitExceeded
152         {
153             addConsumer();
154         }
155
156         public void actionProxyCreated(ProxyEvent event)
157         {
158             // No Op
159
}
160
161         public void actionProxyDisposed(ProxyEvent event)
162         {
163             removeConsumer();
164         }
165     };
166
167     private final ProxyEventListener proxySupplierEventListener_ = new ProxyEventListener()
168     {
169         public void actionProxyCreationRequest(ProxyEvent event) throws AdminLimitExceeded
170         {
171             addSupplier();
172         }
173
174         public void actionProxyCreated(ProxyEvent event)
175         {
176             // No OP
177
}
178
179         public void actionProxyDisposed(ProxyEvent event)
180         {
181             removeSupplier();
182         }
183     };
184
185     protected final MutablePicoContainer container_;
186
187     private final int id_;
188
189     private final SynchronizedBoolean destroyed_ = new SynchronizedBoolean(false);
190
191     ////////////////////////////////////////
192

193     public AbstractEventChannel(IFactory factory, ORB JavaDoc orb, POA JavaDoc poa, Configuration config,
194             FilterFactory filterFactory)
195     {
196         super();
197
198         id_ = factory.getChannelID();
199
200         orb_ = orb;
201         poa_ = poa;
202         configuration_ = config;
203         defaultFilterFactory_ = filterFactory;
204         container_ = factory.getContainer();
205
206         logger_ = ((org.jacorb.config.Configuration) config).getNamedLogger(getClass().getName());
207
208         container_.registerComponent(new CachingComponentAdapter(
209                 new ConstructorInjectionComponentAdapter(SubscriptionManager.class,
210                         SubscriptionManager.class)));
211
212         container_.registerComponent(new CachingComponentAdapter(
213                 new ConstructorInjectionComponentAdapter(OfferManager.class, OfferManager.class)));
214
215         adminSettings_ = new AdminPropertySet(configuration_);
216
217         qosSettings_ = new QoSPropertySet(configuration_, QoSPropertySet.CHANNEL_QOS);
218
219         listManager_ = new FilterStageListManager()
220         {
221             public void fetchListData(FilterStageListManager.List list)
222             {
223                 synchronized (modifyConsumerAdminsLock_)
224                 {
225                     Iterator JavaDoc i = consumerAdminServants_.keySet().iterator();
226
227                     while (i.hasNext())
228                     {
229                         Integer JavaDoc _key = (Integer JavaDoc) i.next();
230                         list.add((FilterStage) consumerAdminServants_.get(_key));
231                     }
232                 }
233             }
234         };
235     }
236
237     ////////////////////////////////////////
238

239     /**
240      * Callback to help keep track of the number of Consumers.
241      *
242      * @exception AdminLimitExceeded
243      * if creation of another Consumer is prohibited.
244      */

245     private void addConsumer() throws AdminLimitExceeded
246     {
247         if ((maxNumberOfConsumers_.get() == 0)
248                 || (numberOfConsumers_.compareTo(maxNumberOfConsumers_) < 0))
249         {
250             numberOfConsumers_.increment();
251         }
252         else
253         {
254             Any JavaDoc _any = orb_.create_any();
255             _any.insert_long(maxNumberOfConsumers_.get());
256
257             AdminLimit _limit = new AdminLimit("consumer limit", _any);
258
259             throw new AdminLimitExceeded("Consumer creation request exceeds AdminLimit.", _limit);
260         }
261     }
262
263     private void removeConsumer()
264     {
265         numberOfConsumers_.decrement();
266     }
267
268     /**
269      * Callback to keep track of the number of Suppliers
270      *
271      * @exception AdminLimitExceeded
272      * if creation of another Suppliers is prohibited
273      */

274     private void addSupplier() throws AdminLimitExceeded
275     {
276         if ((maxNumberOfSuppliers_.get() == 0)
277                 || (numberOfSuppliers_.compareTo(maxNumberOfSuppliers_) < 0))
278         {
279             numberOfSuppliers_.increment();
280         }
281         else
282         {
283             Any JavaDoc _any = orb_.create_any();
284             _any.insert_long(maxNumberOfSuppliers_.get());
285
286             AdminLimit _limit = new AdminLimit("suppliers limit", _any);
287
288             throw new AdminLimitExceeded("supplier creation request exceeds AdminLimit.", _limit);
289         }
290     }
291
292     private void removeSupplier()
293     {
294         numberOfSuppliers_.decrement();
295     }
296
297     public final int getAdminID()
298     {
299         if (duringConstruction_)
300         {
301             return 0;
302         }
303         return adminIdPool_.increment();
304     }
305
306     protected final boolean isDefaultConsumerAdminActive()
307     {
308         synchronized (modifyConsumerAdminsLock_)
309         {
310             return consumerAdminServants_.containsKey(DEFAULT_ADMIN_KEY);
311         }
312     }
313
314     protected final boolean isDefaultSupplierAdminActive()
315     {
316         synchronized (modifySupplierAdminsLock_)
317         {
318             return supplierAdminServants_.containsKey(DEFAULT_ADMIN_KEY);
319         }
320     }
321
322     /**
323      * The default_filter_factory attribute is a readonly attribute that maintains an object
324      * reference to the default factory to be used by the EventChannel instance with which it is
325      * associated for creating filter objects. If the target channel does not support a default
326      * filter factory, the attribute will maintain the value of OBJECT_NIL.
327      */

328     public final FilterFactory default_filter_factory()
329     {
330         return defaultFilterFactory_;
331     }
332
333     public final int[] get_all_consumeradmins()
334     {
335         int[] _allKeys;// }
336

337         synchronized (modifyConsumerAdminsLock_)
338         {
339             _allKeys = new int[consumerAdminServants_.size()]; // + _defaultConsumerAdmin];
340

341             Iterator JavaDoc i = consumerAdminServants_.keySet().iterator();
342             int x = 0;
343             while (i.hasNext())
344             {
345                 _allKeys[x++] = ((Integer JavaDoc) i.next()).intValue();
346             }
347         }
348
349         return _allKeys;
350     }
351
352     public final int[] get_all_supplieradmins()
353     {
354         int[] _allKeys;
355
356         synchronized (modifySupplierAdminsLock_)
357         {
358             _allKeys = new int[supplierAdminServants_.size()];
359
360             Iterator JavaDoc i = supplierAdminServants_.keySet().iterator();
361             int x = 0;
362             while (i.hasNext())
363             {
364                 _allKeys[x++] = ((Integer JavaDoc) i.next()).intValue();
365             }
366         }
367
368         return _allKeys;
369     }
370
371     public final Property[] get_admin()
372     {
373         return adminSettings_.toArray();
374     }
375
376     public final Property[] get_qos()
377     {
378         return qosSettings_.toArray();
379     }
380
381     public final void set_qos(Property[] props) throws UnsupportedQoS
382     {
383         qosSettings_.validate_qos(props, new NamedPropertyRangeSeqHolder());
384
385         qosSettings_.set_qos(props);
386     }
387
388     public final void validate_qos(Property[] props,
389             NamedPropertyRangeSeqHolder namedPropertySeqHolder) throws UnsupportedQoS
390     {
391         qosSettings_.validate_qos(props, namedPropertySeqHolder);
392     }
393
394     public final void set_admin(Property[] adminProps) throws UnsupportedAdmin
395     {
396         adminSettings_.validate_admin(adminProps);
397
398         adminSettings_.set_admin(adminProps);
399
400         configureAdminLimits(adminSettings_);
401     }
402
403     private void configureAdminLimits(PropertySet adminProperties)
404     {
405         Any JavaDoc _maxConsumers = adminProperties.get(MaxConsumers.value);
406         maxNumberOfConsumers_.set(_maxConsumers.extract_long());
407
408         Any JavaDoc _maxSuppliers = adminProperties.get(MaxSuppliers.value);
409         maxNumberOfSuppliers_.set(_maxSuppliers.extract_long());
410
411         if (logger_.isInfoEnabled())
412         {
413             logger_.info("set MaxNumberOfConsumers=" + maxNumberOfConsumers_);
414             logger_.info("set MaxNumberOfSuppliers=" + maxNumberOfSuppliers_);
415         }
416     }
417
418     /**
419      * destroy this Channel, all created Admins and all Proxies.
420      */

421     public final void destroy()
422     {
423         if (destroyed_.commit(false, true))
424         {
425             container_.dispose();
426             
427             List JavaDoc list = container_.getComponentInstancesOfType(IContainer.class);
428             for (Iterator JavaDoc i = list.iterator(); i.hasNext();)
429             {
430                 IContainer element = (IContainer) i.next();
431                 element.destroy();
432             }
433         }
434         else
435         {
436             throw new OBJECT_NOT_EXIST JavaDoc();
437         }
438     }
439
440     public final void dispose()
441     {
442         logger_.info("destroy channel " + id_);
443
444         deactivate();
445
446         disposables_.dispose();
447     }
448
449     /**
450      * Override this method from the Servant baseclass. Fintan Bolton in his book "Pure CORBA"
451      * suggests that you override this method to avoid the risk that a servant object (like this
452      * one) could be activated by the <b>wrong </b> POA object.
453      */

454     public final POA JavaDoc _default_POA()
455     {
456         return poa_;
457     }
458
459     public boolean isPersistent()
460     {
461         return false;
462     }
463
464     /**
465      * get the number of clients connected to this event channel. the number is the total of all
466      * Suppliers and Consumers connected to this channel.
467      */

468     public final int getNumberOfConnectedClients()
469     {
470         return numberOfConsumers_.get() + numberOfSuppliers_.get();
471     }
472
473     public final int getMaxNumberOfSuppliers()
474     {
475         return maxNumberOfSuppliers_.get();
476     }
477
478     public final int getMaxNumberOfConsumers()
479     {
480         return maxNumberOfConsumers_.get();
481     }
482
483     public final void deactivate()
484     {
485         try
486         {
487             poa_.deactivate_object(poa_.servant_to_id(getServant()));
488         } catch (Exception JavaDoc e)
489         {
490             logger_.error("Unable to deactivate EventChannel Object", e);
491
492             throw new RuntimeException JavaDoc();
493         }
494     }
495
496     abstract protected Servant getServant();
497
498     private Property[] createQoSPropertiesForAdmin()
499     {
500         Map JavaDoc _copy = new HashMap JavaDoc(qosSettings_.toMap());
501
502         _copy.remove(EventReliability.value);
503
504         return PropertySet.map2Props(_copy);
505     }
506
507     protected AbstractAdmin get_consumeradmin_internal(int identifier) throws AdminNotFound
508     {
509         synchronized (modifyConsumerAdminsLock_)
510         {
511             Integer JavaDoc _key = new Integer JavaDoc(identifier);
512
513             if (consumerAdminServants_.containsKey(_key))
514             {
515                 AbstractAdmin _admin = (AbstractAdmin) consumerAdminServants_.get(_key);
516
517                 return _admin;
518             }
519
520             throw new AdminNotFound("ID " + identifier + " does not exist.");
521         }
522     }
523
524     protected AbstractAdmin get_supplieradmin_internal(int identifier) throws AdminNotFound
525     {
526         synchronized (modifySupplierAdminsLock_)
527         {
528             Integer JavaDoc _key = new Integer JavaDoc(identifier);
529
530             if (supplierAdminServants_.containsKey(_key))
531             {
532                 AbstractAdmin _admin = (AbstractAdmin) supplierAdminServants_.get(_key);
533
534                 return _admin;
535             }
536
537             throw new AdminNotFound("ID " + identifier + " does not exist.");
538         }
539     }
540
541     /**
542      * fetch the List of all ConsumerAdmins that are connected to this EventChannel.
543      */

544     List JavaDoc getAllConsumerAdmins()
545     {
546         return listManager_.getList();
547     }
548
549     protected AbstractAdmin getDefaultConsumerAdminServant()
550     {
551         AbstractAdmin _admin;
552
553         synchronized (modifyConsumerAdminsLock_)
554         {
555             _admin = (AbstractAdmin) consumerAdminServants_.get(DEFAULT_ADMIN_KEY);
556
557             if (_admin == null)
558             {
559                 _admin = newConsumerAdminServant(DEFAULT_ADMIN_KEY.intValue());
560
561                 try
562                 {
563                     _admin.set_qos(createQoSPropertiesForAdmin());
564                 } catch (UnsupportedQoS e)
565                 {
566                     logger_.fatalError("unable to set qos", e);
567                 }
568
569                 addToConsumerAdmins(_admin);
570             }
571         }
572
573         return _admin;
574     }
575
576     private void addToConsumerAdmins(AbstractAdmin admin)
577     {
578         final Integer JavaDoc _key = admin.getID();
579
580         admin.addDisposeHook(new Disposable()
581         {
582             public void dispose()
583             {
584                 synchronized (modifyConsumerAdminsLock_)
585                 {
586                     consumerAdminServants_.remove(_key);
587                     listManager_.actionSourceModified();
588                 }
589             }
590         });
591
592         synchronized (modifyConsumerAdminsLock_)
593         {
594             consumerAdminServants_.put(_key, admin);
595
596             listManager_.actionSourceModified();
597         }
598     }
599
600     protected AbstractAdmin new_for_consumers_servant(InterFilterGroupOperator filterGroupOperator,
601             IntHolder JavaDoc intHolder)
602     {
603         AbstractAdmin _admin = newConsumerAdminServant(createAdminID());
604
605         _admin.setInterFilterGroupOperator(filterGroupOperator);
606
607         intHolder.value = _admin.getID().intValue();
608
609         try
610         {
611             _admin.set_qos(createQoSPropertiesForAdmin());
612         } catch (UnsupportedQoS e)
613         {
614             logger_.error("unable to set QoS", e);
615         }
616
617         _admin.addProxyEventListener(proxySupplierEventListener_);
618
619         addToConsumerAdmins(_admin);
620
621         return _admin;
622     }
623
624     private int createAdminID()
625     {
626         return adminIdPool_.increment();
627     }
628
629     private void addToSupplierAdmins(AbstractAdmin admin)
630     {
631         final Integer JavaDoc _key = admin.getID();
632
633         admin.addDisposeHook(new Disposable()
634         {
635             public void dispose()
636             {
637                 synchronized (modifySupplierAdminsLock_)
638                 {
639                     supplierAdminServants_.remove(_key);
640                 }
641             }
642         });
643
644         synchronized (modifySupplierAdminsLock_)
645         {
646             supplierAdminServants_.put(_key, admin);
647         }
648     }
649
650     protected AbstractAdmin new_for_suppliers_servant(InterFilterGroupOperator filterGroupOperator,
651             IntHolder JavaDoc intHolder)
652     {
653         AbstractAdmin _admin = newSupplierAdminServant(createAdminID());
654
655         intHolder.value = _admin.getID().intValue();
656
657         _admin.setInterFilterGroupOperator(filterGroupOperator);
658
659         try
660         {
661             _admin.set_qos(createQoSPropertiesForAdmin());
662         } catch (UnsupportedQoS e)
663         {
664             logger_.fatalError("error setting qos", e);
665         }
666
667         _admin.addProxyEventListener(proxyConsumerEventListener_);
668
669         addToSupplierAdmins(_admin);
670
671         return _admin;
672     }
673
674     protected AbstractAdmin getDefaultSupplierAdminServant()
675     {
676         AbstractAdmin _admin;
677
678         synchronized (modifySupplierAdminsLock_)
679         {
680             _admin = (AbstractAdmin) supplierAdminServants_.get(DEFAULT_ADMIN_KEY);
681
682             if (_admin == null)
683             {
684                 _admin = newSupplierAdminServant(DEFAULT_ADMIN_KEY.intValue());
685
686                 try
687                 {
688                     _admin.set_qos(createQoSPropertiesForAdmin());
689                 } catch (UnsupportedQoS e)
690                 {
691                     logger_.fatalError("unable to set qos", e);
692                 }
693
694                 addToSupplierAdmins(_admin);
695             }
696         }
697
698         return _admin;
699     }
700
701     ////////////////////////////////////////
702

703     private AbstractAdmin newConsumerAdminServant(int id)
704     {
705         AbstractAdmin _admin = newConsumerAdmin(id);
706
707         return _admin;
708     }
709
710     protected abstract AbstractAdmin newConsumerAdmin(int id);
711
712     ////////////////////////////////////////
713

714     private static class FilterStageSourceAdapter implements FilterStageSource
715     {
716         final WeakReference JavaDoc channelRef_;
717
718         FilterStageSourceAdapter(AbstractEventChannel channel)
719         {
720             channelRef_ = new WeakReference JavaDoc(channel);
721         }
722
723         public List JavaDoc getSubsequentFilterStages()
724         {
725             return ((AbstractEventChannel) channelRef_.get()).getAllConsumerAdmins();
726         }
727     }
728
729     private AbstractAdmin newSupplierAdminServant(int id)
730     {
731         AbstractSupplierAdmin _admin = newSupplierAdmin(id);
732
733         _admin.setSubsequentFilterStageSource(new FilterStageSourceAdapter(this));
734
735         return _admin;
736     }
737
738     protected abstract AbstractSupplierAdmin newSupplierAdmin(int id);
739
740     public int getID()
741     {
742         return id_;
743     }
744
745     public final void addDisposeHook(Disposable d)
746     {
747         disposables_.addDisposable(d);
748     }
749 }
750
751
Popular Tags