KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > notification > servant > ProxyPullConsumerImpl


1 package org.jacorb.notification.servant;
2
3 /*
4  * JacORB - a free Java ORB
5  *
6  * Copyright (C) 1997-2004 Gerald Brose.
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the Free
20  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */

22
23 import org.apache.avalon.framework.configuration.Configuration;
24 import org.jacorb.notification.MessageFactory;
25 import org.jacorb.notification.OfferManager;
26 import org.jacorb.notification.SubscriptionManager;
27 import org.jacorb.notification.conf.Attributes;
28 import org.jacorb.notification.conf.Default;
29 import org.jacorb.notification.engine.TaskProcessor;
30 import org.jacorb.notification.interfaces.Message;
31 import org.jacorb.notification.interfaces.MessageSupplier;
32 import org.omg.CORBA.Any JavaDoc;
33 import org.omg.CORBA.BooleanHolder JavaDoc;
34 import org.omg.CORBA.ORB JavaDoc;
35 import org.omg.CosEventChannelAdmin.AlreadyConnected;
36 import org.omg.CosEventComm.Disconnected;
37 import org.omg.CosEventComm.PullSupplier;
38 import org.omg.CosNotifyChannelAdmin.ProxyConsumerHelper;
39 import org.omg.CosNotifyChannelAdmin.ProxyPullConsumerOperations;
40 import org.omg.CosNotifyChannelAdmin.ProxyPullConsumerPOATie;
41 import org.omg.CosNotifyChannelAdmin.ProxyType;
42 import org.omg.PortableServer.POA JavaDoc;
43 import org.omg.PortableServer.Servant JavaDoc;
44
45 import EDU.oswego.cs.dl.util.concurrent.Semaphore;
46 import EDU.oswego.cs.dl.util.concurrent.Sync;
47
48 /**
49  * @author Alphonse Bendt
50  * @version $Id: ProxyPullConsumerImpl.java,v 1.12 2005/04/27 10:45:46 alphonse.bendt Exp $
51  */

52
53 public class ProxyPullConsumerImpl
54     extends AbstractProxyConsumer
55     implements ProxyPullConsumerOperations,
56                MessageSupplier
57 {
58     /**
59      * this sync is accessed during a pull operation. therby the
60      * maximal number of concurrent pull operations per pull supplier
61      * can be controlled.
62      */

63     private final Sync pullSync_ =
64         new Semaphore(Default.DEFAULT_CONCURRENT_PULL_OPERATIONS_ALLOWED);
65
66     /**
67      * the connected PullSupplier
68      */

69     private PullSupplier pullSupplier_;
70     private long pollInterval_;
71     private Object JavaDoc timerRegistration_;
72
73     /**
74      * Callback that is run by the Timer.
75      */

76     private final Runnable JavaDoc runQueueThis_;
77
78     //////////////////////////////
79
// Some Management Information
80

81     /**
82      * Total number of pull-Operations
83      */

84     private int pullCounter_;
85
86     /**
87      * Total time spent within pull-Operations
88      */

89     private long timeSpentInPull_;
90
91     /**
92      * Total number of successful pull-Operations
93      */

94     private int successfulPullCounter_;
95
96     ////////////////////////////////////////
97

98     public ProxyPullConsumerImpl(IAdmin admin, ORB JavaDoc orb, POA JavaDoc poa, Configuration conf, TaskProcessor taskProcessor, MessageFactory messageFactory, OfferManager offerManager, SubscriptionManager subscriptionManager)
99     {
100         super(admin, orb, poa, conf, taskProcessor, messageFactory, null, offerManager, subscriptionManager);
101
102         
103         pollInterval_ =
104             conf.getAttributeAsLong (Attributes.PULL_CONSUMER_POLL_INTERVAL,
105                                      Default.DEFAULT_PULL_CONSUMER_POLL_INTERVAL);
106     
107         runQueueThis_ = new Runnable JavaDoc()
108         {
109             public void run()
110             {
111                 schedulePullTask( ProxyPullConsumerImpl.this );
112             }
113         };
114     }
115
116     ////////////////////////////////////////
117

118     public ProxyType MyType() {
119         return ProxyType.PULL_ANY;
120     }
121
122
123     public void disconnect_pull_consumer()
124     {
125         destroy();
126     }
127
128
129     protected void disconnectClient()
130     {
131         stopTask();
132
133         pullSupplier_.disconnect_pull_supplier();
134
135         pullSupplier_ = null;
136     }
137
138
139     protected void connectionSuspended()
140     {
141         stopTask();
142     }
143
144
145     protected void connectionResumed()
146     {
147         startTask();
148     }
149
150
151     public void runPullMessage() throws Disconnected
152     {
153         if ( !isConnected() )
154             {
155                 return;
156             }
157
158         try {
159             runPullEventInternal();
160         } catch (InterruptedException JavaDoc e) {
161             logger_.error("pull was interrupted", e);
162         }
163     }
164
165
166     private void runPullEventInternal()
167         throws InterruptedException JavaDoc,
168                Disconnected
169     {
170         BooleanHolder JavaDoc hasEvent = new BooleanHolder JavaDoc();
171         Any JavaDoc event = null;
172
173         try {
174             pullSync_.acquire();
175
176             ++pullCounter_;
177
178             long _start = System.currentTimeMillis();
179
180             event = pullSupplier_.try_pull( hasEvent );
181
182             timeSpentInPull_ += System.currentTimeMillis() - _start;
183         }
184         finally {
185             pullSync_.release();
186         }
187
188         if ( hasEvent.value )
189             {
190                 ++successfulPullCounter_;
191
192                 Message _message =
193                     getMessageFactory().newMessage( event, this );
194
195                 checkMessageProperties(_message);
196
197                 processMessage( _message );
198             }
199     }
200
201
202     public void connect_any_pull_supplier( PullSupplier pullSupplier )
203         throws AlreadyConnected
204     {
205         checkIsNotConnected();
206
207         pullSupplier_ = pullSupplier;
208
209         connectClient(pullSupplier);
210
211         startTask();
212     }
213
214
215     synchronized private void startTask()
216     {
217         if ( timerRegistration_ == null )
218         {
219             timerRegistration_ =
220                 getTaskProcessor().executeTaskPeriodically( pollInterval_,
221                                                             runQueueThis_,
222                                                             true );
223         }
224     }
225
226
227     synchronized private void stopTask()
228     {
229         if ( timerRegistration_ != null )
230         {
231             getTaskProcessor().cancelTask( timerRegistration_ );
232
233             timerRegistration_ = null;
234         }
235     }
236
237
238     public synchronized Servant JavaDoc getServant()
239     {
240         if ( thisServant_ == null )
241         {
242             thisServant_ = new ProxyPullConsumerPOATie( this );
243         }
244
245         return thisServant_;
246     }
247
248
249     public org.omg.CORBA.Object JavaDoc activate()
250     {
251         return ProxyConsumerHelper.narrow(getServant()._this_object(getORB()));
252     }
253
254     ////////////////////////////////////////
255
// todo collect management informations
256

257     public long getPollInterval()
258     {
259         return pollInterval_;
260     }
261
262
263     public long getPullTimer()
264     {
265         return timeSpentInPull_;
266     }
267
268
269     public int getPullCounter()
270     {
271         return pullCounter_;
272     }
273
274
275     public int getSuccessfulPullCounter()
276     {
277         return successfulPullCounter_;
278     }
279 }
280
Popular Tags