1 package org.jacorb.notification.servant; 2 3 22 23 import org.apache.avalon.framework.configuration.Configuration; 24 import org.jacorb.notification.MessageFactory; 25 import org.jacorb.notification.OfferManager; 26 import org.jacorb.notification.SubscriptionManager; 27 import org.jacorb.notification.conf.Attributes; 28 import org.jacorb.notification.conf.Default; 29 import org.jacorb.notification.engine.TaskProcessor; 30 import org.jacorb.notification.interfaces.Message; 31 import org.jacorb.notification.interfaces.MessageSupplier; 32 import org.omg.CORBA.Any ; 33 import org.omg.CORBA.BooleanHolder ; 34 import org.omg.CORBA.ORB ; 35 import org.omg.CosEventChannelAdmin.AlreadyConnected; 36 import org.omg.CosEventComm.Disconnected; 37 import org.omg.CosEventComm.PullSupplier; 38 import org.omg.CosNotifyChannelAdmin.ProxyConsumerHelper; 39 import org.omg.CosNotifyChannelAdmin.ProxyPullConsumerOperations; 40 import org.omg.CosNotifyChannelAdmin.ProxyPullConsumerPOATie; 41 import org.omg.CosNotifyChannelAdmin.ProxyType; 42 import org.omg.PortableServer.POA ; 43 import org.omg.PortableServer.Servant ; 44 45 import EDU.oswego.cs.dl.util.concurrent.Semaphore; 46 import EDU.oswego.cs.dl.util.concurrent.Sync; 47 48 52 53 public class ProxyPullConsumerImpl 54 extends AbstractProxyConsumer 55 implements ProxyPullConsumerOperations, 56 MessageSupplier 57 { 58 63 private final Sync pullSync_ = 64 new Semaphore(Default.DEFAULT_CONCURRENT_PULL_OPERATIONS_ALLOWED); 65 66 69 private PullSupplier pullSupplier_; 70 private long pollInterval_; 71 private Object timerRegistration_; 72 73 76 private final Runnable runQueueThis_; 77 78 81 84 private int pullCounter_; 85 86 89 private long timeSpentInPull_; 90 91 94 private int successfulPullCounter_; 95 96 98 public ProxyPullConsumerImpl(IAdmin admin, ORB orb, POA poa, Configuration conf, TaskProcessor taskProcessor, MessageFactory messageFactory, OfferManager offerManager, SubscriptionManager subscriptionManager) 99 { 100 super(admin, orb, poa, conf, taskProcessor, messageFactory, null, offerManager, subscriptionManager); 101 102 103 pollInterval_ = 104 conf.getAttributeAsLong (Attributes.PULL_CONSUMER_POLL_INTERVAL, 105 Default.DEFAULT_PULL_CONSUMER_POLL_INTERVAL); 106 107 runQueueThis_ = new Runnable () 108 { 109 public void run() 110 { 111 schedulePullTask( ProxyPullConsumerImpl.this ); 112 } 113 }; 114 } 115 116 118 public ProxyType MyType() { 119 return ProxyType.PULL_ANY; 120 } 121 122 123 public void disconnect_pull_consumer() 124 { 125 destroy(); 126 } 127 128 129 protected void disconnectClient() 130 { 131 stopTask(); 132 133 pullSupplier_.disconnect_pull_supplier(); 134 135 pullSupplier_ = null; 136 } 137 138 139 protected void connectionSuspended() 140 { 141 stopTask(); 142 } 143 144 145 protected void connectionResumed() 146 { 147 startTask(); 148 } 149 150 151 public void runPullMessage() throws Disconnected 152 { 153 if ( !isConnected() ) 154 { 155 return; 156 } 157 158 try { 159 runPullEventInternal(); 160 } catch (InterruptedException e) { 161 logger_.error("pull was interrupted", e); 162 } 163 } 164 165 166 private void runPullEventInternal() 167 throws InterruptedException , 168 Disconnected 169 { 170 BooleanHolder hasEvent = new BooleanHolder (); 171 Any event = null; 172 173 try { 174 pullSync_.acquire(); 175 176 ++pullCounter_; 177 178 long _start = System.currentTimeMillis(); 179 180 event = pullSupplier_.try_pull( hasEvent ); 181 182 timeSpentInPull_ += System.currentTimeMillis() - _start; 183 } 184 finally { 185 pullSync_.release(); 186 } 187 188 if ( hasEvent.value ) 189 { 190 ++successfulPullCounter_; 191 192 Message _message = 193 getMessageFactory().newMessage( event, this ); 194 195 checkMessageProperties(_message); 196 197 processMessage( _message ); 198 } 199 } 200 201 202 public void connect_any_pull_supplier( PullSupplier pullSupplier ) 203 throws AlreadyConnected 204 { 205 checkIsNotConnected(); 206 207 pullSupplier_ = pullSupplier; 208 209 connectClient(pullSupplier); 210 211 startTask(); 212 } 213 214 215 synchronized private void startTask() 216 { 217 if ( timerRegistration_ == null ) 218 { 219 timerRegistration_ = 220 getTaskProcessor().executeTaskPeriodically( pollInterval_, 221 runQueueThis_, 222 true ); 223 } 224 } 225 226 227 synchronized private void stopTask() 228 { 229 if ( timerRegistration_ != null ) 230 { 231 getTaskProcessor().cancelTask( timerRegistration_ ); 232 233 timerRegistration_ = null; 234 } 235 } 236 237 238 public synchronized Servant getServant() 239 { 240 if ( thisServant_ == null ) 241 { 242 thisServant_ = new ProxyPullConsumerPOATie( this ); 243 } 244 245 return thisServant_; 246 } 247 248 249 public org.omg.CORBA.Object activate() 250 { 251 return ProxyConsumerHelper.narrow(getServant()._this_object(getORB())); 252 } 253 254 257 public long getPollInterval() 258 { 259 return pollInterval_; 260 } 261 262 263 public long getPullTimer() 264 { 265 return timeSpentInPull_; 266 } 267 268 269 public int getPullCounter() 270 { 271 return pullCounter_; 272 } 273 274 275 public int getSuccessfulPullCounter() 276 { 277 return successfulPullCounter_; 278 } 279 } 280 | Popular Tags |