KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > notification > servant > TypedProxyPullSupplierImpl


1 package org.jacorb.notification.servant;
2
3 /*
4  * JacORB - a free Java ORB
5  *
6  * Copyright (C) 1997-2003 Gerald Brose.
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the Free
20  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */

22
23 import java.util.Collections JavaDoc;
24 import java.util.HashMap JavaDoc;
25 import java.util.Iterator JavaDoc;
26 import java.util.List JavaDoc;
27 import java.util.Map JavaDoc;
28
29 import org.apache.avalon.framework.configuration.Configuration;
30 import org.apache.avalon.framework.configuration.ConfigurationException;
31 import org.jacorb.notification.NoTranslationException;
32 import org.jacorb.notification.OfferManager;
33 import org.jacorb.notification.SubscriptionManager;
34 import org.jacorb.notification.TypedEventMessage;
35 import org.jacorb.notification.engine.TaskProcessor;
36 import org.jacorb.notification.interfaces.Message;
37 import org.jacorb.notification.interfaces.MessageConsumer;
38 import org.jacorb.notification.queue.MessageQueueAdapter;
39 import org.jacorb.notification.queue.RWLockEventQueueDecorator;
40 import org.jacorb.notification.util.PropertySet;
41 import org.jacorb.notification.util.PropertySetAdapter;
42 import org.omg.CORBA.ARG_OUT JavaDoc;
43 import org.omg.CORBA.Any JavaDoc;
44 import org.omg.CORBA.BooleanHolder JavaDoc;
45 import org.omg.CORBA.InterfaceDef JavaDoc;
46 import org.omg.CORBA.InterfaceDefHelper;
47 import org.omg.CORBA.NO_IMPLEMENT JavaDoc;
48 import org.omg.CORBA.NVList JavaDoc;
49 import org.omg.CORBA.ORB JavaDoc;
50 import org.omg.CORBA.OperationDescription JavaDoc;
51 import org.omg.CORBA.ParameterMode JavaDoc;
52 import org.omg.CORBA.Repository JavaDoc;
53 import org.omg.CORBA.ServerRequest JavaDoc;
54 import org.omg.CORBA.InterfaceDefPackage.FullInterfaceDescription;
55 import org.omg.CosEventChannelAdmin.AlreadyConnected;
56 import org.omg.CosEventComm.Disconnected;
57 import org.omg.CosEventComm.PullConsumer;
58 import org.omg.CosNotification.DiscardPolicy;
59 import org.omg.CosNotification.EventTypeHelper;
60 import org.omg.CosNotification.OrderPolicy;
61 import org.omg.CosNotification.Property;
62 import org.omg.CosNotification.UnsupportedQoS;
63 import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
64 import org.omg.CosNotifyChannelAdmin.ProxyType;
65 import org.omg.CosTypedNotifyChannelAdmin.TypedProxyPullSupplierHelper;
66 import org.omg.CosTypedNotifyChannelAdmin.TypedProxyPullSupplierOperations;
67 import org.omg.CosTypedNotifyChannelAdmin.TypedProxyPullSupplierPOATie;
68 import org.omg.DynamicAny.DynAny JavaDoc;
69 import org.omg.DynamicAny.DynAnyFactory JavaDoc;
70 import org.omg.DynamicAny.DynAnyFactoryPackage.InconsistentTypeCode JavaDoc;
71 import org.omg.PortableServer.DynamicImplementation JavaDoc;
72 import org.omg.PortableServer.POA JavaDoc;
73 import org.omg.PortableServer.Servant JavaDoc;
74
75 /**
76  * @author Alphonse Bendt
77  * @version $Id: TypedProxyPullSupplierImpl.java,v 1.8 2005/04/27 10:45:46 alphonse.bendt Exp $
78  */

79
80 public class TypedProxyPullSupplierImpl extends AbstractProxySupplier implements
81         TypedProxyPullSupplierOperations, ITypedProxy
82 {
83     final Any JavaDoc trueAny_;
84
85     final Any JavaDoc falseAny_;
86
87     private final DynAnyFactory JavaDoc dynAnyFactory_;
88
89     final String JavaDoc supportedInterface_;
90
91     private PullConsumer pullConsumer_;
92
93     private TypedProxyPullSupplier typedProxyPullSupplierServant_;
94
95     private org.omg.CORBA.Object JavaDoc typedProxyPullSupplier_;
96
97     final Map JavaDoc messageQueueMap_;
98
99     final Map JavaDoc invalidResponses_;
100
101     private final Repository JavaDoc repository_;
102
103     private class TypedProxyPullSupplier extends DynamicImplementation JavaDoc
104     {
105         private final String JavaDoc[] supportedInterfaces_ = new String JavaDoc[] { supportedInterface_ };
106
107         public void invoke(final ServerRequest JavaDoc request)
108         {
109             String JavaDoc _operation = request.operation();
110
111             boolean _isTryOp = false;
112             if (_operation.startsWith("try_"))
113             {
114                 _isTryOp = true;
115                 // cut 'try_' prefix
116
_operation = _operation.substring(4);
117             }
118
119             try
120             {
121                 final Message _mesg;
122
123                 final MessageQueueAdapter _queue = (MessageQueueAdapter) messageQueueMap_
124                         .get(_operation);
125
126                 if (_isTryOp)
127                 {
128                     _mesg = _queue.getMessageNoBlock();
129                 }
130                 else
131                 {
132                     _mesg = _queue.getMessageBlocking();
133                 }
134
135                 try
136                 {
137                     final NVList JavaDoc _args;
138
139                     if (_mesg == null)
140                     {
141                         _args = (NVList JavaDoc) invalidResponses_.get(_operation);
142
143                         if (_isTryOp)
144                         {
145                             request.set_result(falseAny_);
146                         }
147                     }
148                     else
149                     {
150                         _args = prepareResponse(_mesg);
151
152                         if (_isTryOp)
153                         {
154                             request.set_result(trueAny_);
155                         }
156                     }
157
158                     request.arguments(_args);
159                 } finally
160                 {
161                     if (_mesg != null)
162                     {
163                         _mesg.dispose();
164                     }
165                 }
166             } catch (InterruptedException JavaDoc e)
167             {
168                 // ignore
169
}
170         }
171
172         public String JavaDoc[] _all_interfaces(POA JavaDoc poa, byte[] oid)
173         {
174             return supportedInterfaces_;
175         }
176
177         public POA JavaDoc _default_POA()
178         {
179             return getPOA();
180         }
181     }
182
183     final NVList JavaDoc prepareResponse(Message mesg)
184     {
185         try
186         {
187             Property[] _props = mesg.toTypedEvent();
188
189             NVList JavaDoc _args = getORB().create_list(_props.length - 1);
190
191             // start at index 1 here. index 0 contains operation name
192
for (int x = 1; x < _props.length; ++x)
193             {
194                 _args.add_value(_props[x].name, _props[x].value, ARG_OUT.value);
195             }
196
197             return _args;
198         } catch (NoTranslationException e)
199         {
200             // cannot happen here
201
// as there are no nontranslatable Messages queued.
202
throw new RuntimeException JavaDoc();
203         }
204     }
205
206     public TypedProxyPullSupplierImpl(ITypedAdmin admin, ConsumerAdmin consumerAdmin, ORB JavaDoc orb,
207             POA JavaDoc poa, Configuration conf, TaskProcessor taskProcessor, OfferManager offerManager,
208             SubscriptionManager subscriptionManager, DynAnyFactory JavaDoc dynAnyFactory,
209             Repository JavaDoc repository) throws ConfigurationException
210     {
211         super(admin, orb, poa, conf, taskProcessor, offerManager,
212                 subscriptionManager, consumerAdmin);
213
214         trueAny_ = orb.create_any();
215         falseAny_ = orb.create_any();
216
217         trueAny_.insert_boolean(true);
218         falseAny_.insert_boolean(false);
219
220         supportedInterface_ = admin.getSupportedInterface();
221
222         dynAnyFactory_ = dynAnyFactory;
223         repository_ = repository;
224
225         qosSettings_.addPropertySetListener(
226                 new String JavaDoc[] { OrderPolicy.value, DiscardPolicy.value }, reconfigureEventQueues_);
227
228         try
229         {
230             FullInterfaceDescription interfaceDescription = getInterfaceDescription();
231
232             validateInterface(interfaceDescription);
233
234             messageQueueMap_ = Collections
235                     .unmodifiableMap(newMessageQueueMap(interfaceDescription));
236
237             invalidResponses_ = Collections
238                     .unmodifiableMap(newInvalidResponseMap(interfaceDescription));
239         } catch (InconsistentTypeCode JavaDoc e)
240         {
241             throw new RuntimeException JavaDoc();
242         } catch (InterruptedException JavaDoc e)
243         {
244             throw new RuntimeException JavaDoc();
245         }
246     }
247
248     private void ensureMethodOnlyUsesOutParams(OperationDescription JavaDoc operation)
249             throws IllegalArgumentException JavaDoc
250     {
251         int _noOfParameters = operation.parameters.length;
252
253         for (int x = 0; x < _noOfParameters; ++x)
254         {
255             switch (operation.parameters[x].mode.value()) {
256             case ParameterMode._PARAM_IN:
257             // fallthrough
258
case ParameterMode._PARAM_INOUT:
259                 throw new IllegalArgumentException JavaDoc("only OUT params allowed");
260             case ParameterMode._PARAM_OUT:
261                 break;
262             }
263         }
264     }
265
266     private void prepareInvalidResponse(Map JavaDoc map, OperationDescription JavaDoc operation)
267             throws InconsistentTypeCode JavaDoc
268     {
269         NVList JavaDoc _expectedParams = getORB().create_list(operation.parameters.length);
270
271         for (int x = 0; x < operation.parameters.length; ++x)
272         {
273             DynAny JavaDoc _dynAny = dynAnyFactory_
274                     .create_dyn_any_from_type_code(operation.parameters[x].type);
275
276             _expectedParams
277                     .add_value(operation.parameters[x].name, _dynAny.to_any(), ARG_OUT.value);
278         }
279
280         map.put(operation.name, _expectedParams);
281     }
282
283     private final Map JavaDoc newMessageQueueMap(FullInterfaceDescription interfaceDescription)
284             throws InterruptedException JavaDoc
285     {
286         Map JavaDoc map = new HashMap JavaDoc();
287
288         for (int x = 0; x < interfaceDescription.operations.length; ++x)
289         {
290             if (!interfaceDescription.operations[x].name.startsWith("try_"))
291             {
292                 logger_.debug("Create Queue for Operation: "
293                         + interfaceDescription.operations[x].name);
294
295                 MessageQueueAdapter _messageQueue = getMessageQueueFactory().newMessageQueue(
296                         qosSettings_);
297
298                 map.put(interfaceDescription.operations[x].name, new RWLockEventQueueDecorator(
299                         _messageQueue));
300             }
301         }
302
303         return map;
304     }
305
306     private final Map JavaDoc newInvalidResponseMap(FullInterfaceDescription interfaceDescription)
307             throws InconsistentTypeCode JavaDoc
308     {
309         Map JavaDoc map = new HashMap JavaDoc();
310
311         for (int x = 0; x < interfaceDescription.operations.length; ++x)
312         {
313             if (!interfaceDescription.operations[x].name.startsWith("try_"))
314             {
315                 prepareInvalidResponse(map, interfaceDescription.operations[x]);
316             }
317         }
318
319         return map;
320     }
321
322     private final void validateInterface(FullInterfaceDescription interfaceDescription)
323     {
324         for (int x = 0; x < interfaceDescription.operations.length; ++x)
325         {
326             ensureMethodOnlyUsesOutParams(interfaceDescription.operations[x]);
327         }
328     }
329
330     private FullInterfaceDescription getInterfaceDescription()
331     {
332         InterfaceDef JavaDoc _interfaceDef = InterfaceDefHelper.narrow(repository_
333                 .lookup_id(supportedInterface_));
334
335         return _interfaceDef.describe_interface();
336     }
337
338     private final void configureEventQueue()
339     {
340         try
341         {
342             Iterator JavaDoc i = messageQueueMap_.keySet().iterator();
343
344             while (i.hasNext())
345             {
346                 String JavaDoc _key = (String JavaDoc) i.next();
347
348                 RWLockEventQueueDecorator _queueAdapter = (RWLockEventQueueDecorator) messageQueueMap_
349                         .get(_key);
350
351                 MessageQueueAdapter _newQueue = getMessageQueueFactory().newMessageQueue(
352                         qosSettings_);
353
354                 _queueAdapter.replaceDelegate(_newQueue);
355             }
356
357         } catch (InterruptedException JavaDoc e)
358         {
359             throw new RuntimeException JavaDoc(e.getMessage());
360         }
361     }
362
363     private PropertySetAdapter reconfigureEventQueues_ = new PropertySetAdapter()
364     {
365         public void actionPropertySetChanged(PropertySet source) throws UnsupportedQoS
366         {
367             configureEventQueue();
368         }
369     };
370
371     public Any JavaDoc pull() throws Disconnected
372     {
373         throw new NO_IMPLEMENT JavaDoc();
374     }
375
376     public Any JavaDoc try_pull(BooleanHolder JavaDoc booleanHolder) throws Disconnected
377     {
378         throw new NO_IMPLEMENT JavaDoc();
379     }
380
381     public void disconnect_pull_supplier()
382     {
383         destroy();
384     }
385
386     public void connect_typed_pull_consumer(PullConsumer pullConsumer) throws AlreadyConnected
387     {
388         checkIsNotConnected();
389
390         connectClient(pullConsumer);
391
392         pullConsumer_ = pullConsumer;
393     }
394
395     public org.omg.CORBA.Object JavaDoc get_typed_supplier()
396     {
397         if (typedProxyPullSupplierServant_ == null)
398         {
399             typedProxyPullSupplierServant_ = new TypedProxyPullSupplier();
400
401             typedProxyPullSupplier_ = typedProxyPullSupplierServant_._this_object(getORB());
402         }
403         return typedProxyPullSupplier_;
404     }
405
406     public ProxyType MyType()
407     {
408         return ProxyType.PULL_TYPED;
409     }
410
411     public List JavaDoc getSubsequentFilterStages()
412     {
413         return null;
414     }
415
416     public MessageConsumer getMessageConsumer()
417     {
418         return this;
419     }
420
421     public Servant JavaDoc getServant()
422     {
423         if (thisServant_ == null)
424         {
425             thisServant_ = new TypedProxyPullSupplierPOATie(this);
426         }
427         
428         return thisServant_;
429     }
430
431     public org.omg.CORBA.Object JavaDoc activate()
432     {
433         return TypedProxyPullSupplierHelper.narrow(getServant()._this_object(getORB()));
434     }
435
436     public void deliverMessage(Message message)
437     {
438         try
439         {
440             Property[] _props = message.toTypedEvent();
441
442             final String JavaDoc _fullQualifiedOperation;
443
444             if (TypedEventMessage.OPERATION_NAME.equals(_props[0].name))
445             {
446                 _fullQualifiedOperation = _props[0].value.extract_string();
447             }
448             else if (TypedEventMessage.EVENT_TYPE.equals(_props[0].name))
449             {
450                 _fullQualifiedOperation = EventTypeHelper.extract(_props[0].value).type_name;
451             }
452             else
453             {
454                 throw new IllegalArgumentException JavaDoc();
455             }
456
457             int idx = _fullQualifiedOperation.lastIndexOf("::");
458             String JavaDoc _operation = _fullQualifiedOperation.substring(idx + 2);
459
460             final Message _clonedMessage = (Message) message.clone();
461            
462             try
463             {
464                 ((MessageQueueAdapter) messageQueueMap_.get(_operation)).enqeue(_clonedMessage);
465             } catch (InterruptedException JavaDoc e)
466             {
467                 _clonedMessage.dispose();
468             }
469         } catch (NoTranslationException e)
470         {
471             // ignore
472
// Message is not delivered to the connected Consumer
473
}
474     }
475
476     public void deliverPendingData()
477     {
478         // No Op as this Proxy is a PullSupplier
479
}
480
481     public void disconnectClient()
482     {
483         if (pullConsumer_ != null)
484         {
485             pullConsumer_.disconnect_pull_consumer();
486             pullConsumer_ = null;
487         }
488     }
489     
490     protected long getCost()
491     {
492         return 0;
493     }
494 }
Popular Tags