KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > notification > servant > AbstractProxySupplier


1 package org.jacorb.notification.servant;
2
3 /*
4  * JacORB - a free Java ORB
5  *
6  * Copyright (C) 1997-2004 Gerald Brose.
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the Free
20  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */

22
23 import org.apache.avalon.framework.configuration.Configuration;
24 import org.apache.avalon.framework.configuration.ConfigurationException;
25 import org.jacorb.notification.OfferManager;
26 import org.jacorb.notification.SubscriptionManager;
27 import org.jacorb.notification.conf.Attributes;
28 import org.jacorb.notification.conf.Default;
29 import org.jacorb.notification.engine.TaskProcessor;
30 import org.jacorb.notification.interfaces.Message;
31 import org.jacorb.notification.interfaces.MessageConsumer;
32 import org.jacorb.notification.queue.EventQueueFactory;
33 import org.jacorb.notification.queue.MessageQueueAdapter;
34 import org.jacorb.notification.queue.RWLockEventQueueDecorator;
35 import org.jacorb.notification.util.PropertySet;
36 import org.jacorb.notification.util.PropertySetAdapter;
37 import org.omg.CORBA.NO_IMPLEMENT JavaDoc;
38 import org.omg.CORBA.ORB JavaDoc;
39 import org.omg.CosNotification.DiscardPolicy;
40 import org.omg.CosNotification.EventType;
41 import org.omg.CosNotification.OrderPolicy;
42 import org.omg.CosNotification.UnsupportedQoS;
43 import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
44 import org.omg.CosNotifyChannelAdmin.ObtainInfoMode;
45 import org.omg.CosNotifyComm.InvalidEventType;
46 import org.omg.CosNotifyComm.NotifyPublish;
47 import org.omg.CosNotifyComm.NotifyPublishHelper;
48 import org.omg.CosNotifyComm.NotifyPublishOperations;
49 import org.omg.CosNotifyComm.NotifySubscribeOperations;
50 import org.omg.PortableServer.POA JavaDoc;
51
52 /**
53  * Abstract base class for ProxySuppliers. This class provides following logic for the different
54  * ProxySuppliers:
55  * <ul>
56  * <li>queue management,
57  * <li>error threshold settings.
58  * </ul>
59  *
60  * @author Alphonse Bendt
61  * @version $Id: AbstractProxySupplier.java,v 1.21 2005/04/27 10:45:46 alphonse.bendt Exp $
62  */

63
64 public abstract class AbstractProxySupplier extends AbstractProxy implements MessageConsumer,
65         NotifySubscribeOperations
66 {
67     private static final Runnable JavaDoc EMPTY_RUNNABLE = new Runnable JavaDoc()
68     {
69         public void run()
70         {
71             // no operation
72
}
73     };
74
75     private static final EventType[] EMPTY_EVENT_TYPE_ARRAY = new EventType[0];
76
77     private static final Message[] EMPTY_MESSAGE = new Message[0];
78
79     // //////////////////////////////////////
80

81     private final RWLockEventQueueDecorator pendingMessages_;
82
83     private final int errorThreshold_;
84
85     private final ConsumerAdmin consumerAdmin_;
86
87     private final EventQueueFactory eventQueueFactory_;
88
89     private NotifyPublishOperations proxyOfferListener_;
90
91     private NotifyPublish offerListener_;
92
93     // //////////////////////////////////////
94

95     protected AbstractProxySupplier(IAdmin admin, ORB JavaDoc orb, POA JavaDoc poa, Configuration conf,
96             TaskProcessor taskProcessor, OfferManager offerManager, SubscriptionManager subscriptionManager,
97             ConsumerAdmin consumerAdmin)
98             throws ConfigurationException
99     {
100         super(admin, orb, poa, conf, taskProcessor, offerManager, subscriptionManager);
101         
102         consumerAdmin_ = consumerAdmin;
103
104         eventQueueFactory_ = new EventQueueFactory(conf);
105
106         errorThreshold_ = conf.getAttributeAsInteger(Attributes.EVENTCONSUMER_ERROR_THRESHOLD,
107                 Default.DEFAULT_EVENTCONSUMER_ERROR_THRESHOLD);
108
109         if (logger_.isInfoEnabled())
110         {
111             logger_.info("set Error Threshold to : " + errorThreshold_);
112         }
113
114         qosSettings_.addPropertySetListener(
115                 new String JavaDoc[] { OrderPolicy.value, DiscardPolicy.value },
116                 eventQueueConfigurationChangedCB);
117
118         try
119         {
120             MessageQueueAdapter initialEventQueue = getMessageQueueFactory().newMessageQueue(
121                     qosSettings_);
122
123             pendingMessages_ = new RWLockEventQueueDecorator(initialEventQueue);
124         } catch (InterruptedException JavaDoc e)
125         {
126             throw new RuntimeException JavaDoc();
127         }
128     }
129
130     // //////////////////////////////////////
131

132     protected EventQueueFactory getMessageQueueFactory()
133     {
134         return eventQueueFactory_;
135     }
136
137     /**
138      * configure pending messages queue. the queue is reconfigured according to the current QoS
139      * Settings. the contents of the queue are reorganized according to the new OrderPolicy.
140      */

141     private final void configureEventQueue() // throws UnsupportedQoS
142
{
143         MessageQueueAdapter _newQueue = getMessageQueueFactory().newMessageQueue(qosSettings_);
144
145         try
146         {
147             pendingMessages_.replaceDelegate(_newQueue);
148         } catch (InterruptedException JavaDoc e)
149         {
150             // ignored
151
}
152     }
153
154     private PropertySetAdapter eventQueueConfigurationChangedCB = new PropertySetAdapter()
155     {
156         public void actionPropertySetChanged(PropertySet source) throws UnsupportedQoS
157         {
158             configureEventQueue();
159         }
160     };
161
162     
163
164     public int getPendingMessagesCount()
165     {
166         try
167         {
168             return pendingMessages_.getPendingMessagesCount();
169         } catch (InterruptedException JavaDoc e)
170         {
171             return -1;
172         }
173     }
174
175     public boolean hasPendingData()
176     {
177         try
178         {
179             return pendingMessages_.hasPendingMessages();
180         } catch (InterruptedException JavaDoc e)
181         {
182             return false;
183         }
184     }
185
186     /**
187      * put a copy of the Message in the queue of pending Messages.
188      *
189      * @param message
190      * the <code>Message</code> to queue.
191      */

192     protected void enqueue(Message message)
193     {
194         Message _copy = (Message) message.clone();
195
196         try
197         {
198             pendingMessages_.enqeue(message);
199
200             if (logger_.isDebugEnabled())
201             {
202                 logger_.debug("added " + message + " to pending Messages.");
203             }
204         } catch (InterruptedException JavaDoc e)
205         {
206             _copy.dispose();
207             logger_.info("enqueue was interrupted", e);
208         }
209     }
210
211     public Message getMessageBlocking() throws InterruptedException JavaDoc
212     {
213         return pendingMessages_.getMessageBlocking();
214     }
215
216     protected Message getMessageNoBlock()
217     {
218         try
219         {
220             return pendingMessages_.getMessageNoBlock();
221         } catch (InterruptedException JavaDoc e)
222         {
223             Thread.currentThread().interrupt();
224
225             return null;
226         }
227     }
228
229     protected Message[] getAllMessages()
230     {
231         try
232         {
233             return pendingMessages_.getAllMessages();
234         } catch (InterruptedException JavaDoc e)
235         {
236             Thread.currentThread().interrupt();
237
238             return EMPTY_MESSAGE;
239         }
240     }
241
242     public void deliverMessage(final Message message)
243     {
244         if (logger_.isDebugEnabled())
245         {
246             logger_.debug("deliverMessage() connected=" + isConnected() + " suspended="
247                     + isSuspended());
248         }
249
250         if (isConnected())
251         {
252             enqueue(message);
253
254             messageDelivered();
255         }
256     }
257
258     /**
259      * this is an extension point.
260      */

261     protected void messageDelivered()
262     {
263         // no operation
264
}
265
266     /**
267      * @param max
268      * maximum number of messages
269      * @return an array containing at most max Messages
270      */

271     protected Message[] getUpToMessages(int max)
272     {
273         try
274         {
275             return pendingMessages_.getUpToMessages(max);
276         } catch (InterruptedException JavaDoc e)
277         {
278             Thread.currentThread().interrupt();
279
280             return EMPTY_MESSAGE;
281         }
282     }
283
284     /**
285      * @param min
286      * minimum number of messages
287      * @return an array containing the requested number of Messages or null
288      */

289     protected Message[] getAtLeastMessages(int min)
290     {
291         try
292         {
293             return pendingMessages_.getAtLeastMessages(min);
294         } catch (InterruptedException JavaDoc e)
295         {
296             Thread.currentThread().interrupt();
297
298             return EMPTY_MESSAGE;
299         }
300     }
301
302     public int getErrorThreshold()
303     {
304         return errorThreshold_;
305     }
306
307     public final void dispose()
308     {
309         super.dispose();
310
311         pendingMessages_.clear();
312
313         // insert an empty command into the taskProcessor's queue.
314
// otherwise queue seems to contain old entries that prevent GC'ing
315
getTaskProcessor().executeTaskAfterDelay(1000, EMPTY_RUNNABLE);
316     }
317
318     public final ConsumerAdmin MyAdmin()
319     {
320         return consumerAdmin_;
321     }
322
323     public final void subscription_change(EventType[] added, EventType[] removed)
324             throws InvalidEventType
325     {
326         subscriptionManager_.subscription_change(added, removed);
327     }
328
329     public final EventType[] obtain_offered_types(ObtainInfoMode obtainInfoMode)
330     {
331         EventType[] _offeredTypes = EMPTY_EVENT_TYPE_ARRAY;
332
333         switch (obtainInfoMode.value()) {
334         case ObtainInfoMode._ALL_NOW_UPDATES_ON:
335             registerListener();
336             _offeredTypes = offerManager_.obtain_offered_types();
337             break;
338         case ObtainInfoMode._ALL_NOW_UPDATES_OFF:
339             _offeredTypes = offerManager_.obtain_offered_types();
340             removeListener();
341             break;
342         case ObtainInfoMode._NONE_NOW_UPDATES_ON:
343             registerListener();
344             break;
345         case ObtainInfoMode._NONE_NOW_UPDATES_OFF:
346             removeListener();
347             break;
348         default:
349             throw new IllegalArgumentException JavaDoc("Illegal ObtainInfoMode");
350         }
351
352         return _offeredTypes;
353     }
354
355     private void registerListener()
356     {
357         if (proxyOfferListener_ == null)
358         {
359             final NotifyPublishOperations _listener = getOfferListener();
360
361             if (_listener != null)
362             {
363                 proxyOfferListener_ = new NotifyPublishOperations()
364                 {
365                     public void offer_change(EventType[] added, EventType[] removed)
366                     {
367                         try
368                         {
369                             _listener.offer_change(added, removed);
370                         } catch (NO_IMPLEMENT JavaDoc e)
371                         {
372                             logger_.info("disable offer_change for connected Consumer.", e);
373
374                             removeListener();
375                         } catch (InvalidEventType e)
376                         {
377                             logger_.error("invalid event type", e);
378                         } catch (Exception JavaDoc e)
379                         {
380                             logger_.error("offer_change failed", e);
381                         }
382                     }
383                 };
384
385                 offerManager_.addListener(proxyOfferListener_);
386             }
387         }
388     }
389
390     protected void removeListener()
391     {
392         if (proxyOfferListener_ != null)
393         {
394             offerManager_.removeListener(proxyOfferListener_);
395             proxyOfferListener_ = null;
396         }
397     }
398
399     final NotifyPublishOperations getOfferListener()
400     {
401         return offerListener_;
402     }
403
404     public void connectClient(org.omg.CORBA.Object JavaDoc client)
405     {
406         super.connectClient(client);
407
408         try
409         {
410             offerListener_ = NotifyPublishHelper.narrow(client);
411
412             logger_.debug("successfully narrowed connecting Client to IF NotifyPublish");
413         } catch (Throwable JavaDoc t)
414         {
415             logger_.info("disable offer_change for connecting Consumer");
416         }
417     }
418
419     public boolean isRetryAllowed()
420     {
421         return !isDisposed() && getErrorCounter() < getErrorThreshold();
422     }
423
424     protected abstract long getCost();
425
426     public int compareTo(Object JavaDoc o)
427     {
428         AbstractProxySupplier other = (AbstractProxySupplier) o;
429
430         return (int) (getCost() - other.getCost());
431     }
432
433     public final boolean hasMessageConsumer()
434     {
435         return true;
436     }
437 }
Popular Tags