package org.jacorb.notification.servant;

import java.util.List;

import org.apache.avalon.framework.configuration.Configuration;
import org.apache.avalon.framework.configuration.ConfigurationException;
import org.jacorb.notification.OfferManager;
import org.jacorb.notification.SubscriptionManager;
import org.jacorb.notification.engine.TaskProcessor;
import org.jacorb.notification.interfaces.Message;
import org.jacorb.notification.interfaces.MessageConsumer;
import org.jacorb.notification.util.CollectionsWrapper;
import org.omg.CORBA.Any;
import org.omg.CORBA.BooleanHolder;
import org.omg.CORBA.ORB;
import org.omg.CORBA.UNKNOWN;
import org.omg.CosEventChannelAdmin.AlreadyConnected;
import org.omg.CosEventComm.Disconnected;
import org.omg.CosEventComm.PullConsumer;
import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
import org.omg.CosNotifyChannelAdmin.ProxyPullSupplierOperations;
import org.omg.CosNotifyChannelAdmin.ProxyPullSupplierPOATie;
import org.omg.CosNotifyChannelAdmin.ProxySupplierHelper;
import org.omg.CosNotifyChannelAdmin.ProxyType;
import org.omg.PortableServer.POA;
import org.omg.PortableServer.Servant;

/**
 * Implementation of {@link ProxyPullSupplierOperations} on top of
 * {@link AbstractProxySupplier}. It reports {@link ProxyType#PULL_ANY} as its
 * type and hands messages to the caller converted to {@link Any} through
 * {@link #pull()} and {@link #try_pull(BooleanHolder)}.
 */
public class ProxyPullSupplierImpl extends AbstractProxySupplier implements
        ProxyPullSupplierOperations
{
    /**
     * Placeholder returned by {@link #try_pull(BooleanHolder)} when no message
     * is available. It is created once in the static initializer and this
     * class never inserts a value into it.
     */
    private static final Any sUndefinedAny;

    static
    {
        ORB _orb = ORB.init();

        sUndefinedAny = _orb.create_any();
    }

    /**
     * The consumer passed to {@link #connect_any_pull_consumer(PullConsumer)};
     * <code>null</code> until a consumer connects and again after
     * {@link #disconnectClient()} has run.
     */
    private PullConsumer pullConsumer_ = null;

    /**
     * Creates the proxy. All arguments are forwarded unchanged to the
     * {@link AbstractProxySupplier} constructor.
     *
     * @throws ConfigurationException propagated from the superclass constructor
     */
    public ProxyPullSupplierImpl(IAdmin admin,
                                 ORB orb,
                                 POA poa,
                                 Configuration config,
                                 TaskProcessor taskProcessor,
                                 OfferManager offerManager,
                                 SubscriptionManager subscriptionManager,
                                 ConsumerAdmin consumerAdmin)
        throws ConfigurationException
    {
        super(admin, orb, poa, config, taskProcessor, offerManager, subscriptionManager,
                consumerAdmin);
    }

    /**
     * @return always {@link ProxyType#PULL_ANY}
     */
    public ProxyType MyType()
    {
        return ProxyType.PULL_ANY;
    }

    /**
     * Disconnect request from the client side; delegates to the inherited
     * <code>destroy()</code>.
     */
    public void disconnect_pull_supplier()
    {
        destroy();
    }

    /**
     * Notifies the connected consumer (if any) via
     * <code>disconnect_pull_consumer()</code> and then drops the reference to it.
     * Does nothing when no consumer is connected.
     */
    protected void disconnectClient()
    {
        if (pullConsumer_ != null)
        {
            logger_.info("disconnect any_pull_consumer");

            pullConsumer_.disconnect_pull_consumer();
            pullConsumer_ = null;
        }
    }

    /**
     * Obtains the next message through <code>getMessageBlocking()</code> and
     * returns it converted to an {@link Any}. The message is disposed after the
     * conversion, whether or not the conversion succeeded.
     *
     * @return the message as an <code>Any</code>
     * @throws Disconnected declared by the operation; presumably raised by
     *             <code>checkStillConnected()</code> when no consumer is
     *             connected (helper not visible here - confirm)
     * @throws UNKNOWN if the calling thread is interrupted while obtaining the
     *             message; the thread's interrupt status is restored before
     *             the exception is thrown
     */
    public Any pull() throws Disconnected
    {
        checkStillConnected();

        try
        {
            Message _event = getMessageBlocking();
            try
            {
                return _event.toAny();
            }
            finally
            {
                _event.dispose();
            }
        }
        catch (InterruptedException e)
        {
            logger_.fatalError("interrupted", e);

            // Catching InterruptedException clears the interrupt status. Set it
            // again so the code that owns this thread can still observe the
            // interruption request after the CORBA exception has been raised.
            Thread.currentThread().interrupt();

            throw new UNKNOWN();
        }
    }

    /**
     * Non-waiting variant of {@link #pull()}: asks
     * <code>getMessageNoBlock()</code> for a message and reports through
     * <code>hasEvent</code> whether one was returned.
     *
     * @param hasEvent out parameter; set to <code>true</code> when a message
     *            was obtained, <code>false</code> otherwise
     * @return the message as an <code>Any</code>, or the shared undefined
     *         <code>Any</code> when <code>getMessageNoBlock()</code> returned
     *         <code>null</code>
     * @throws Disconnected declared by the operation; presumably raised by
     *             <code>checkStillConnected()</code> (helper not visible here -
     *             confirm)
     */
    public Any try_pull(BooleanHolder hasEvent) throws Disconnected
    {
        checkStillConnected();

        hasEvent.value = false;

        Message _message = getMessageNoBlock();

        if (_message != null)
        {
            try
            {
                hasEvent.value = true;

                return _message.toAny();
            }
            finally
            {
                // Dispose the message even if the conversion fails.
                _message.dispose();
            }
        }

        return sUndefinedAny;
    }

    /**
     * Registers <code>consumer</code> as the client of this proxy: after
     * <code>checkIsNotConnected()</code> passes, the reference is stored and
     * handed to the inherited <code>connectClient()</code>.
     *
     * @param consumer the pull consumer to connect
     * @throws AlreadyConnected declared by the operation; presumably raised by
     *             <code>checkIsNotConnected()</code> (helper not visible here -
     *             confirm)
     */
    public void connect_any_pull_consumer(PullConsumer consumer) throws AlreadyConnected
    {
        logger_.info("connect any_pull_consumer");

        checkIsNotConnected();

        pullConsumer_ = consumer;

        connectClient(consumer);
    }

    /**
     * @return a single-element list containing only this proxy
     */
    public List getSubsequentFilterStages()
    {
        return CollectionsWrapper.singletonList(this);
    }

    /**
     * @return this proxy itself
     */
    public MessageConsumer getMessageConsumer()
    {
        return this;
    }

    /**
     * Intentionally empty. NOTE(review): presumably nothing needs to be
     * switched on because the consumer fetches messages itself - confirm.
     */
    public void enableDelivery()
    {
    }

    /**
     * Intentionally empty. NOTE(review): presumably nothing needs to be
     * switched off because the consumer fetches messages itself - confirm.
     */
    public void disableDelivery()
    {
    }

    /**
     * Intentionally empty. NOTE(review): presumably there is nothing to push
     * because the consumer fetches messages itself - confirm.
     */
    public void deliverPendingData()
    {
    }

    /**
     * Returns the servant for this proxy, creating a
     * {@link ProxyPullSupplierPOATie} around <code>this</code> on first use.
     * The method is synchronized, so the tie is created at most once.
     *
     * @return the servant stored in <code>thisServant_</code>
     */
    public synchronized Servant getServant()
    {
        if (thisServant_ == null)
        {
            thisServant_ = new ProxyPullSupplierPOATie(this);
        }
        return thisServant_;
    }

    /**
     * Obtains the object reference of the servant using the ORB returned by
     * <code>getORB()</code> and narrows it with {@link ProxySupplierHelper}.
     *
     * @return the narrowed object reference
     */
    public org.omg.CORBA.Object activate()
    {
        return ProxySupplierHelper.narrow(getServant()._this_object(getORB()));
    }

    /**
     * @return always <code>0</code>
     */
    protected long getCost()
    {
        return 0;
    }
}