1 package org.jacorb.notification.servant; 2 3 22 23 import java.util.List ; 24 25 import org.apache.avalon.framework.configuration.Configuration; 26 import org.apache.avalon.framework.configuration.ConfigurationException; 27 import org.jacorb.notification.NoTranslationException; 28 import org.jacorb.notification.OfferManager; 29 import org.jacorb.notification.SubscriptionManager; 30 import org.jacorb.notification.TypedEventMessage; 31 import org.jacorb.notification.engine.PushOperation; 32 import org.jacorb.notification.engine.PushTaskExecutorFactory; 33 import org.jacorb.notification.engine.TaskProcessor; 34 import org.jacorb.notification.interfaces.Message; 35 import org.jacorb.notification.interfaces.MessageConsumer; 36 import org.omg.CORBA.ARG_IN ; 37 import org.omg.CORBA.NVList ; 38 import org.omg.CORBA.ORB ; 39 import org.omg.CORBA.Request ; 40 import org.omg.CORBA.TCKind ; 41 import org.omg.CORBA.TypeCode ; 42 import org.omg.CosEventChannelAdmin.AlreadyConnected; 43 import org.omg.CosEventChannelAdmin.TypeError; 44 import org.omg.CosEventComm.Disconnected; 45 import org.omg.CosNotification.EventTypeHelper; 46 import org.omg.CosNotification.Property; 47 import org.omg.CosNotifyChannelAdmin.ConsumerAdmin; 48 import org.omg.CosNotifyChannelAdmin.ProxyType; 49 import org.omg.CosTypedEventComm.TypedPushConsumer; 50 import org.omg.CosTypedNotifyChannelAdmin.TypedProxyPushSupplierHelper; 51 import org.omg.CosTypedNotifyChannelAdmin.TypedProxyPushSupplierOperations; 52 import org.omg.CosTypedNotifyChannelAdmin.TypedProxyPushSupplierPOATie; 53 import org.omg.PortableServer.POA ; 54 import org.omg.PortableServer.Servant ; 55 56 60 61 public class TypedProxyPushSupplierImpl extends AbstractProxyPushSupplier implements 62 TypedProxyPushSupplierOperations, ITypedProxy 63 { 64 private class PushTypedOperation implements PushOperation 65 { 66 private final Request request_; 67 68 public PushTypedOperation(Request request) { 69 request_ = request; 70 } 71 72 public void invokePush() throws Disconnected { 73 deliverMessageInternal(request_); 74 } 75 76 public void dispose() { 77 } 79 } 80 81 private TypedPushConsumer pushConsumer_; 82 83 private org.omg.CORBA.Object typedConsumer_; 84 85 private static final TypeCode TYPE_CODE_VOID = ORB.init().get_primitive_tc(TCKind.tk_void); 86 87 private final String supportedInterface_; 88 89 private long timeSpent_ = 0; 90 91 public TypedProxyPushSupplierImpl(ITypedAdmin admin, ConsumerAdmin consumerAdmin, ORB orb, 92 POA poa, Configuration conf, TaskProcessor taskProcessor, PushTaskExecutorFactory pushTaskExecutorFactory, 93 OfferManager offerManager, SubscriptionManager subscriptionManager) 94 throws ConfigurationException 95 { 96 super(admin, orb, poa, conf, taskProcessor, pushTaskExecutorFactory, offerManager, 97 subscriptionManager, consumerAdmin); 98 99 supportedInterface_ = admin.getSupportedInterface(); 100 } 101 102 public void disconnect_push_supplier() 103 { 104 destroy(); 105 } 106 107 public void connect_typed_push_consumer(TypedPushConsumer typedPushConsumer) 108 throws AlreadyConnected, TypeError 109 { 110 logger_.info("connect typed_push_supplier"); 111 112 checkIsNotConnected(); 113 114 connectClient(typedPushConsumer); 115 116 pushConsumer_ = typedPushConsumer; 117 118 typedConsumer_ = pushConsumer_.get_typed_consumer(); 119 120 if (!typedConsumer_._is_a(supportedInterface_)) 121 { 122 throw new TypeError(); 123 } 124 } 125 126 public ProxyType MyType() 127 { 128 return ProxyType.PUSH_TYPED; 129 } 130 131 public MessageConsumer getMessageConsumer() 132 { 133 return this; 134 } 135 136 public List getSubsequentFilterStages() 137 { 138 return null; 139 } 140 141 public org.omg.CORBA.Object activate() 142 { 143 return TypedProxyPushSupplierHelper.narrow(getServant()._this_object(getORB())); 144 } 145 146 public void isIDLAssignable(final String ifName) throws IllegalArgumentException 147 { 148 if (typedConsumer_._is_a(ifName)) 149 { 150 return; 151 } 152 153 if (ifName.indexOf("Pull") > 0) 154 { 155 int idx = ifName.indexOf("Pull"); 156 157 StringBuffer _nonPullIF = new StringBuffer (); 158 _nonPullIF.append(ifName.substring(0, idx)); 159 _nonPullIF.append(ifName.substring(idx + 4)); 160 161 if (typedConsumer_._is_a(_nonPullIF.toString())) 162 { 163 return; 164 } 165 } 166 167 throw new IllegalArgumentException (); 168 } 169 170 171 public void pushPendingData() 172 { 173 final Message[] messages = getAllMessages(); 174 175 for (int i = 0; i < messages.length; ++i) 176 { 177 try 178 { 179 deliverMessageWithRetry(messages[i]); 180 } finally 181 { 182 messages[i].dispose(); 183 } 184 } 185 } 186 187 private void deliverMessageWithRetry(Message message) 188 { 189 try 190 { 191 final Property[] _props = message.toTypedEvent(); 192 193 final String _fullQualifiedOperation; 194 195 if (TypedEventMessage.OPERATION_NAME.equals(_props[0].name)) 196 { 197 _fullQualifiedOperation = _props[0].value.extract_string(); 198 } 199 else if (TypedEventMessage.EVENT_TYPE.equals(_props[0].name)) 200 { 201 _fullQualifiedOperation = EventTypeHelper.extract(_props[0].value).type_name; 202 203 String _idlType = EventTypeHelper.extract(_props[0].value).domain_name; 204 205 isIDLAssignable(_idlType); 206 } 207 else 208 { 209 throw new IllegalArgumentException (); 210 } 211 212 int _idx = _fullQualifiedOperation.lastIndexOf("::"); 213 final String _operation = _fullQualifiedOperation.substring(_idx + 2); 214 215 final Request _request = typedConsumer_._request(_operation); 216 217 final NVList _arguments = _request.arguments(); 218 219 for (int x = 1; x < _props.length; ++x) 220 { 221 _arguments.add_value(_props[x].name, _props[x].value, ARG_IN.value); 222 } 223 224 _request.set_return_type(TYPE_CODE_VOID); 225 226 try 227 { 228 deliverMessageInternal(_request); 229 } catch (Throwable t) 230 { 231 PushTypedOperation _failedOperation = new PushTypedOperation(_request); 232 233 handleFailedPushOperation(_failedOperation, t); 234 } 235 } catch (NoTranslationException e) 236 { 237 240 logger_.info("No Translation possible", e); 241 } 242 } 243 244 void deliverMessageInternal(final Request request) 245 { 246 long now = System.currentTimeMillis(); 247 request.invoke(); 248 timeSpent_ += (System.currentTimeMillis() - now); 249 resetErrorCounter(); 250 } 251 252 protected void disconnectClient() 253 { 254 if (pushConsumer_ != null) 255 { 256 pushConsumer_.disconnect_push_consumer(); 257 pushConsumer_ = null; 258 } 259 } 260 261 public synchronized Servant getServant() 262 { 263 if (thisServant_ == null) 264 { 265 thisServant_ = new TypedProxyPushSupplierPOATie(this); 266 } 267 268 return thisServant_; 269 } 270 271 protected long getCost() 272 { 273 return timeSpent_; 274 } 275 } | Popular Tags |