KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > petals > jbi > messaging > MessageExchangeImpl


/**
 * PETALS: PETALS Services Platform
 * Copyright (C) 2005 EBM WebSourcing
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Initial developer(s): EBM WebSourcing
 * --------------------------------------------------------------------------
 * $Id: MessageExchangeImpl.java,v 1.2 2005/07/22 10:24:27 alouis Exp $
 * --------------------------------------------------------------------------
 */

25
package org.objectweb.petals.jbi.messaging;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.Fault;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;

import org.objectweb.petals.jbi.registry.ConsumerEndpoint;
46
47 /**
48  * @author Adrien LOUIS - EBM WebSourcing
49  * @author Gael BLONDELLE - EBM WebSourcing
50  * @author Anass OUAZZANI - EBM WebSourcing
51  */

52 public class MessageExchangeImpl implements MessageExchange, Cloneable JavaDoc, Serializable JavaDoc {
53
54     public static final String JavaDoc IN_MSG = "in";
55
56     public static final URI JavaDoc IN_ONLY_PATTERN = URI
57         .create("http://www.w3.org/2004/08/wsdl/in-only");
58
59     public static final URI JavaDoc IN_OPTIONAL_OUT_PATTERN = URI
60         .create("http://www.w3.org/2004/08/wsdl/in-opt-out");
61
62     public static final URI JavaDoc IN_OUT_PATTERN = URI
63         .create("http://www.w3.org/2004/08/wsdl/in-out");
64
65     public static final String JavaDoc OUT_MSG = "out";
66
67     public static final URI JavaDoc ROBUST_IN_ONLY_PATTERN = URI
68         .create("http://www.w3.org/2004/08/wsdl/robust-in-only");
69
70     private static final short SERIALIZE_ROLE_CONSUMER = 0;
71
72     private static final short SERIALIZE_ROLE_PROVIDER = 1;
73
74     private static final short SERIALIZE_STATUS_ACTIVE = 0;
75
76     private static final short SERIALIZE_STATUS_DONE = 1;
77
78     private static final short SERIALIZE_STATUS_ERROR = 2;
79
80     /**
81      *
82      */

83     private static final long serialVersionUID = 1L;
84
85     protected ConsumerEndpoint consumerEndpoint;
86
87     protected ServiceEndpoint endpoint;
88
89     protected Exception JavaDoc error;
90
91     protected String JavaDoc exchangeId;
92
93     protected Fault fault;
94
95     // //////////////////////////////////////////////////////////
96
//
97
// implementation specific attributes
98
//
99
// //////////////////////////////////////////////////////////
100

101     protected QName JavaDoc interfaceName;
102
103     protected Map JavaDoc<String JavaDoc, NormalizedMessage> messages = new HashMap JavaDoc<String JavaDoc, NormalizedMessage>();
104
105     protected QName JavaDoc operation;
106
107     protected URI JavaDoc pattern;
108
109     protected Map JavaDoc<String JavaDoc, Object JavaDoc> properties = new HashMap JavaDoc<String JavaDoc, Object JavaDoc>();
110
111     protected transient Role role;
112
113     protected QName JavaDoc service;
114
115     protected transient ExchangeStatus status;
116
117     protected boolean terminated;
118
119     protected boolean transacted;
120
121     /**
122      * Create a MessageExchangeImpl This constructor is called by a consumer who
123      * wants to initialize an exchange.
124      *
125      * @param consumerDeliveryChannel
126      */

127     public MessageExchangeImpl(ConsumerEndpoint consumerEndpoint) {
128
129         status = ExchangeStatus.ACTIVE;
130
131         role = Role.CONSUMER;
132
133         this.consumerEndpoint = consumerEndpoint;
134     }
135
136     /**
137      * @see javax.jbi.messaging.MessageExchange#createFault()
138      */

139     public Fault createFault() throws MessagingException {
140         return new FaultImpl();
141     }
142
143     /**
144      * @see javax.jbi.messaging.MessageExchange#createMessage()
145      */

146     public NormalizedMessage createMessage() throws MessagingException {
147         return new NormalizedMessageImpl();
148     }
149
150     public ConsumerEndpoint getConsumerEndpoint() {
151         return consumerEndpoint;
152     }
153
154     /**
155      * @see javax.jbi.messaging.MessageExchange#getEndpoint()
156      */

157     public ServiceEndpoint getEndpoint() {
158         return endpoint;
159     }
160
161     /**
162      * @see javax.jbi.messaging.MessageExchange#getError()
163      */

164     public Exception JavaDoc getError() {
165         return error;
166     }
167
168     /**
169      * @see javax.jbi.messaging.MessageExchange#getExchangeId()
170      */

171     public String JavaDoc getExchangeId() {
172         return exchangeId;
173     }
174
175     /**
176      * @see javax.jbi.messaging.MessageExchange#getFault()
177      */

178     public Fault getFault() {
179         return fault;
180     }
181
182     public QName JavaDoc getInterfaceName() {
183         return interfaceName;
184     }
185
186     /**
187      * get the message associated to the reference. It is <b>Not</b> Case
188      * Sensitive.
189      *
190      * @see javax.jbi.messaging.MessageExchange#getMessage(java.lang.String)
191      */

192     public NormalizedMessage getMessage(String JavaDoc name) {
193         if (name != null) {
194             return (NormalizedMessage) messages.get(name.toLowerCase());
195         } else {
196             return null;
197         }
198     }
199
200     public Map JavaDoc<String JavaDoc, NormalizedMessage> getMessages() {
201         return messages;
202     }
203
204     /**
205      * @see javax.jbi.messaging.MessageExchange#getOperation()
206      */

207     public QName JavaDoc getOperation() {
208         return operation;
209     }
210
211     /**
212      * @see javax.jbi.messaging.MessageExchange#getPattern()
213      */

214     public URI JavaDoc getPattern() {
215         return pattern;
216     }
217
218     /**
219      * @see javax.jbi.messaging.MessageExchange#getProperty(java.lang.String)
220      */

221     public Object JavaDoc getProperty(String JavaDoc name) {
222         return properties.get(name);
223     }
224
225     /**
226      * @see javax.jbi.messaging.MessageExchange#getPropertyNames()
227      */

228     public Set JavaDoc getPropertyNames() {
229         return properties.keySet();
230     }
231
232     /**
233      * (non-Javadoc)
234      *
235      * @see javax.jbi.messaging.MessageExchange#getRole()
236      */

237     public Role getRole() {
238         return role;
239     }
240
241     /**
242      * @see javax.jbi.messaging.MessageExchange#getService()
243      */

244     public QName JavaDoc getService() {
245         return service;
246     }
247
248     /**
249      * @see javax.jbi.messaging.MessageExchange#getStatus()
250      */

251     public javax.jbi.messaging.ExchangeStatus getStatus() {
252         return status;
253     }
254
255     public boolean isTerminated() {
256         return terminated;
257     }
258
259     /**
260      * @see javax.jbi.messaging.MessageExchange#isTransacted()
261      */

262     public boolean isTransacted() {
263         return transacted;
264     }
265
266     /**
267      * @see javax.jbi.messaging.MessageExchange#setEndpoint(java.lang.Exception)
268      */

269     public void setEndpoint(ServiceEndpoint endpoint) {
270         this.endpoint = endpoint;
271     }
272
273     /**
274      * @see javax.jbi.messaging.MessageExchange#setError(java.lang.Exception)
275      */

276     public void setError(Exception JavaDoc error) {
277         this.error = error;
278
279         this.status = ExchangeStatus.ERROR;
280     }
281
282     /**
283      * @param exchangeId
284      * The exchangeId to set.
285      */

286     public void setExchangeId(String JavaDoc exchangeId) {
287         this.exchangeId = exchangeId;
288     }
289
290     // //////////////////////////////////////////
291
// specific methods
292
// //////////////////////////////////////////
293

294     /**
295      * (non-Javadoc)
296      *
297      * @see javax.jbi.messaging.MessageExchange#setFault(javax.jbi.messaging.Fault)
298      */

299     public void setFault(Fault fault) throws MessagingException {
300         checkNotTerminated();
301
302         if (role == null) {
303             throw new MessagingException("The Role is not defined.");
304         }
305
306         // check that a fault has not already been sent
307
if (this.fault != null) {
308             throw new MessagingException("A fault has already been set.");
309         }
310
311         // check that the status has not be positionned with DONEor EXCEPTION
312
if (ExchangeStatus.DONE.equals(status)
313             || ExchangeStatus.ERROR.equals(status)) {
314             throw new MessagingException(
315                 "The MessageExchange state and the current Role do not allow this operation.");
316         }
317
318         // only the provider can send a fault, except in the case of a
319
// InOptionnalOut,
320
// where the consumer can set a fault after the provider sent a response
321

322         if (MessageExchange.Role.CONSUMER.equals(role)
323             && !messages.containsKey(OUT_MSG)) {
324             throw new MessagingException(
325                 "The MessageExchange state and the current Role do not allow this operation.");
326         }
327
328         this.fault = fault;
329     }
330
331     public void setInterfaceName(QName JavaDoc interfaceName) {
332         this.interfaceName = interfaceName;
333     }
334
335     /**
336      * Set the specified message. Check the availability to set the message with
337      * the following rules: <br>
338      * <li>CONSUMER Role <-> in reference or PROVIDER Role <-> out reference</li>
339      * <li>JBI predefined MEP <-> in or out reference</li>
340      * <li>setMessage(XXX,reference) has not already been called. <br>
341      * This is <b>not</b> case sensitive
342      *
343      * @see javax.jbi.messaging.MessageExchange#setMessage(javax.jbi.messaging.NormalizedMessage,
344      * java.lang.String)
345      */

346     public void setMessage(NormalizedMessage msg, String JavaDoc name)
347         throws MessagingException {
348         checkNotTerminated();
349
350         if (msg == null) {
351             throw new MessagingException("NormalizedMessage must be non null.");
352         }
353         if (name == null || name.trim().length() == 0) {
354             throw new MessagingException(
355                 "The message reference must be non null and non empty.");
356         }
357
358         // check if the pattern allows this reference
359
checkPatternMatching(name);
360
361         // check Role
362
checkRoleMatching(name);
363
364         // check that the message has not already been sent
365
if (messages.containsKey(name.toLowerCase())) {
366             throw new MessagingException(
367                 "A message has already been set with the '" + name
368                     + "' reference.");
369         }
370
371         // no exception was thrown, set the message
372
messages.put(name.toLowerCase(), msg);
373     }
374
375     /**
376      * @see javax.jbi.messaging.MessageExchange#setOperation(javax.xml.namespace.QName)
377      */

378     public void setOperation(QName JavaDoc name) {
379         this.operation = name;
380     }
381
382     /**
383      * @param pattern
384      * The pattern to set.
385      */

386     public void setPattern(URI JavaDoc pattern) {
387         this.pattern = pattern;
388     }
389
390     /**
391      * @see javax.jbi.messaging.MessageExchange#setProperty(java.lang.String,
392      * java.lang.Object)
393      */

394     public void setProperty(String JavaDoc name, Object JavaDoc obj) {
395         properties.put(name, obj);
396     }
397
398     public void setRole(Role role) {
399         this.role = role;
400     }
401
402     /**
403      * set Destination ServiceName. A destinsation Endpointreference is created
404      * with only "serviceName" attribute
405      */

406     public void setService(QName JavaDoc service) {
407         this.service = service;
408     }
409
410     /**
411      * @see javax.jbi.messaging.MessageExchange#setStatus(javax.jbi.messaging.ExchangeStatus)
412      */

413     public void setStatus(ExchangeStatus status) throws MessagingException {
414         checkNotTerminated();
415
416         // check that DONE can be set.
417
if (ExchangeStatus.DONE.equals(status)) {
418             if (Role.CONSUMER.equals(role)) {
419                 if (IN_ONLY_PATTERN.equals(pattern)) {
420                     throw new MessagingException(
421                         "The MessageExchange state does not allow this operation.");
422                 } else if (IN_OUT_PATTERN.equals(pattern) && fault == null
423                     && getMessage(OUT_MSG) == null) {
424                     throw new MessagingException(
425                         "The MessageExchange state does not allow this operation.");
426                 } else if (IN_OPTIONAL_OUT_PATTERN.equals(pattern)
427                     && fault == null && getMessage(OUT_MSG) == null) {
428                     throw new MessagingException(
429                         "The MessageExchange state does not allow this operation.");
430                 }
431             } else {
432                 if (ROBUST_IN_ONLY_PATTERN.equals(pattern) && fault != null) {
433                     throw new MessagingException(
434                         "The MessageExchange state does not allow this operation.");
435                 } else if (IN_OUT_PATTERN.equals(pattern)) {
436                     throw new MessagingException(
437                         "The MessageExchange state does not allow this operation.");
438                 } else if (IN_OPTIONAL_OUT_PATTERN.equals(pattern)) {
439                     if (fault != null && getMessage(OUT_MSG) == null) {
440                         throw new MessagingException(
441                             "The MessageExchange state does not allow this operation.");
442                     } else if (fault == null && getMessage(OUT_MSG) != null) {
443                         throw new MessagingException(
444                             "The MessageExchange state does not allow this operation.");
445                     }
446                 }
447             }
448         }
449         this.status = status;
450     }
451
452     public void setTerminated(boolean terminated) {
453         this.terminated = terminated;
454     }
455
456     /**
457      * @param transacted
458      * The transacted to set.
459      */

460     public void setTransacted(boolean transacted) {
461         this.transacted = transacted;
462     }
463
464     public String JavaDoc toString() {
465         return "MessageExchange@" + getExchangeId();
466     }
467
468     /**
469      * Check if the exchange is still valid.
470      *
471      * @throws MessagingException
472      */

473     protected void checkNotTerminated() throws MessagingException {
474         if (isTerminated()) {
475             throw new MessagingException("The Exchange is terminated.");
476         }
477     }
478
479     /**
480      * Check that the exchange MEP allows the specified reference. This only
481      * works for the JBI predefined MEPs. It check that reference is IN or OUT.
482      * Other reference or MEP will cause an exception
483      *
484      * @param name
485      * @throws MessagingException
486      * the MEP does not allow the reference
487      */

488     protected void checkPatternMatching(String JavaDoc name) throws MessagingException {
489         if (pattern == null) {
490             throw new MessagingException("The MEP is not defined.");
491         }
492
493         if (name == null) {
494             throw new MessagingException("The reference name is not defined.");
495         }
496
497         if (IN_MSG.equalsIgnoreCase(name)) {
498             if (IN_ONLY_PATTERN.equals(pattern)
499                 || ROBUST_IN_ONLY_PATTERN.equals(pattern)
500                 || IN_OUT_PATTERN.equals(pattern)
501                 || IN_OPTIONAL_OUT_PATTERN.equals(pattern)) {
502                 return;
503             }
504         } else if (OUT_MSG.equalsIgnoreCase(name)
505             && ((IN_OUT_PATTERN.equals(pattern) || IN_OPTIONAL_OUT_PATTERN
506                 .equals(pattern)))) {
507             return;
508         }
509         throw new MessagingException(
510             "the MessageExchange state does not allow this operation.");
511     }
512
513     /**
514      * Check that the Role allows the specified reference. This only works for
515      * the JBI predefined MEPs. It check that reference is IN or OUT and the
516      * Role is CONSUMER or PROVIDER. Other reference will cause an exception
517      *
518      * @param name
519      * @throws MessagingException
520      * the Role does not allow the reference
521      */

522     protected void checkRoleMatching(String JavaDoc name) throws MessagingException {
523         if (role == null) {
524             throw new MessagingException("the Role is not defined.");
525         }
526
527         if (name == null) {
528             throw new MessagingException("The reference name is not defined.");
529         }
530
531         if (MessageExchange.Role.CONSUMER.equals(role)) {
532             if (IN_MSG.equalsIgnoreCase(name)) {
533                 return;
534             }
535         } else if (MessageExchange.Role.PROVIDER.equals(role)
536             && OUT_MSG.equalsIgnoreCase(name)) {
537             return;
538         }
539         throw new MessagingException("The Role does not allow this operation.");
540     }
541     
542     /**
543      * Depending on the state of the Exchange,
544      * clean IN and/or OUT normalized messages
545      * to reduce exchange size.
546      * Status DONE, ERROR : all messages are cleared
547      * Fault message : all other messages are cleared
548      * OUT message set : IN message is cleared
549      */

550     public void cleanMessages(){
551         // if status is DONE or ERROR, do not send
552
// Normalized messages
553
if(! ExchangeStatus.ACTIVE.equals(status)){
554             messages.clear();
555             fault=null;
556         }
557         // if there is a fault, no other messages are kept
558
if(fault != null){
559             messages.clear();
560         }
561         // if OUT response, IN msg is not kept
562
if(messages.containsKey(OUT_MSG) && messages.containsKey(IN_MSG)){
563             messages.remove(IN_MSG);
564         }
565     }
566
567     /**
568      * @see MessageExchangeImpl#readObject(ObjectInputStream)
569      */

570     protected void readObjectDelegate(ObjectInputStream JavaDoc s) throws IOException JavaDoc {
571         // Role deserialization
572
switch (s.readShort()) {
573             case 0:
574                 role = MessageExchange.Role.CONSUMER;
575                 break;
576             case 1:
577                 role = MessageExchange.Role.PROVIDER;
578                 break;
579             default:
580                 break;
581         }
582
583         // Status deserialization
584
switch (s.readShort()) {
585             case 0:
586                 status = ExchangeStatus.ACTIVE;
587                 break;
588             case 1:
589                 status = ExchangeStatus.DONE;
590                 break;
591             case 2:
592                 status = ExchangeStatus.ERROR;
593                 break;
594             default:
595                 break;
596         }
597
598         try {
599             s.defaultReadObject();
600         } catch (ClassNotFoundException JavaDoc e) {
601             throw new IOException JavaDoc(e.getClass() + ":" + e.getMessage());
602         }
603     }
604
605     /**
606      * @see MessageExchangeImpl#writeObject(ObjectOutputStream)
607      */

608     protected void writeObjectDelegate(ObjectOutputStream JavaDoc s) throws IOException JavaDoc {
609         // Role serialization
610
if (MessageExchange.Role.CONSUMER.equals(role)) {
611             s.writeShort(SERIALIZE_ROLE_CONSUMER);
612         } else {
613             s.writeShort(SERIALIZE_ROLE_PROVIDER);
614         }
615
616         // Status serialization
617
if (ExchangeStatus.ACTIVE.equals(status)) {
618             s.writeShort(SERIALIZE_STATUS_ACTIVE);
619         } else if (ExchangeStatus.DONE.equals(status)) {
620             s.writeShort(SERIALIZE_STATUS_DONE);
621         } else {
622             s.writeShort(SERIALIZE_STATUS_ERROR);
623         }
624         s.defaultWriteObject();
625     }
626
627     /**
628      * Used to deserialize Status and Role objects
629      *
630      * @param s
631      * @throws IOException
632      */

633     private void readObject(ObjectInputStream JavaDoc s) throws IOException JavaDoc {
634         readObjectDelegate(s);
635     }
636
637     /**
638      * Used to serialize Status and Role objects
639      *
640      * @param s
641      * @throws IOException
642      */

643     private void writeObject(ObjectOutputStream JavaDoc s) throws IOException JavaDoc {
644         writeObjectDelegate(s);
645     }
646     
647     /**
648      * Used to clone all fields of MessageExchangeImpl except :
649      * <br/> the NormalizedMessages contained in the Map of messages
650      * <br/> the Fault object
651      * <br/> qnames...
652      */

653     public synchronized MessageExchangeImpl clone(){
654         MessageExchangeImpl msg = null;
655         try{
656             msg = (MessageExchangeImpl) super.clone();
657             msg.consumerEndpoint = (ConsumerEndpoint)consumerEndpoint.clone();
658         } catch (CloneNotSupportedException JavaDoc e) {
659             msg = null;
660         }
661         return msg;
662     }
663 }
664
Popular Tags