1 package org.jacorb.notification.servant; 2 3 22 23 import java.util.Collections ; 24 import java.util.HashMap ; 25 import java.util.Iterator ; 26 import java.util.List ; 27 import java.util.Map ; 28 29 import org.apache.avalon.framework.configuration.Configuration; 30 import org.apache.avalon.framework.configuration.ConfigurationException; 31 import org.jacorb.notification.NoTranslationException; 32 import org.jacorb.notification.OfferManager; 33 import org.jacorb.notification.SubscriptionManager; 34 import org.jacorb.notification.TypedEventMessage; 35 import org.jacorb.notification.engine.TaskProcessor; 36 import org.jacorb.notification.interfaces.Message; 37 import org.jacorb.notification.interfaces.MessageConsumer; 38 import org.jacorb.notification.queue.MessageQueueAdapter; 39 import org.jacorb.notification.queue.RWLockEventQueueDecorator; 40 import org.jacorb.notification.util.PropertySet; 41 import org.jacorb.notification.util.PropertySetAdapter; 42 import org.omg.CORBA.ARG_OUT ; 43 import org.omg.CORBA.Any ; 44 import org.omg.CORBA.BooleanHolder ; 45 import org.omg.CORBA.InterfaceDef ; 46 import org.omg.CORBA.InterfaceDefHelper; 47 import org.omg.CORBA.NO_IMPLEMENT ; 48 import org.omg.CORBA.NVList ; 49 import org.omg.CORBA.ORB ; 50 import org.omg.CORBA.OperationDescription ; 51 import org.omg.CORBA.ParameterMode ; 52 import org.omg.CORBA.Repository ; 53 import org.omg.CORBA.ServerRequest ; 54 import org.omg.CORBA.InterfaceDefPackage.FullInterfaceDescription; 55 import org.omg.CosEventChannelAdmin.AlreadyConnected; 56 import org.omg.CosEventComm.Disconnected; 57 import org.omg.CosEventComm.PullConsumer; 58 import org.omg.CosNotification.DiscardPolicy; 59 import org.omg.CosNotification.EventTypeHelper; 60 import org.omg.CosNotification.OrderPolicy; 61 import org.omg.CosNotification.Property; 62 import org.omg.CosNotification.UnsupportedQoS; 63 import org.omg.CosNotifyChannelAdmin.ConsumerAdmin; 64 import org.omg.CosNotifyChannelAdmin.ProxyType; 65 import org.omg.CosTypedNotifyChannelAdmin.TypedProxyPullSupplierHelper; 66 import org.omg.CosTypedNotifyChannelAdmin.TypedProxyPullSupplierOperations; 67 import org.omg.CosTypedNotifyChannelAdmin.TypedProxyPullSupplierPOATie; 68 import org.omg.DynamicAny.DynAny ; 69 import org.omg.DynamicAny.DynAnyFactory ; 70 import org.omg.DynamicAny.DynAnyFactoryPackage.InconsistentTypeCode ; 71 import org.omg.PortableServer.DynamicImplementation ; 72 import org.omg.PortableServer.POA ; 73 import org.omg.PortableServer.Servant ; 74 75 79 80 public class TypedProxyPullSupplierImpl extends AbstractProxySupplier implements 81 TypedProxyPullSupplierOperations, ITypedProxy 82 { 83 final Any trueAny_; 84 85 final Any falseAny_; 86 87 private final DynAnyFactory dynAnyFactory_; 88 89 final String supportedInterface_; 90 91 private PullConsumer pullConsumer_; 92 93 private TypedProxyPullSupplier typedProxyPullSupplierServant_; 94 95 private org.omg.CORBA.Object typedProxyPullSupplier_; 96 97 final Map messageQueueMap_; 98 99 final Map invalidResponses_; 100 101 private final Repository repository_; 102 103 private class TypedProxyPullSupplier extends DynamicImplementation 104 { 105 private final String [] supportedInterfaces_ = new String [] { supportedInterface_ }; 106 107 public void invoke(final ServerRequest request) 108 { 109 String _operation = request.operation(); 110 111 boolean _isTryOp = false; 112 if (_operation.startsWith("try_")) 113 { 114 _isTryOp = true; 115 _operation = _operation.substring(4); 117 } 118 119 try 120 { 121 final Message _mesg; 122 123 final MessageQueueAdapter _queue = (MessageQueueAdapter) messageQueueMap_ 124 .get(_operation); 125 126 if (_isTryOp) 127 { 128 _mesg = _queue.getMessageNoBlock(); 129 } 130 else 131 { 132 _mesg = _queue.getMessageBlocking(); 133 } 134 135 try 136 { 137 final NVList _args; 138 139 if (_mesg == null) 140 { 141 _args = (NVList ) invalidResponses_.get(_operation); 142 143 if (_isTryOp) 144 { 145 request.set_result(falseAny_); 146 } 147 } 148 else 149 { 150 _args = prepareResponse(_mesg); 151 152 if (_isTryOp) 153 { 154 request.set_result(trueAny_); 155 } 156 } 157 158 request.arguments(_args); 159 } finally 160 { 161 if (_mesg != null) 162 { 163 _mesg.dispose(); 164 } 165 } 166 } catch (InterruptedException e) 167 { 168 } 170 } 171 172 public String [] _all_interfaces(POA poa, byte[] oid) 173 { 174 return supportedInterfaces_; 175 } 176 177 public POA _default_POA() 178 { 179 return getPOA(); 180 } 181 } 182 183 final NVList prepareResponse(Message mesg) 184 { 185 try 186 { 187 Property[] _props = mesg.toTypedEvent(); 188 189 NVList _args = getORB().create_list(_props.length - 1); 190 191 for (int x = 1; x < _props.length; ++x) 193 { 194 _args.add_value(_props[x].name, _props[x].value, ARG_OUT.value); 195 } 196 197 return _args; 198 } catch (NoTranslationException e) 199 { 200 throw new RuntimeException (); 203 } 204 } 205 206 public TypedProxyPullSupplierImpl(ITypedAdmin admin, ConsumerAdmin consumerAdmin, ORB orb, 207 POA poa, Configuration conf, TaskProcessor taskProcessor, OfferManager offerManager, 208 SubscriptionManager subscriptionManager, DynAnyFactory dynAnyFactory, 209 Repository repository) throws ConfigurationException 210 { 211 super(admin, orb, poa, conf, taskProcessor, offerManager, 212 subscriptionManager, consumerAdmin); 213 214 trueAny_ = orb.create_any(); 215 falseAny_ = orb.create_any(); 216 217 trueAny_.insert_boolean(true); 218 falseAny_.insert_boolean(false); 219 220 supportedInterface_ = admin.getSupportedInterface(); 221 222 dynAnyFactory_ = dynAnyFactory; 223 repository_ = repository; 224 225 qosSettings_.addPropertySetListener( 226 new String [] { OrderPolicy.value, DiscardPolicy.value }, reconfigureEventQueues_); 227 228 try 229 { 230 FullInterfaceDescription interfaceDescription = getInterfaceDescription(); 231 232 validateInterface(interfaceDescription); 233 234 messageQueueMap_ = Collections 235 .unmodifiableMap(newMessageQueueMap(interfaceDescription)); 236 237 invalidResponses_ = Collections 238 .unmodifiableMap(newInvalidResponseMap(interfaceDescription)); 239 } catch (InconsistentTypeCode e) 240 { 241 throw new RuntimeException (); 242 } catch (InterruptedException e) 243 { 244 throw new RuntimeException (); 245 } 246 } 247 248 private void ensureMethodOnlyUsesOutParams(OperationDescription operation) 249 throws IllegalArgumentException 250 { 251 int _noOfParameters = operation.parameters.length; 252 253 for (int x = 0; x < _noOfParameters; ++x) 254 { 255 switch (operation.parameters[x].mode.value()) { 256 case ParameterMode._PARAM_IN: 257 case ParameterMode._PARAM_INOUT: 259 throw new IllegalArgumentException ("only OUT params allowed"); 260 case ParameterMode._PARAM_OUT: 261 break; 262 } 263 } 264 } 265 266 private void prepareInvalidResponse(Map map, OperationDescription operation) 267 throws InconsistentTypeCode 268 { 269 NVList _expectedParams = getORB().create_list(operation.parameters.length); 270 271 for (int x = 0; x < operation.parameters.length; ++x) 272 { 273 DynAny _dynAny = dynAnyFactory_ 274 .create_dyn_any_from_type_code(operation.parameters[x].type); 275 276 _expectedParams 277 .add_value(operation.parameters[x].name, _dynAny.to_any(), ARG_OUT.value); 278 } 279 280 map.put(operation.name, _expectedParams); 281 } 282 283 private final Map newMessageQueueMap(FullInterfaceDescription interfaceDescription) 284 throws InterruptedException 285 { 286 Map map = new HashMap (); 287 288 for (int x = 0; x < interfaceDescription.operations.length; ++x) 289 { 290 if (!interfaceDescription.operations[x].name.startsWith("try_")) 291 { 292 logger_.debug("Create Queue for Operation: " 293 + interfaceDescription.operations[x].name); 294 295 MessageQueueAdapter _messageQueue = getMessageQueueFactory().newMessageQueue( 296 qosSettings_); 297 298 map.put(interfaceDescription.operations[x].name, new RWLockEventQueueDecorator( 299 _messageQueue)); 300 } 301 } 302 303 return map; 304 } 305 306 private final Map newInvalidResponseMap(FullInterfaceDescription interfaceDescription) 307 throws InconsistentTypeCode 308 { 309 Map map = new HashMap (); 310 311 for (int x = 0; x < interfaceDescription.operations.length; ++x) 312 { 313 if (!interfaceDescription.operations[x].name.startsWith("try_")) 314 { 315 prepareInvalidResponse(map, interfaceDescription.operations[x]); 316 } 317 } 318 319 return map; 320 } 321 322 private final void validateInterface(FullInterfaceDescription interfaceDescription) 323 { 324 for (int x = 0; x < interfaceDescription.operations.length; ++x) 325 { 326 ensureMethodOnlyUsesOutParams(interfaceDescription.operations[x]); 327 } 328 } 329 330 private FullInterfaceDescription getInterfaceDescription() 331 { 332 InterfaceDef _interfaceDef = InterfaceDefHelper.narrow(repository_ 333 .lookup_id(supportedInterface_)); 334 335 return _interfaceDef.describe_interface(); 336 } 337 338 private final void configureEventQueue() 339 { 340 try 341 { 342 Iterator i = messageQueueMap_.keySet().iterator(); 343 344 while (i.hasNext()) 345 { 346 String _key = (String ) i.next(); 347 348 RWLockEventQueueDecorator _queueAdapter = (RWLockEventQueueDecorator) messageQueueMap_ 349 .get(_key); 350 351 MessageQueueAdapter _newQueue = getMessageQueueFactory().newMessageQueue( 352 qosSettings_); 353 354 _queueAdapter.replaceDelegate(_newQueue); 355 } 356 357 } catch (InterruptedException e) 358 { 359 throw new RuntimeException (e.getMessage()); 360 } 361 } 362 363 private PropertySetAdapter reconfigureEventQueues_ = new PropertySetAdapter() 364 { 365 public void actionPropertySetChanged(PropertySet source) throws UnsupportedQoS 366 { 367 configureEventQueue(); 368 } 369 }; 370 371 public Any pull() throws Disconnected 372 { 373 throw new NO_IMPLEMENT (); 374 } 375 376 public Any try_pull(BooleanHolder booleanHolder) throws Disconnected 377 { 378 throw new NO_IMPLEMENT (); 379 } 380 381 public void disconnect_pull_supplier() 382 { 383 destroy(); 384 } 385 386 public void connect_typed_pull_consumer(PullConsumer pullConsumer) throws AlreadyConnected 387 { 388 checkIsNotConnected(); 389 390 connectClient(pullConsumer); 391 392 pullConsumer_ = pullConsumer; 393 } 394 395 public org.omg.CORBA.Object get_typed_supplier() 396 { 397 if (typedProxyPullSupplierServant_ == null) 398 { 399 typedProxyPullSupplierServant_ = new TypedProxyPullSupplier(); 400 401 typedProxyPullSupplier_ = typedProxyPullSupplierServant_._this_object(getORB()); 402 } 403 return typedProxyPullSupplier_; 404 } 405 406 public ProxyType MyType() 407 { 408 return ProxyType.PULL_TYPED; 409 } 410 411 public List getSubsequentFilterStages() 412 { 413 return null; 414 } 415 416 public MessageConsumer getMessageConsumer() 417 { 418 return this; 419 } 420 421 public Servant getServant() 422 { 423 if (thisServant_ == null) 424 { 425 thisServant_ = new TypedProxyPullSupplierPOATie(this); 426 } 427 428 return thisServant_; 429 } 430 431 public org.omg.CORBA.Object activate() 432 { 433 return TypedProxyPullSupplierHelper.narrow(getServant()._this_object(getORB())); 434 } 435 436 public void deliverMessage(Message message) 437 { 438 try 439 { 440 Property[] _props = message.toTypedEvent(); 441 442 final String _fullQualifiedOperation; 443 444 if (TypedEventMessage.OPERATION_NAME.equals(_props[0].name)) 445 { 446 _fullQualifiedOperation = _props[0].value.extract_string(); 447 } 448 else if (TypedEventMessage.EVENT_TYPE.equals(_props[0].name)) 449 { 450 _fullQualifiedOperation = EventTypeHelper.extract(_props[0].value).type_name; 451 } 452 else 453 { 454 throw new IllegalArgumentException (); 455 } 456 457 int idx = _fullQualifiedOperation.lastIndexOf("::"); 458 String _operation = _fullQualifiedOperation.substring(idx + 2); 459 460 final Message _clonedMessage = (Message) message.clone(); 461 462 try 463 { 464 ((MessageQueueAdapter) messageQueueMap_.get(_operation)).enqeue(_clonedMessage); 465 } catch (InterruptedException e) 466 { 467 _clonedMessage.dispose(); 468 } 469 } catch (NoTranslationException e) 470 { 471 } 474 } 475 476 public void deliverPendingData() 477 { 478 } 480 481 public void disconnectClient() 482 { 483 if (pullConsumer_ != null) 484 { 485 pullConsumer_.disconnect_pull_consumer(); 486 pullConsumer_ = null; 487 } 488 } 489 490 protected long getCost() 491 { 492 return 0; 493 } 494 } | Popular Tags |