KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > notification > servant > SequenceProxyPushSupplierImpl


1 package org.jacorb.notification.servant;
2
3 /*
4  * JacORB - a free Java ORB
5  *
6  * Copyright (C) 1999-2004 Gerald Brose
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the Free
20  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */

23
24 import org.apache.avalon.framework.configuration.Configuration;
25 import org.apache.avalon.framework.configuration.ConfigurationException;
26 import org.jacorb.notification.OfferManager;
27 import org.jacorb.notification.SubscriptionManager;
28 import org.jacorb.notification.engine.PushOperation;
29 import org.jacorb.notification.engine.PushTaskExecutorFactory;
30 import org.jacorb.notification.engine.TaskProcessor;
31 import org.jacorb.notification.interfaces.Message;
32 import org.jacorb.notification.util.PropertySet;
33 import org.jacorb.notification.util.PropertySetAdapter;
34 import org.omg.CORBA.ORB JavaDoc;
35 import org.omg.CosEventChannelAdmin.AlreadyConnected;
36 import org.omg.CosEventChannelAdmin.TypeError;
37 import org.omg.CosEventComm.Disconnected;
38 import org.omg.CosNotification.MaximumBatchSize;
39 import org.omg.CosNotification.PacingInterval;
40 import org.omg.CosNotification.StructuredEvent;
41 import org.omg.CosNotification.UnsupportedQoS;
42 import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
43 import org.omg.CosNotifyChannelAdmin.ProxyType;
44 import org.omg.CosNotifyChannelAdmin.SequenceProxyPushSupplierOperations;
45 import org.omg.CosNotifyChannelAdmin.SequenceProxyPushSupplierPOATie;
46 import org.omg.CosNotifyComm.SequencePushConsumer;
47 import org.omg.PortableServer.POA JavaDoc;
48 import org.omg.PortableServer.Servant JavaDoc;
49 import org.omg.TimeBase.TimeTHelper;
50
51 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
52 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
53
54 /**
55  * @author Alphonse Bendt
56  * @version $Id: SequenceProxyPushSupplierImpl.java,v 1.17 2005/04/27 10:45:46 alphonse.bendt Exp $
57  */

58
59 public class SequenceProxyPushSupplierImpl extends StructuredProxyPushSupplierImpl implements
60         SequenceProxyPushSupplierOperations
61 {
62     private class PushSequenceOperation implements PushOperation
63     {
64         private final StructuredEvent[] structuredEvents_;
65
66         public PushSequenceOperation(StructuredEvent[] structuredEvents)
67         {
68             structuredEvents_ = structuredEvents;
69         }
70
71         public void invokePush() throws Disconnected
72         {
73             deliverPendingMessagesInternal(structuredEvents_);
74         }
75
76         public void dispose()
77         {
78             // nothing to do
79
}
80     }
81
82     public SequenceProxyPushSupplierImpl(IAdmin admin, ORB JavaDoc orb, POA JavaDoc poa, Configuration config,
83             TaskProcessor taskProcessor, PushTaskExecutorFactory pushTaskExecutorFactory, OfferManager offerManager,
84             SubscriptionManager subscriptionManager, ConsumerAdmin consumerAdmin)
85             throws ConfigurationException
86     {
87         super(admin, orb, poa, config, taskProcessor, pushTaskExecutorFactory, offerManager,
88                 subscriptionManager, consumerAdmin);
89
90         configureMaxBatchSize();
91
92         configurePacingInterval();
93
94         schedulePushOperation_ = new Runnable JavaDoc()
95         {
96             public void run()
97             {
98                 schedulePush();
99             }
100         };
101         
102         qosSettings_.addPropertySetListener(MaximumBatchSize.value, new PropertySetAdapter()
103         {
104             public void actionPropertySetChanged(PropertySet source) throws UnsupportedQoS
105             {
106                 configureMaxBatchSize();
107             }
108         });
109
110         qosSettings_.addPropertySetListener(PacingInterval.value, new PropertySetAdapter()
111         {
112             public void actionPropertySetChanged(PropertySet source) throws UnsupportedQoS
113             {
114                 configurePacingInterval();
115             }
116         });
117     }
118
119     private final Runnable JavaDoc schedulePushOperation_;
120     
121     /**
122      * The connected SequencePushConsumer.
123      */

124     private SequencePushConsumer sequencePushConsumer_;
125
126     /**
127      * registration for the Scheduled DeliverTask.
128      */

129     private Object JavaDoc taskId_;
130
131     /**
132      * maximum queue size before a delivery is forced.
133      */

134     private final SynchronizedInt maxBatchSize_ = new SynchronizedInt(1);
135
136     /**
137      * how long to wait between two scheduled deliveries.
138      */

139     private final SynchronizedLong pacingInterval_ = new SynchronizedLong(0);
140
141     private long timeSpent_ = 0;
142
143     /**
144      * this callback is called by the TimerDaemon. Check if there are pending Events and deliver
145      * them to the Consumer. As there's only one TimerDaemon its important to block the daemon only
146      * a minimal amount of time. Therefor the Callback does not do the actual delivery. Instead a
147      * DeliverTask is scheduled for this Supplier.
148      */

149     // private Runnable timerCallback_;
150
// //////////////////////////////////////
151
public ProxyType MyType()
152     {
153         return ProxyType.PUSH_SEQUENCE;
154     }
155
156     /**
157      * overrides the superclass version.
158      */

159     public void pushPendingData()
160     {
161         deliverPendingMessages(false);
162     }
163
164     private void deliverPendingMessages(boolean flush)
165     {
166         final Message[] _messages;
167
168         if (flush)
169         {
170             _messages = getAllMessages();
171         }
172         else
173         {
174             _messages = getAtLeastMessages(maxBatchSize_.get());
175         }
176
177         if (_messages != null && _messages.length > 0)
178         {
179             final StructuredEvent[] _structuredEvents = new StructuredEvent[_messages.length];
180
181             for (int x = 0; x < _messages.length; ++x)
182             {
183                 _structuredEvents[x] = _messages[x].toStructuredEvent();
184
185                 _messages[x].dispose();
186             }
187
188             try
189             {
190                 deliverPendingMessagesInternal(_structuredEvents);
191             } catch (Throwable JavaDoc e)
192             {
193                 PushSequenceOperation _failedOperation = new PushSequenceOperation(
194                         _structuredEvents);
195
196                 handleFailedPushOperation(_failedOperation, e);
197             }
198         }
199     }
200
201     void deliverPendingMessagesInternal(final StructuredEvent[] structuredEvents)
202             throws Disconnected
203     {
204         long now = System.currentTimeMillis();
205         sequencePushConsumer_.push_structured_events(structuredEvents);
206         timeSpent_ += (System.currentTimeMillis() - now);
207         resetErrorCounter();
208     }
209
210     public void connect_sequence_push_consumer(SequencePushConsumer consumer)
211             throws AlreadyConnected, TypeError
212     {
213         logger_.debug("connect_sequence_push_consumer");
214
215         checkIsNotConnected();
216
217         sequencePushConsumer_ = consumer;
218
219         connectClient(consumer);
220
221         startCronJob();
222     }
223
224     protected void connectionResumed()
225     {
226         schedulePush();
227         
228         startCronJob();
229     }
230
231     protected void connectionSuspended()
232     {
233         stopCronJob();
234     }
235
236     public void disconnect_sequence_push_supplier()
237     {
238         destroy();
239     }
240
241     protected void disconnectClient()
242     {
243         stopCronJob();
244
245         sequencePushConsumer_.disconnect_sequence_push_consumer();
246         sequencePushConsumer_ = null;
247     }
248
249     private void startCronJob()
250     {
251         if (pacingInterval_.get() > 0 && taskId_ != null)
252         {
253             taskId_ = getTaskProcessor().executeTaskPeriodically(pacingInterval_.get(),
254                     schedulePushOperation_, true);
255         }
256     }
257
258     synchronized private void stopCronJob()
259     {
260         if (taskId_ != null)
261         {
262             getTaskProcessor().cancelTask(taskId_);
263             taskId_ = null;
264         }
265     }
266
267     private void checkCronJob()
268     {
269         if (pacingInterval_.get() > 0)
270         {
271             startCronJob();
272         }
273         else
274         {
275             stopCronJob();
276         }
277     }
278
279     private boolean configurePacingInterval()
280     {
281         if (qosSettings_.containsKey(PacingInterval.value))
282         {
283             long _pacingInterval = TimeTHelper.extract(qosSettings_.get(PacingInterval.value));
284
285             if (pacingInterval_.get() != _pacingInterval)
286             {
287                 if (logger_.isInfoEnabled())
288                 {
289                     logger_.info("set PacingInterval=" + _pacingInterval);
290                 }
291                 pacingInterval_.set(_pacingInterval);
292
293                 checkCronJob();
294
295                 return true;
296             }
297         }
298         return false;
299     }
300
301     private boolean configureMaxBatchSize()
302     {
303         if (qosSettings_.containsKey(MaximumBatchSize.value))
304         {
305             int _maxBatchSize = qosSettings_.get(MaximumBatchSize.value).extract_long();
306
307             if (maxBatchSize_.get() != _maxBatchSize)
308             {
309                 if (logger_.isInfoEnabled())
310                 {
311                     logger_.info("set MaxBatchSize=" + _maxBatchSize);
312                 }
313
314                 maxBatchSize_.set(_maxBatchSize);
315
316                 return true;
317             }
318         }
319
320         return false;
321     }
322
323     public synchronized Servant JavaDoc getServant()
324     {
325         if (thisServant_ == null)
326         {
327             thisServant_ = new SequenceProxyPushSupplierPOATie(this);
328         }
329
330         return thisServant_;
331     }
332
333     protected long getCost()
334     {
335         return timeSpent_;
336     }
337 }
Popular Tags