KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > notification > servant > AbstractProxyConsumer


1 package org.jacorb.notification.servant;
2
3 /*
4  * JacORB - a free Java ORB
5  *
6  * Copyright (C) 1997-2004 Gerald Brose.
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the Free
20  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */

22
23 import java.util.List JavaDoc;
24
25 import org.apache.avalon.framework.configuration.Configuration;
26 import org.jacorb.notification.EventTypeWrapper;
27 import org.jacorb.notification.MessageFactory;
28 import org.jacorb.notification.OfferManager;
29 import org.jacorb.notification.SubscriptionManager;
30 import org.jacorb.notification.conf.Default;
31 import org.jacorb.notification.engine.TaskProcessor;
32 import org.jacorb.notification.interfaces.FilterStage;
33 import org.jacorb.notification.interfaces.Message;
34 import org.jacorb.notification.interfaces.MessageConsumer;
35 import org.jacorb.notification.interfaces.MessageSupplier;
36 import org.jacorb.notification.util.PropertySet;
37 import org.jacorb.notification.util.PropertySetAdapter;
38 import org.omg.CORBA.NO_IMPLEMENT JavaDoc;
39 import org.omg.CORBA.ORB JavaDoc;
40 import org.omg.CosNotification.EventType;
41 import org.omg.CosNotification.Priority;
42 import org.omg.CosNotification.StartTimeSupported;
43 import org.omg.CosNotification.StopTimeSupported;
44 import org.omg.CosNotification.Timeout;
45 import org.omg.CosNotifyChannelAdmin.ObtainInfoMode;
46 import org.omg.CosNotifyChannelAdmin.SupplierAdmin;
47 import org.omg.CosNotifyComm.InvalidEventType;
48 import org.omg.CosNotifyComm.NotifyPublishOperations;
49 import org.omg.CosNotifyComm.NotifySubscribe;
50 import org.omg.CosNotifyComm.NotifySubscribeHelper;
51 import org.omg.CosNotifyComm.NotifySubscribeOperations;
52 import org.omg.PortableServer.POA JavaDoc;
53
54 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
55
56 /**
57  * @author Alphonse Bendt
58  * @version $Id: AbstractProxyConsumer.java,v 1.14 2005/04/27 10:45:46 alphonse.bendt Exp $
59  */

60
61 abstract class AbstractProxyConsumer extends AbstractProxy implements AbstractProxyConsumerI,
62         NotifyPublishOperations
63 {
64     private final static EventType[] EMPTY_EVENT_TYPE_ARRAY = new EventType[0];
65
66     // //////////////////////////////////////
67

68     private final MessageFactory messageFactory_;
69
70     private final SynchronizedBoolean isStartTimeSupported_ = new SynchronizedBoolean(true);
71
72     private final SynchronizedBoolean isStopTimeSupported_ = new SynchronizedBoolean(true);
73
74     private List JavaDoc subsequentDestinations_;
75
76     private NotifySubscribeOperations proxySubscriptionListener_;
77
78     private NotifySubscribe subscriptionListener_;
79
80     protected final SupplierAdmin supplierAdmin_;
81
82     // //////////////////////////////////////
83

84     protected AbstractProxyConsumer(IAdmin admin, ORB JavaDoc orb, POA JavaDoc poa, Configuration conf,
85             TaskProcessor taskProcessor, MessageFactory messageFactory,
86             SupplierAdmin supplierAdmin, OfferManager offerManager,
87             SubscriptionManager subscriptionManager)
88     {
89         super(admin, orb, poa, conf, taskProcessor, offerManager, subscriptionManager);
90
91         supplierAdmin_ = supplierAdmin;
92         messageFactory_ = messageFactory;
93
94         configureStartTimeSupported();
95
96         configureStopTimeSupported();
97
98         qosSettings_.addPropertySetListener(new String JavaDoc[] { Priority.value, Timeout.value,
99                 StartTimeSupported.value, StopTimeSupported.value }, reconfigureQoS_);
100     }
101
102     protected MessageFactory getMessageFactory()
103     {
104         return messageFactory_;
105     }
106
107     public final List JavaDoc getSubsequentFilterStages()
108     {
109         return subsequentDestinations_;
110     }
111
112     public void setSubsequentDestinations(List JavaDoc list)
113     {
114         subsequentDestinations_ = list;
115     }
116
117     private PropertySetAdapter reconfigureQoS_ = new PropertySetAdapter()
118     {
119         public void actionPropertySetChanged(PropertySet source)
120         {
121             configureStartTimeSupported();
122
123             configureStopTimeSupported();
124         }
125     };
126
127     private void configureStartTimeSupported()
128     {
129         try
130         {
131             isStartTimeSupported_.set(qosSettings_.get(StartTimeSupported.value).extract_boolean());
132         } catch (Exception JavaDoc e)
133         {
134             isStartTimeSupported_.set(Default.DEFAULT_START_TIME_SUPPORTED.equals("on"));
135         }
136
137         if (logger_.isInfoEnabled())
138         {
139             logger_.info("set QoS: StartTimeSupported=" + isStartTimeSupported_);
140         }
141     }
142
143     private void configureStopTimeSupported()
144     {
145         try
146         {
147             isStopTimeSupported_.set(qosSettings_.get(StopTimeSupported.value).extract_boolean());
148         } catch (Exception JavaDoc e)
149         {
150             isStopTimeSupported_.set(Default.DEFAULT_STOP_TIME_SUPPORTED.equals("on"));
151         }
152
153         if (logger_.isInfoEnabled())
154         {
155             logger_.info("set QoS: StopTimeSupported=" + isStopTimeSupported_);
156         }
157     }
158
159     protected void schedulePullTask(MessageSupplier target)
160     {
161         try
162         {
163             getTaskProcessor().scheduleTimedPullTask(target);
164         } catch (InterruptedException JavaDoc e)
165         {
166             logger_.info("interrupt during schedule pull for MessageSupplier", e);
167         }
168     }
169
170     /**
171      * check if a Message is acceptable to the QoS Settings of this ProxyConsumer
172      */

173     protected void checkMessageProperties(Message m)
174     {
175         // No Op
176
// TODO fixme
177
}
178
179     public FilterStage getFirstStage()
180     {
181         return this;
182     }
183
184     public boolean isTimeOutSupported()
185     {
186         return isStopTimeSupported_.get();
187     }
188
189     public boolean isStartTimeSupported()
190     {
191         return isStartTimeSupported_.get();
192     }
193
194     public final SupplierAdmin MyAdmin()
195     {
196         return supplierAdmin_;
197     }
198
199     public final MessageConsumer getMessageConsumer()
200     {
201         throw new UnsupportedOperationException JavaDoc();
202     }
203
204     public final boolean hasMessageConsumer()
205     {
206         return false;
207     }
208
209     public void offer_change(EventType[] added, EventType[] removed) throws InvalidEventType
210     {
211         offerManager_.offer_change(added, removed);
212     }
213
214     public final EventType[] obtain_subscription_types(ObtainInfoMode obtainInfoMode)
215     {
216         final EventType[] _subscriptionTypes;
217
218         switch (obtainInfoMode.value()) {
219         case ObtainInfoMode._ALL_NOW_UPDATES_ON:
220             // attach the listener first, then return the current
221
// subscription types. order is important so that no
222
// updates are lost.
223

224             registerListener();
225
226             _subscriptionTypes = subscriptionManager_.obtain_subscription_types();
227             break;
228         case ObtainInfoMode._ALL_NOW_UPDATES_OFF:
229             _subscriptionTypes = subscriptionManager_.obtain_subscription_types();
230
231             removeListener();
232             break;
233         case ObtainInfoMode._NONE_NOW_UPDATES_ON:
234             _subscriptionTypes = EMPTY_EVENT_TYPE_ARRAY;
235
236             registerListener();
237             break;
238         case ObtainInfoMode._NONE_NOW_UPDATES_OFF:
239             _subscriptionTypes = EMPTY_EVENT_TYPE_ARRAY;
240
241             removeListener();
242             break;
243         default:
244             throw new IllegalArgumentException JavaDoc("Illegal ObtainInfoMode: ObtainInfoMode."
245                     + obtainInfoMode.value());
246         }
247
248         return _subscriptionTypes;
249     }
250
251     private void registerListener()
252     {
253         if (proxySubscriptionListener_ == null)
254         {
255             final NotifySubscribeOperations _listener = getSubscriptionListener();
256
257             if (_listener != null)
258             {
259                 proxySubscriptionListener_ = new NotifySubscribeOperations()
260                 {
261                     public void subscription_change(EventType[] added, EventType[] removed)
262                     {
263                         try
264                         {
265                             _listener.subscription_change(added, removed);
266                         } catch (NO_IMPLEMENT JavaDoc e)
267                         {
268                             logger_.info("disable subscription_change for Supplier", e);
269
270                             removeListener();
271                         } catch (InvalidEventType e)
272                         {
273                             if (logger_.isDebugEnabled())
274                             {
275                                 logger_.debug("subscription_change("
276                                         + EventTypeWrapper.toString(added) + ", "
277                                         + EventTypeWrapper.toString(removed) + ") failed", e);
278                             }
279                             else
280                             {
281                                 logger_.error("invalid event type", e);
282                             }
283                         } catch (Exception JavaDoc e)
284                         {
285                             logger_.error("subscription change failed", e);
286                         }
287                     }
288                 };
289                 subscriptionManager_.addListener(proxySubscriptionListener_);
290             }
291         }
292     }
293
294     /**
295      * removes the listener. subscription_change will no more be issued to the connected Supplier
296      */

297     protected void removeListener()
298     {
299         if (proxySubscriptionListener_ != null)
300         {
301             subscriptionManager_.removeListener(proxySubscriptionListener_);
302
303             proxySubscriptionListener_ = null;
304         }
305     }
306
307     protected void connectClient(org.omg.CORBA.Object JavaDoc client)
308     {
309         super.connectClient(client);
310
311         try
312         {
313             subscriptionListener_ = NotifySubscribeHelper.narrow(client);
314
315             logger_.debug("successfully narrowed connecting Supplier to NotifySubscribe");
316         } catch (Throwable JavaDoc t)
317         {
318             logger_.info("connecting Supplier does not support subscription_change");
319         }
320     }
321
322     final NotifySubscribeOperations getSubscriptionListener()
323     {
324         return subscriptionListener_;
325     }
326     
327     protected void processMessage(Message mesg)
328     {
329         getTaskProcessor().processMessage(mesg);
330     }
331 }
Popular Tags