KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > notification > servant > StructuredProxyPullConsumerImpl


1 package org.jacorb.notification.servant;
2
3 /*
4  * JacORB - a free Java ORB
5  *
6  * Copyright (C) 1999-2004 Gerald Brose
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the Free
20  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */

23
24
25 import org.apache.avalon.framework.configuration.Configuration;
26 import org.jacorb.notification.MessageFactory;
27 import org.jacorb.notification.OfferManager;
28 import org.jacorb.notification.SubscriptionManager;
29 import org.jacorb.notification.conf.Attributes;
30 import org.jacorb.notification.conf.Default;
31 import org.jacorb.notification.engine.TaskProcessor;
32 import org.jacorb.notification.interfaces.Message;
33 import org.jacorb.notification.interfaces.MessageSupplier;
34 import org.omg.CORBA.BooleanHolder JavaDoc;
35 import org.omg.CORBA.ORB JavaDoc;
36 import org.omg.CosEventChannelAdmin.AlreadyConnected;
37 import org.omg.CosEventComm.Disconnected;
38 import org.omg.CosNotification.StructuredEvent;
39 import org.omg.CosNotifyChannelAdmin.ProxyConsumerHelper;
40 import org.omg.CosNotifyChannelAdmin.ProxyType;
41 import org.omg.CosNotifyChannelAdmin.StructuredProxyPullConsumerOperations;
42 import org.omg.CosNotifyChannelAdmin.StructuredProxyPullConsumerPOATie;
43 import org.omg.CosNotifyComm.StructuredPullSupplier;
44 import org.omg.PortableServer.POA JavaDoc;
45 import org.omg.PortableServer.Servant JavaDoc;
46
47 import EDU.oswego.cs.dl.util.concurrent.Semaphore;
48 import EDU.oswego.cs.dl.util.concurrent.Sync;
49
50
51 /**
52  * @author Alphonse Bendt
53  * @version $Id: StructuredProxyPullConsumerImpl.java,v 1.13 2005/04/27 10:45:46 alphonse.bendt Exp $
54  */

55
56 public class StructuredProxyPullConsumerImpl
57     extends AbstractProxyConsumer
58     implements StructuredProxyPullConsumerOperations,
59                MessageSupplier
60 {
61     protected final Sync pullSync_ = new Semaphore(Default.DEFAULT_CONCURRENT_PULL_OPERATIONS_ALLOWED);
62
63     protected long pollInterval_;
64
65     private StructuredPullSupplier pullSupplier_;
66
67     private Object JavaDoc taskId_;
68
69     private final Runnable JavaDoc runQueueThis_;
70
71     ////////////////////////////////////////
72

73     public StructuredProxyPullConsumerImpl(IAdmin admin, ORB JavaDoc orb, POA JavaDoc poa, Configuration conf, TaskProcessor taskProcessor, MessageFactory mf, OfferManager offerManager, SubscriptionManager subscriptionManager)
74     {
75         super(admin, orb, poa, conf, taskProcessor, mf, null, offerManager, subscriptionManager);
76
77         runQueueThis_ = new Runnable JavaDoc()
78         {
79             public void run()
80             {
81                 schedulePullTask(StructuredProxyPullConsumerImpl.this);
82             }
83         };
84     }
85
86     ////////////////////////////////////////
87

88     public ProxyType MyType() {
89         return ProxyType.PULL_STRUCTURED;
90     }
91
92
93     public void configure (Configuration conf)
94     {
95         super.configure (conf);
96
97         pollInterval_ =
98             conf.getAttributeAsLong (Attributes.PULL_CONSUMER_POLL_INTERVAL,
99                                         Default.DEFAULT_PULL_CONSUMER_POLL_INTERVAL);
100     }
101
102
103     public void disconnect_structured_pull_consumer()
104     {
105         destroy();
106     }
107
108
109     public synchronized void connect_structured_pull_supplier( StructuredPullSupplier pullSupplier )
110         throws AlreadyConnected
111     {
112         checkIsNotConnected();
113         pullSupplier_ = pullSupplier;
114         connectClient(pullSupplier);
115         startTask();
116     }
117
118
119     protected void connectionSuspended()
120     {
121         stopTask();
122     }
123
124
125     public void connectionResumed()
126     {
127         startTask();
128     }
129
130
131     public void runPullMessage() throws Disconnected
132     {
133         if (!isConnected() || isSuspended()) {
134             return;
135         }
136
137         try
138         {
139             runPullEventInternal();
140         }
141         catch (InterruptedException JavaDoc e)
142         {
143             logger_.error("pull interrupted", e);
144         }
145     }
146
147
148     protected void runPullEventInternal()
149         throws InterruptedException JavaDoc,
150                Disconnected
151     {
152         BooleanHolder JavaDoc _hasEvent = new BooleanHolder JavaDoc();
153         _hasEvent.value = false;
154         StructuredEvent _event = null;
155
156         try
157         {
158             pullSync_.acquire();
159             _event = pullSupplier_.try_pull_structured_event( _hasEvent );
160         }
161         finally
162         {
163             pullSync_.release();
164         }
165
166         if ( _hasEvent.value )
167         {
168             Message _mesg =
169                 getMessageFactory().newMessage( _event, this );
170
171             processMessage( _mesg );
172         }
173     }
174
175
176     protected void disconnectClient()
177     {
178         stopTask();
179         pullSupplier_.disconnect_structured_pull_supplier();
180
181         pullSupplier_ = null;
182     }
183
184
185     protected void startTask()
186     {
187         if ( taskId_ == null )
188         {
189             taskId_ = getTaskProcessor()
190                 .executeTaskPeriodically( pollInterval_,
191                                           runQueueThis_,
192                                           true );
193         }
194     }
195
196
197     protected void stopTask()
198     {
199         if ( taskId_ != null )
200         {
201             getTaskProcessor().cancelTask( taskId_ );
202             taskId_ = null;
203         }
204     }
205
206
207     public synchronized Servant JavaDoc getServant()
208     {
209         if ( thisServant_ == null )
210         {
211             thisServant_ = new StructuredProxyPullConsumerPOATie( this );
212         }
213
214         return thisServant_;
215     }
216
217
218     public org.omg.CORBA.Object JavaDoc activate()
219     {
220         return ProxyConsumerHelper.narrow(getServant()._this_object(getORB()));
221     }
222 }
223
Popular Tags