1 package org.jacorb.notification.servant; 2 3 23 24 25 import org.apache.avalon.framework.configuration.Configuration; 26 import org.jacorb.notification.MessageFactory; 27 import org.jacorb.notification.OfferManager; 28 import org.jacorb.notification.SubscriptionManager; 29 import org.jacorb.notification.conf.Attributes; 30 import org.jacorb.notification.conf.Default; 31 import org.jacorb.notification.engine.TaskProcessor; 32 import org.jacorb.notification.interfaces.Message; 33 import org.jacorb.notification.interfaces.MessageSupplier; 34 import org.omg.CORBA.BooleanHolder ; 35 import org.omg.CORBA.ORB ; 36 import org.omg.CosEventChannelAdmin.AlreadyConnected; 37 import org.omg.CosEventComm.Disconnected; 38 import org.omg.CosNotification.StructuredEvent; 39 import org.omg.CosNotifyChannelAdmin.ProxyConsumerHelper; 40 import org.omg.CosNotifyChannelAdmin.ProxyType; 41 import org.omg.CosNotifyChannelAdmin.StructuredProxyPullConsumerOperations; 42 import org.omg.CosNotifyChannelAdmin.StructuredProxyPullConsumerPOATie; 43 import org.omg.CosNotifyComm.StructuredPullSupplier; 44 import org.omg.PortableServer.POA ; 45 import org.omg.PortableServer.Servant ; 46 47 import EDU.oswego.cs.dl.util.concurrent.Semaphore; 48 import EDU.oswego.cs.dl.util.concurrent.Sync; 49 50 51 55 56 public class StructuredProxyPullConsumerImpl 57 extends AbstractProxyConsumer 58 implements StructuredProxyPullConsumerOperations, 59 MessageSupplier 60 { 61 protected final Sync pullSync_ = new Semaphore(Default.DEFAULT_CONCURRENT_PULL_OPERATIONS_ALLOWED); 62 63 protected long pollInterval_; 64 65 private StructuredPullSupplier pullSupplier_; 66 67 private Object taskId_; 68 69 private final Runnable runQueueThis_; 70 71 73 public StructuredProxyPullConsumerImpl(IAdmin admin, ORB orb, POA poa, Configuration conf, TaskProcessor taskProcessor, MessageFactory mf, OfferManager offerManager, SubscriptionManager subscriptionManager) 74 { 75 super(admin, orb, poa, conf, taskProcessor, mf, null, offerManager, subscriptionManager); 76 77 runQueueThis_ = new Runnable () 78 { 79 public void run() 80 { 81 schedulePullTask(StructuredProxyPullConsumerImpl.this); 82 } 83 }; 84 } 85 86 88 public ProxyType MyType() { 89 return ProxyType.PULL_STRUCTURED; 90 } 91 92 93 public void configure (Configuration conf) 94 { 95 super.configure (conf); 96 97 pollInterval_ = 98 conf.getAttributeAsLong (Attributes.PULL_CONSUMER_POLL_INTERVAL, 99 Default.DEFAULT_PULL_CONSUMER_POLL_INTERVAL); 100 } 101 102 103 public void disconnect_structured_pull_consumer() 104 { 105 destroy(); 106 } 107 108 109 public synchronized void connect_structured_pull_supplier( StructuredPullSupplier pullSupplier ) 110 throws AlreadyConnected 111 { 112 checkIsNotConnected(); 113 pullSupplier_ = pullSupplier; 114 connectClient(pullSupplier); 115 startTask(); 116 } 117 118 119 protected void connectionSuspended() 120 { 121 stopTask(); 122 } 123 124 125 public void connectionResumed() 126 { 127 startTask(); 128 } 129 130 131 public void runPullMessage() throws Disconnected 132 { 133 if (!isConnected() || isSuspended()) { 134 return; 135 } 136 137 try 138 { 139 runPullEventInternal(); 140 } 141 catch (InterruptedException e) 142 { 143 logger_.error("pull interrupted", e); 144 } 145 } 146 147 148 protected void runPullEventInternal() 149 throws InterruptedException , 150 Disconnected 151 { 152 BooleanHolder _hasEvent = new BooleanHolder (); 153 _hasEvent.value = false; 154 StructuredEvent _event = null; 155 156 try 157 { 158 pullSync_.acquire(); 159 _event = pullSupplier_.try_pull_structured_event( _hasEvent ); 160 } 161 finally 162 { 163 pullSync_.release(); 164 } 165 166 if ( _hasEvent.value ) 167 { 168 Message _mesg = 169 getMessageFactory().newMessage( _event, this ); 170 171 processMessage( _mesg ); 172 } 173 } 174 175 176 protected void disconnectClient() 177 { 178 stopTask(); 179 pullSupplier_.disconnect_structured_pull_supplier(); 180 181 pullSupplier_ = null; 182 } 183 184 185 protected void startTask() 186 { 187 if ( taskId_ == null ) 188 { 189 taskId_ = getTaskProcessor() 190 .executeTaskPeriodically( pollInterval_, 191 runQueueThis_, 192 true ); 193 } 194 } 195 196 197 protected void stopTask() 198 { 199 if ( taskId_ != null ) 200 { 201 getTaskProcessor().cancelTask( taskId_ ); 202 taskId_ = null; 203 } 204 } 205 206 207 public synchronized Servant getServant() 208 { 209 if ( thisServant_ == null ) 210 { 211 thisServant_ = new StructuredProxyPullConsumerPOATie( this ); 212 } 213 214 return thisServant_; 215 } 216 217 218 public org.omg.CORBA.Object activate() 219 { 220 return ProxyConsumerHelper.narrow(getServant()._this_object(getORB())); 221 } 222 } 223 | Popular Tags |