KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > celtix > bus > ws > rm > RMHandler


1 package org.objectweb.celtix.bus.ws.rm;
2
3 import java.io.IOException JavaDoc;
4 import java.math.BigInteger JavaDoc;
5 import java.util.Collection JavaDoc;
6 import java.util.HashMap JavaDoc;
7 import java.util.Map JavaDoc;
8 import java.util.Timer JavaDoc;
9 import java.util.logging.Level JavaDoc;
10 import java.util.logging.Logger JavaDoc;
11
12 import javax.annotation.PostConstruct;
13 import javax.annotation.PreDestroy;
14 import javax.annotation.Resource;
15 import javax.xml.ws.handler.LogicalHandler;
16 import javax.xml.ws.handler.LogicalMessageContext;
17 import javax.xml.ws.handler.MessageContext;
18
19 import org.objectweb.celtix.Bus;
20 import org.objectweb.celtix.bindings.AbstractBindingBase;
21 import org.objectweb.celtix.bindings.BindingBase;
22 import org.objectweb.celtix.bindings.BindingContextUtils;
23 import org.objectweb.celtix.bindings.ClientBinding;
24 import org.objectweb.celtix.bindings.JAXWSConstants;
25 import org.objectweb.celtix.bindings.ServerBinding;
26
27 import org.objectweb.celtix.bus.jaxws.EndpointImpl;
28 import org.objectweb.celtix.bus.jaxws.ServiceImpl;
29 import org.objectweb.celtix.bus.ws.addressing.AddressingPropertiesImpl;
30 import org.objectweb.celtix.bus.ws.addressing.ContextUtils;
31 import org.objectweb.celtix.bus.ws.addressing.VersionTransformer;
32 import org.objectweb.celtix.bus.ws.rm.persistence.RMStoreFactory;
33 import org.objectweb.celtix.common.logging.LogUtils;
34 import org.objectweb.celtix.configuration.Configuration;
35 import org.objectweb.celtix.configuration.ConfigurationBuilder;
36 import org.objectweb.celtix.configuration.ConfigurationBuilderFactory;
37 import org.objectweb.celtix.configuration.ConfigurationProvider;
38 import org.objectweb.celtix.context.ObjectMessageContext;
39 import org.objectweb.celtix.context.OutputStreamMessageContext;
40 import org.objectweb.celtix.handlers.SystemHandler;
41 import org.objectweb.celtix.transports.ClientTransport;
42 import org.objectweb.celtix.transports.ServerTransport;
43 import org.objectweb.celtix.transports.Transport;
44 import org.objectweb.celtix.ws.addressing.AddressingProperties;
45 import org.objectweb.celtix.ws.addressing.RelatesToType;
46 import org.objectweb.celtix.ws.addressing.v200408.AttributedURI;
47 import org.objectweb.celtix.ws.addressing.v200408.EndpointReferenceType;
48 import org.objectweb.celtix.ws.rm.AckRequestedType;
49 import org.objectweb.celtix.ws.rm.CreateSequenceResponseType;
50 import org.objectweb.celtix.ws.rm.CreateSequenceType;
51 import org.objectweb.celtix.ws.rm.Identifier;
52 import org.objectweb.celtix.ws.rm.RMProperties;
53 import org.objectweb.celtix.ws.rm.SequenceAcknowledgement;
54 import org.objectweb.celtix.ws.rm.SequenceType;
55 import org.objectweb.celtix.ws.rm.TerminateSequenceType;
56 import org.objectweb.celtix.ws.rm.persistence.RMStore;
57 import org.objectweb.celtix.ws.rm.wsdl.SequenceFault;
58 import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
59
60 public class RMHandler implements LogicalHandler<LogicalMessageContext>, SystemHandler {
61
62     public static final String JavaDoc RM_CONFIGURATION_URI = "http://celtix.objectweb.org/bus/ws/rm/rm-config";
63     public static final String JavaDoc RM_CONFIGURATION_ID = "rm-handler";
64
65     private static final Logger JavaDoc LOG = LogUtils.getL7dLogger(RMHandler.class);
66     private static Map JavaDoc<BindingBase, RMHandler> handlers;
67
68     private RMSource source;
69     private RMDestination destination;
70     private RMProxy proxy;
71     private RMServant servant;
72     private Configuration configuration;
73     private RMStore store;
74     private Timer JavaDoc timer;
75     private boolean busLifeCycleListenerRegistered;
76       
77     @Resource(name = JAXWSConstants.BUS_PROPERTY) private Bus bus;
78     @Resource(name = JAXWSConstants.CLIENT_BINDING_PROPERTY) private ClientBinding clientBinding;
79     @Resource(name = JAXWSConstants.SERVER_BINDING_PROPERTY) private ServerBinding serverBinding;
80     @Resource(name = JAXWSConstants.CLIENT_TRANSPORT_PROPERTY) private ClientTransport clientTransport;
81     @Resource(name = JAXWSConstants.SERVER_TRANSPORT_PROPERTY) private ServerTransport serverTransport;
82
83     public RMHandler() {
84         proxy = new RMProxy(this);
85         servant = new RMServant();
86     }
87     
88     @PostConstruct
89     protected synchronized void initialise() {
90         if (null == handlers) {
91             handlers = new HashMap JavaDoc<BindingBase, RMHandler>();
92         }
93         handlers.put(getBinding(), this);
94         
95         if (null == configuration) {
96             configuration = createConfiguration();
97         }
98         
99         if (null == store) {
100             store = new RMStoreFactory().getStore(configuration);
101         }
102         
103         if (null == getSource()) {
104             source = new RMSource(this);
105             source.restore();
106         }
107         if (null == destination) {
108             destination = new RMDestination(this);
109             destination.restore();
110         }
111         
112         if (null == timer) {
113             timer = new Timer JavaDoc();
114         }
115         
116         if (!busLifeCycleListenerRegistered) {
117             getBinding().getBus().getLifeCycleManager()
118                 .registerLifeCycleListener(new RMBusLifeCycleListener(getSource()));
119             busLifeCycleListenerRegistered = true;
120         }
121     }
122     
123     public static RMHandler getHandler(BindingBase binding) {
124         return handlers.get(binding);
125     }
126
127     public void close(MessageContext context) {
128         // TODO commit transaction
129
}
130
131     public boolean handleFault(LogicalMessageContext context) {
132
133         open(context);
134         return false;
135     }
136
137     public boolean handleMessage(LogicalMessageContext context) {
138
139         open(context);
140
141         try {
142             if (ContextUtils.isOutbound(context)) {
143                 handleOutbound(context);
144             } else {
145                 handleInbound(context);
146             }
147         } catch (SequenceFault sf) {
148             sf.printStackTrace();
149             LOG.log(Level.SEVERE, "SequenceFault", sf);
150         }
151         return true;
152     }
153
154     @PreDestroy
155     public void shutdown() {
156         if (null != getSource()) {
157             getSource().shutdown();
158         }
159     }
160     
161     public Configuration getConfiguration() {
162         return configuration;
163     }
164     
165     public RMStore getStore() {
166         return store;
167     }
168
169     public Timer JavaDoc getTimer() {
170         return timer;
171     }
172     
173     public Bus getBus() {
174         return bus;
175     }
176     
177     public Transport getTransport() {
178         return null == clientTransport ? serverTransport : clientTransport;
179     }
180
181     public ClientTransport getClientTransport() {
182         return clientTransport;
183     }
184
185     public ServerTransport getServerTransport() {
186         return serverTransport;
187     }
188
189     public ClientBinding getClientBinding() {
190         return clientBinding;
191     }
192
193     public ServerBinding getServerBinding() {
194         return serverBinding;
195     }
196
197     public boolean isServerSide() {
198         return null != serverBinding;
199     }
200
201     public AbstractBindingBase getBinding() {
202         if (null != clientBinding) {
203             return (AbstractBindingBase)clientBinding;
204         }
205         return (AbstractBindingBase)serverBinding;
206     }
207
208     public RMProxy getProxy() {
209         return proxy;
210     }
211     
212     public RMServant getServant() {
213         return servant;
214     }
215     
216     protected RMSource getSource() {
217         return source;
218     }
219     
220     protected RMDestination getDestination() {
221         return destination;
222     }
223
224     protected void open(LogicalMessageContext context) {
225         // TODO begin transaction
226
getSource().getRetransmissionQueue().start(getBus().getWorkQueueManager()
227                                                    .getAutomaticWorkQueue());
228     }
229
230     protected Configuration createConfiguration() {
231         
232         Configuration busCfg = getBinding().getBus().getConfiguration();
233         ConfigurationBuilder builder = ConfigurationBuilderFactory.getBuilder();
234         Configuration parent;
235         org.objectweb.celtix.ws.addressing.EndpointReferenceType ref = getBinding().getEndpointReference();
236
237         if (null != clientBinding) {
238             String JavaDoc id = EndpointReferenceUtils.getServiceName(ref).toString()
239                 + "/" + EndpointReferenceUtils.getPortName(ref);
240             parent = builder.getConfiguration(ServiceImpl.PORT_CONFIGURATION_URI,
241                                                                 id, busCfg);
242         } else {
243             parent = builder.getConfiguration(EndpointImpl.ENDPOINT_CONFIGURATION_URI, EndpointReferenceUtils
244                 .getServiceName(ref).toString(), busCfg);
245         }
246
247         Configuration cfg = builder.getConfiguration(RM_CONFIGURATION_URI, RM_CONFIGURATION_ID, parent);
248         if (null == cfg) {
249             cfg = builder.buildConfiguration(RM_CONFIGURATION_URI, RM_CONFIGURATION_ID, parent);
250             
251         }
252         boolean policyProviderRegistered = false;
253         for (ConfigurationProvider p : cfg.getProviders()) {
254             if (p instanceof RMPolicyProvider) {
255                 policyProviderRegistered = true;
256                 break;
257             }
258         }
259         if (!policyProviderRegistered) {
260             cfg.getProviders().add(new RMPolicyProvider(getBinding().getBus(),
261                                                         getBinding().getEndpointReference()));
262         }
263         
264     
265         return cfg;
266
267     }
268
269     protected void handleOutbound(LogicalMessageContext context) throws SequenceFault {
270         LOG.entering(getClass().getName(), "handleOutbound");
271         AddressingPropertiesImpl maps =
272             ContextUtils.retrieveMAPs(context, false, true);
273       
274         // ensure the appropriate version of WS-Addressing is used
275
maps.exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME);
276
277         String JavaDoc action = null;
278         if (maps != null && null != maps.getAction()) {
279             action = maps.getAction().getValue();
280         }
281
282         // nothing to do if this is a CreateSequence, TerminateSequence or
283
// SequenceInfo request
284

285         if (LOG.isLoggable(Level.FINE)) {
286             LOG.fine("Action: " + action);
287         }
288
289         boolean isApplicationMessage = true;
290         
291         if (RMUtils.getRMConstants().getCreateSequenceAction().equals(action)
292             || RMUtils.getRMConstants().getCreateSequenceResponseAction().equals(action)
293             || RMUtils.getRMConstants().getTerminateSequenceAction().equals(action)
294             || RMUtils.getRMConstants().getLastMessageAction().equals(action)
295             || RMUtils.getRMConstants().getSequenceAcknowledgmentAction().equals(action)
296             || RMUtils.getRMConstants().getSequenceInfoAction().equals(action)) {
297             isApplicationMessage = false;
298         }
299         
300         RMPropertiesImpl rmpsOut = (RMPropertiesImpl)RMContextUtils.retrieveRMProperties(context, true);
301         if (null == rmpsOut) {
302             rmpsOut = new RMPropertiesImpl();
303             RMContextUtils.storeRMProperties(context, rmpsOut, true);
304         }
305         
306         RMPropertiesImpl rmpsIn = null;
307         Identifier inSeqId = null;
308         BigInteger JavaDoc inMessageNumber = null;
309         
310         if (isApplicationMessage) {
311                         
312             rmpsIn = (RMPropertiesImpl)RMContextUtils.retrieveRMProperties(context, false);
313             
314             if (null != rmpsIn && null != rmpsIn.getSequence()) {
315                 inSeqId = rmpsIn.getSequence().getIdentifier();
316                 inMessageNumber = rmpsIn.getSequence().getMessageNumber();
317             }
318             LOG.fine("inbound sequence: " + (null == inSeqId ? "null" : inSeqId.getValue()));
319
320             // not for partial responses to oneway requests
321

322             if (!(isServerSide() && BindingContextUtils.isOnewayTransport(context))) {
323
324                 if (!ContextUtils.isRequestor(context)) {
325                     assert null != inSeqId;
326                 }
327                 
328                 // get the current sequence, requesting the creation of a new one if necessary
329

330                 SourceSequence seq = getSequence(inSeqId, context, maps);
331                 assert null != seq;
332
333                 // increase message number and store a sequence type object in
334
// context
335

336                 seq.nextMessageNumber(inSeqId, inMessageNumber);
337                 rmpsOut.setSequence(seq);
338
339                 // if this was the last message in the sequence, reset the
340
// current sequence so that a new one will be created next
341
// time the handler is invoked
342

343                 if (seq.isLastMessage()) {
344                     source.setCurrent(null);
345                 }
346             }
347         }
348         
349         // add Acknowledgements (to application messages or explicitly
350
// created Acknowledgement messages only)
351

352         if (isApplicationMessage
353             || RMUtils.getRMConstants().getSequenceAcknowledgmentAction().equals(action)) {
354             AttributedURI to = VersionTransformer.convert(maps.getTo());
355             assert null != to;
356             addAcknowledgements(rmpsOut, inSeqId, to);
357         }
358
359         // indicate to the binding that a response is expected from the transport although
360
// the web method is a oneway method
361

362         if (BindingContextUtils.isOnewayMethod(context)
363             || RMUtils.getRMConstants().getLastMessageAction().equals(action)) {
364             context.put(OutputStreamMessageContext.ONEWAY_MESSAGE_TF, Boolean.FALSE);
365         }
366     }
367
368     protected void handleInbound(LogicalMessageContext context) throws SequenceFault {
369
370         LOG.entering(getClass().getName(), "handleInbound");
371         RMProperties rmps = RMContextUtils.retrieveRMProperties(context, false);
372         
373         final AddressingPropertiesImpl maps = ContextUtils.retrieveMAPs(context, false, false);
374         assert null != maps;
375
376         String JavaDoc action = null;
377         if (null != maps.getAction()) {
378             action = maps.getAction().getValue();
379         }
380
381         if (LOG.isLoggable(Level.FINE)) {
382             LOG.fine("Action: " + action);
383         }
384
385         if (RMUtils.getRMConstants().getCreateSequenceResponseAction().equals(action)) {
386             Object JavaDoc[] parameters = (Object JavaDoc[])context.get(ObjectMessageContext.METHOD_PARAMETERS);
387             CreateSequenceResponseType csr = (CreateSequenceResponseType)parameters[0];
388             getServant().createSequenceResponse(getSource(),
389                                                 csr,
390                                                 getProxy().getOfferedIdentifier());
391
392             return;
393         } else if (RMUtils.getRMConstants().getCreateSequenceAction().equals(action)) {
394             Object JavaDoc[] parameters = (Object JavaDoc[])context.get(ObjectMessageContext.METHOD_PARAMETERS);
395             CreateSequenceType cs = (CreateSequenceType)parameters[0];
396
397             final CreateSequenceResponseType csr =
398                 getServant().createSequence(getDestination(), cs, maps);
399             
400             Runnable JavaDoc response = new Runnable JavaDoc() {
401                 public void run() {
402                     try {
403                         getProxy().createSequenceResponse(maps, csr);
404                     } catch (IOException JavaDoc ex) {
405                         ex.printStackTrace();
406                     } catch (SequenceFault sf) {
407                         sf.printStackTrace();
408                     }
409                 }
410             };
411             getBinding().getBus().getWorkQueueManager().getAutomaticWorkQueue().execute(response);
412     
413             return;
414         } else if (RMUtils.getRMConstants().getTerminateSequenceAction().equals(action)) {
415             Object JavaDoc[] parameters = (Object JavaDoc[])context.get(ObjectMessageContext.METHOD_PARAMETERS);
416             TerminateSequenceType cs = (TerminateSequenceType)parameters[0];
417
418             getServant().terminateSequence(getDestination(), cs.getIdentifier());
419         }
420         
421         // for application AND out of band messages
422

423         if (null != rmps) {
424             
425             processAcknowledgments(rmps);
426
427             processAcknowledgmentRequests(rmps);
428             
429             processSequence(rmps, maps);
430         }
431     }
432
433     private void processAcknowledgments(RMProperties rmps) {
434         Collection JavaDoc<SequenceAcknowledgement> acks = rmps.getAcks();
435         if (null != acks) {
436             for (SequenceAcknowledgement ack : acks) {
437                 getSource().setAcknowledged(ack);
438             }
439         }
440     }
441
442     private void processSequence(RMProperties rmps, AddressingProperties maps) throws SequenceFault {
443         SequenceType s = rmps.getSequence();
444         if (null == s) {
445             return;
446         }
447         getDestination().acknowledge(s,
448             null == maps.getReplyTo() ? null : maps.getReplyTo().getAddress().getValue());
449     }
450
451     private void processAcknowledgmentRequests(RMProperties rmps) {
452         Collection JavaDoc<AckRequestedType> requested = rmps.getAcksRequested();
453         if (null != requested) {
454             for (AckRequestedType ar : requested) {
455                 DestinationSequence seq = getDestination().getSequence(ar.getIdentifier());
456                 if (null != seq) {
457                     seq.scheduleImmediateAcknowledgement();
458                 } else {
459                     LOG.severe("No such sequence.");
460                 }
461             }
462         }
463     }
464
465     private void addAcknowledgements(RMPropertiesImpl rmpsOut, Identifier inSeqId, AttributedURI to) {
466
467         for (DestinationSequence seq : getDestination().getAllSequences()) {
468             if (seq.sendAcknowledgement()
469                 && ((seq.getAcksTo().getAddress().getValue().equals(RMUtils.getAddressingConstants()
470                     .getAnonymousURI()) && AbstractSequenceImpl.identifierEquals(seq.getIdentifier(),
471                                                                                 inSeqId))
472                     || to.getValue().equals(seq.getAcksTo().getAddress().getValue()))) {
473                 rmpsOut.addAck(seq);
474             } else if (LOG.isLoggable(Level.FINE)) {
475                 if (!seq.sendAcknowledgement()) {
476                     LOG.fine("no need to add an acknowledgements for sequence "
477                              + seq.getIdentifier().getValue());
478                 } else {
479                     LOG.fine("sequences acksTo (" + seq.getAcksTo().getAddress().getValue()
480                              + ") does not match to (" + to.getValue() + ")");
481                 }
482             }
483         }
484
485         if (LOG.isLoggable(Level.FINE)) {
486             Collection JavaDoc<SequenceAcknowledgement> acks = rmpsOut.getAcks();
487             if (null == acks) {
488                 LOG.fine("No acknowledgements added");
489             } else {
490                 LOG.fine("Added " + acks.size() + " acknowledgements.");
491             }
492         }
493     }
494     
495     private SourceSequence getSequence(Identifier inSeqId,
496                                  LogicalMessageContext context,
497                                  AddressingPropertiesImpl maps) throws SequenceFault {
498         SourceSequence seq = getSource().getCurrent(inSeqId);
499
500         if (null == seq) {
501             // TODO: better error handling
502
org.objectweb.celtix.ws.addressing.EndpointReferenceType to = null;
503             try {
504                 EndpointReferenceType acksTo = null;
505                 RelatesToType relatesTo = null;
506                 if (isServerSide()) {
507                     AddressingPropertiesImpl inMaps = ContextUtils
508                         .retrieveMAPs(context, false, false);
509                     inMaps.exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME);
510                     acksTo = RMUtils.createReference(inMaps.getTo().getValue());
511                     to = inMaps.getReplyTo();
512                     getServant().setUnattachedIdentifier(inSeqId);
513                     relatesTo = ContextUtils.WSA_OBJECT_FACTORY.createRelatesToType();
514                     DestinationSequence inSeq = getDestination().getSequence(inSeqId);
515                     relatesTo.setValue(inSeq != null ? inSeq.getCorrelationID() : null);
516                 } else {
517                     acksTo = VersionTransformer.convert(maps.getReplyTo());
518                     // for oneways
519
if (Names.WSA_NONE_ADDRESS.equals(acksTo.getAddress().getValue())) {
520                         acksTo = RMUtils.createReference(Names.WSA_ANONYMOUS_ADDRESS);
521                     }
522                 }
523
524                 getProxy().createSequence(getSource(), to, acksTo, relatesTo);
525             } catch (IOException JavaDoc ex) {
526                 ex.printStackTrace();
527             }
528
529             seq = getSource().awaitCurrent(inSeqId);
530             seq.setTarget(to);
531         }
532         
533         return seq;
534     }
535
536     public void destroy() {
537         getSource().getRetransmissionQueue().stop();
538     }
539     
540     
541
542 }
543
Popular Tags