// KickJava - Java API By Example, From Geeks To Geeks.
//
// Java > Open Source Codes > org > apache > servicemix > jbi > messaging > MessageExchangeImpl


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.servicemix.jbi.messaging;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.jbi.container.ActivationSpec;
import org.apache.servicemix.jbi.framework.ComponentContextImpl;
import org.apache.servicemix.jbi.framework.ComponentNameSpace;
import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.w3c.dom.Node;

import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.Fault;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.transaction.Transaction;
import javax.xml.namespace.QName;
import javax.xml.transform.dom.DOMSource;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.net.URI;
import java.util.Locale;
import java.util.Set;
44
45 /**
46  * A simple message exchange declaration. This is partial, just giving us enough ME function for the doodle. This
47  * doesn't add anything new to the current MessageExchange definition.
48  *
49  * @version $Revision: 434178 $
50  */

51 public abstract class MessageExchangeImpl implements MessageExchange, Externalizable JavaDoc {
52
53     public static final int SYNC_STATE_ASYNC = 0;
54     public static final int SYNC_STATE_SYNC_SENT = 1;
55     public static final int SYNC_STATE_SYNC_RECEIVED = 2;
56     
57     /**
58      * Exchange is not transactional
59      */

60     public static final int TX_STATE_NONE = 0;
61     /**
62      * Exchange has been enlisted in the current transaction.
63      * This means that the transaction must be commited for
64      * the exchange to be delivered.
65      */

66     public static final int TX_STATE_ENLISTED = 1;
67     /**
68      * Transaction is being conveyed by the exchange.
69      * The transaction context will be given to the
70      * target component.
71      */

72     public static final int TX_STATE_CONVEYED = 2;
73
74     protected static final int CAN_SET_IN_MSG = 0x00000001;
75     protected static final int CAN_SET_OUT_MSG = 0x00000002;
76     protected static final int CAN_SET_FAULT_MSG = 0x00000004;
77     protected static final int CAN_PROVIDER = 0x00000008;
78     protected static final int CAN_CONSUMER = 0x00000000;
79     protected static final int CAN_SEND = 0x00000010;
80     protected static final int CAN_STATUS_ACTIVE = 0x00000040;
81     protected static final int CAN_STATUS_DONE = 0x00000080;
82     protected static final int CAN_STATUS_ERROR = 0x00000100;
83     protected static final int CAN_OWNER = 0x00000200;
84
85     protected static final int STATES_CANS = 0;
86     protected static final int STATES_NEXT_OUT = 1;
87     protected static final int STATES_NEXT_FAULT = 2;
88     protected static final int STATES_NEXT_ERROR = 3;
89     protected static final int STATES_NEXT_DONE = 4;
90     
91     public static final String JavaDoc FAULT = "fault";
92     public static final String JavaDoc IN = "in";
93     public static final String JavaDoc OUT = "out";
94     
95     private static final long serialVersionUID = -3639175136897005605L;
96     
97     private static final Log log = LogFactory.getLog(MessageExchangeImpl.class);
98     
99     protected ComponentContextImpl sourceContext;
100     protected ExchangePacket packet;
101     protected PojoMarshaler marshaler;
102     protected int state;
103     protected int syncState = SYNC_STATE_ASYNC;
104     protected int txState = TX_STATE_NONE;
105     protected int[][] states;
106     protected MessageExchangeImpl mirror;
107     protected transient boolean pushDeliver;
108     protected transient Object JavaDoc txLock;
109
110     /**
111      * Constructor
112      * @param exchangeId
113      * @param pattern
114      */

115     public MessageExchangeImpl(String JavaDoc exchangeId, URI JavaDoc pattern, int[][] states) {
116         this.states = states;
117         this.packet = new ExchangePacket();
118         this.packet.setExchangeId(exchangeId);
119         this.packet.setPattern(pattern);
120     }
121     
122     protected MessageExchangeImpl(ExchangePacket packet, int[][] states) {
123         this.states = states;
124         this.packet = packet;
125     }
126     
127     protected MessageExchangeImpl() {
128     }
129     
130     protected void copyFrom(MessageExchangeImpl me) {
131         if (this != me) {
132             this.packet = me.packet;
133             this.state = me.state;
134             this.mirror.packet = me.packet;
135             this.mirror.state = me.mirror.state;
136         }
137     }
138     
139     protected boolean can(int c) {
140         return (this.states[state][STATES_CANS] & c) == c;
141     }
142
143     /**
144      * Returns the activation spec that was provided when the component was registered
145      * @return the spec
146      */

147     public ActivationSpec getActivationSpec() {
148         if (sourceContext != null) {
149             return sourceContext.getActivationSpec();
150         }
151         return null;
152     }
153
154     /**
155      * Returns the context which created the message exchange which can then be used for routing
156      * @return the context
157      */

158     public ComponentContextImpl getSourceContext() {
159         return sourceContext;
160     }
161
162     /**
163      * Set the context
164      * @param sourceContext
165      */

166     public void setSourceContext(ComponentContextImpl sourceContext) {
167         this.sourceContext = sourceContext;
168         this.mirror.sourceContext = sourceContext;
169     }
170
171     /**
172      * @return the packet
173      */

174     public ExchangePacket getPacket(){
175         return packet;
176     }
177     
178     /**
179      * @return URI of pattenr exchange
180      */

181     public URI JavaDoc getPattern() {
182         return packet.getPattern();
183     }
184
185     /**
186      * @return the exchange Id
187      */

188     public String JavaDoc getExchangeId() {
189         return packet.getExchangeId();
190     }
191
192     /**
193      * @return the processing status of the exchange
194      */

195     public ExchangeStatus getStatus() {
196         if (this.packet.isAborted()) {
197             return ExchangeStatus.ERROR;
198         }
199         return this.packet.getStatus();
200     }
201
202     /**
203      * set the processing status
204      *
205      * @param exchangeStatus
206      * @throws MessagingException
207      */

208     public void setStatus(ExchangeStatus exchangeStatus) throws MessagingException {
209         if (!can(CAN_OWNER)) {
210             throw new IllegalStateException JavaDoc("component is not owner");
211         }
212         this.packet.setStatus(exchangeStatus);
213
214     }
215
216     /**
217      * set the source of a failure
218      *
219      * @param exception
220      */

221     public void setError(Exception JavaDoc exception) {
222         if (!can(CAN_OWNER)) {
223             throw new IllegalStateException JavaDoc("component is not owner");
224         }
225         this.packet.setError(exception);
226     }
227
228     /**
229      * @return the exception describing a processing error
230      */

231     public Exception JavaDoc getError() {
232         return packet.getError();
233     }
234
235     /**
236      * @return the fault message for an exchange
237      */

238     public Fault getFault() {
239         return packet.getFault();
240     }
241
242     /**
243      * set the fault message for the exchange
244      *
245      * @param fault
246      * @throws MessagingException
247      */

248     public void setFault(Fault fault) throws MessagingException {
249         setMessage(fault, FAULT);
250     }
251
252     /**
253      * @return a new message
254      * @throws MessagingException
255      */

256     public NormalizedMessage createMessage() throws MessagingException {
257         return new NormalizedMessageImpl(this);
258     }
259
260     /**
261      * factory method for fault objects
262      *
263      * @return a new fault
264      * @throws MessagingException
265      */

266     public Fault createFault() throws MessagingException {
267         return new FaultImpl();
268     }
269
270     /**
271      * get a NormalizedMessage based on the message reference
272      *
273      * @param name
274      * @return a NormalizedMessage
275      */

276     public NormalizedMessage getMessage(String JavaDoc name) {
277         if (IN.equals(name)) {
278             return packet.getIn();
279         } else if (OUT.equals(name)) {
280             return packet.getOut();
281         } else if (FAULT.equals(name)) {
282             return packet.getFault();
283         } else {
284             return null;
285         }
286     }
287
288     /**
289      * set a NormalizedMessage with a named reference
290      *
291      * @param message
292      * @param name
293      * @throws MessagingException
294      */

295     public void setMessage(NormalizedMessage message, String JavaDoc name) throws MessagingException {
296         if (!can(CAN_OWNER)) {
297             throw new IllegalStateException JavaDoc("component is not owner");
298         }
299         if (message == null) {
300             throw new IllegalArgumentException JavaDoc("message should not be null");
301         }
302         if (name == null) {
303             throw new IllegalArgumentException JavaDoc("name should not be null");
304         }
305         name = name.toLowerCase();
306         if (IN.equals(name)) {
307             if (!can(CAN_SET_IN_MSG)) {
308                 throw new MessagingException("In not supported");
309             }
310             if (packet.getIn() != null) {
311                 throw new MessagingException("In message is already set");
312             }
313             ((NormalizedMessageImpl) message).exchange = this;
314             packet.setIn((NormalizedMessageImpl) message);
315         } else if (OUT.equals(name)) {
316             if (!can(CAN_SET_OUT_MSG)) {
317                 throw new MessagingException("Out not supported");
318             }
319             if (packet.getOut() != null) {
320                 throw new MessagingException("Out message is already set");
321             }
322             ((NormalizedMessageImpl) message).exchange = this;
323             packet.setOut((NormalizedMessageImpl) message);
324         } else if (FAULT.equals(name)) {
325             if (!can(CAN_SET_FAULT_MSG)) {
326                 throw new MessagingException("Fault not supported");
327             }
328             if (!(message instanceof Fault)) {
329                 throw new MessagingException("Setting fault, but message is not a fault");
330             }
331             if (packet.getFault() != null) {
332                 throw new MessagingException("Fault message is already set");
333             }
334             ((NormalizedMessageImpl) message).exchange = this;
335             packet.setFault((FaultImpl) message);
336         } else {
337             throw new MessagingException("Message name must be in, out or fault");
338         }
339     }
340
341     /**
342      * @param name
343      * @return the proerty from the exchange
344      */

345     public Object JavaDoc getProperty(String JavaDoc name) {
346         if (JTA_TRANSACTION_PROPERTY_NAME.equals(name)) {
347             return packet.getTransactionContext();
348         } else if (JbiConstants.PERSISTENT_PROPERTY_NAME.equals(name)) {
349             return packet.getPersistent();
350         } else {
351             return packet.getProperty(name);
352         }
353     }
354
355     /**
356      * set a named property on the exchange
357      *
358      * @param name
359      * @param value
360      */

361     public void setProperty(String JavaDoc name, Object JavaDoc value) {
362         if (!can(CAN_OWNER)) {
363             throw new IllegalStateException JavaDoc("component is not owner");
364         }
365         if (name == null) {
366             throw new IllegalArgumentException JavaDoc("name should not be null");
367         }
368         if (JTA_TRANSACTION_PROPERTY_NAME.equals(name)) {
369             packet.setTransactionContext((Transaction JavaDoc) value);
370         } else if (JbiConstants.PERSISTENT_PROPERTY_NAME.equals(name)) {
371             packet.setPersistent((Boolean JavaDoc) value);
372         } else {
373             packet.setProperty(name, value);
374         }
375     }
376     
377     /**
378      * @return property names
379      */

380     public Set JavaDoc getPropertyNames(){
381         return packet.getPropertyNames();
382     }
383
384     /**
385      * Set an endpoint
386      *
387      * @param endpoint
388      */

389     public void setEndpoint(ServiceEndpoint endpoint) {
390         packet.setEndpoint(endpoint);
391     }
392
393     /**
394      * set a service
395      *
396      * @param name
397      */

398     public void setService(QName JavaDoc name) {
399         packet.setServiceName(name);
400     }
401
402     /**
403      * set an operation
404      *
405      * @param name
406      */

407     public void setOperation(QName JavaDoc name) {
408         packet.setOperationName(name);
409     }
410     
411     /**
412      * set an interface
413      *
414      * @param name
415      */

416     public void setInterfaceName(QName JavaDoc name) {
417         packet.setInterfaceName(name);
418     }
419
420     /**
421      * @return the endpoint
422      */

423     public ServiceEndpoint getEndpoint() {
424         return packet.getEndpoint();
425     }
426
427     /**
428      * @return the service
429      */

430     public QName JavaDoc getService() {
431         return packet.getServiceName();
432     }
433     
434     /**
435      * @return the interface name
436      */

437     public QName JavaDoc getInterfaceName() {
438         return packet.getInterfaceName();
439     }
440
441     /**
442      * @return the operation
443      */

444     public QName JavaDoc getOperation() {
445         return packet.getOperationName();
446     }
447
448     /**
449      * @return the transaction context
450      */

451     public Transaction JavaDoc getTransactionContext() {
452         return packet.getTransactionContext();
453     }
454
455     /**
456      * set the transaction
457      *
458      * @param transaction
459      * @throws MessagingException
460      */

461     public void setTransactionContext(Transaction JavaDoc transaction) throws MessagingException {
462         packet.setTransactionContext(transaction);
463     }
464
465     /**
466      * @return true if transacted
467      */

468     public boolean isTransacted() {
469         return this.packet.getTransactionContext() != null;
470     }
471
472     /**
473      * @return the Role of this exchange
474      */

475     public Role getRole() {
476         return can(CAN_PROVIDER) ? Role.PROVIDER : Role.CONSUMER;
477     }
478
479     /**
480      * @return the in message
481      */

482     public NormalizedMessage getInMessage() {
483         return this.packet.getIn();
484     }
485
486     /**
487      * set the in message
488      *
489      * @param message
490      * @throws MessagingException
491      */

492     public void setInMessage(NormalizedMessage message) throws MessagingException {
493         setMessage(message, IN);
494     }
495
496     /**
497      * @return the out message
498      */

499     public NormalizedMessage getOutMessage() {
500         return getMessage(OUT);
501     }
502
503     /**
504      * set the out message
505      *
506      * @param message
507      * @throws MessagingException
508      */

509     public void setOutMessage(NormalizedMessage message) throws MessagingException {
510         setMessage(message, OUT);
511     }
512     
513     /**
514      * @return Returns the sourceId.
515      */

516     public ComponentNameSpace getSourceId() {
517         return packet.getSourceId();
518     }
519     /**
520      * @param sourceId The sourceId to set.
521      */

522     public void setSourceId(ComponentNameSpace sourceId) {
523         packet.setSourceId(sourceId);
524     }
525     
526     /**
527      * @return Returns the destinationId.
528      */

529     public ComponentNameSpace getDestinationId() {
530         return packet.getDestinationId();
531     }
532     /**
533      * @param destinationId The destinationId to set.
534      */

535     public void setDestinationId(ComponentNameSpace destinationId) {
536         packet.setDestinationId(destinationId);
537     }
538     
539     public Boolean JavaDoc getPersistent() {
540         return packet.getPersistent();
541     }
542     
543     public void setPersistent(Boolean JavaDoc persistent) {
544         packet.setPersistent(persistent);
545     }
546
547
548     public PojoMarshaler getMarshaler() {
549         if (marshaler == null) {
550             marshaler = new DefaultMarshaler();
551         }
552         return marshaler;
553     }
554
555     public void setMarshaler(PojoMarshaler marshaler) {
556         this.marshaler = marshaler;
557     }
558
559     public abstract void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc;
560     
561     public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
562         packet.writeExternal(out);
563         out.write(state);
564         out.write(mirror.state);
565         out.writeBoolean(can(CAN_PROVIDER));
566     }
567     
568     public void handleSend(boolean sync) throws MessagingException {
569         // Check if send / sendSync is legal
570
if (!can(CAN_SEND)) {
571             throw new MessagingException("illegal call to send / sendSync");
572         }
573         if (sync && getStatus() != ExchangeStatus.ACTIVE) {
574             throw new MessagingException("illegal call to sendSync");
575         }
576         this.syncState = sync ? SYNC_STATE_SYNC_SENT : SYNC_STATE_ASYNC;
577         // Check status
578
ExchangeStatus status = getStatus();
579         if (status == ExchangeStatus.ACTIVE && !can(CAN_STATUS_ACTIVE)) {
580             throw new MessagingException("illegal exchange status: active");
581         }
582         if (status == ExchangeStatus.DONE && !can(CAN_STATUS_DONE)) {
583             throw new MessagingException("illegal exchange status: done");
584         }
585         if (status == ExchangeStatus.ERROR && !can(CAN_STATUS_ERROR)) {
586             throw new MessagingException("illegal exchange status: error");
587         }
588         // Check message
589
// Change state
590
if (status == ExchangeStatus.ACTIVE && packet.getFault() == null) {
591             this.state = this.states[this.state][STATES_NEXT_OUT];
592         } else if (status == ExchangeStatus.ACTIVE && packet.getFault() != null) {
593             this.state = this.states[this.state][STATES_NEXT_FAULT];
594         } else if (status == ExchangeStatus.ERROR) {
595             this.state = this.states[this.state][STATES_NEXT_ERROR];
596         } else if (status == ExchangeStatus.DONE) {
597             this.state = this.states[this.state][STATES_NEXT_DONE];
598         } else {
599             throw new IllegalStateException JavaDoc("unknown status");
600         }
601         if (this.state < 0 || this.state >= this.states.length) {
602             throw new IllegalStateException JavaDoc("next state is illegal");
603         }
604     }
605
606     public void handleAccept() throws MessagingException {
607         // Change state
608
ExchangeStatus status = getStatus();
609         int nextState;
610         if (status == ExchangeStatus.ACTIVE && packet.getFault() == null) {
611             nextState = this.states[this.state][STATES_NEXT_OUT];
612         } else if (status == ExchangeStatus.ACTIVE && packet.getFault() != null) {
613             nextState = this.states[this.state][STATES_NEXT_FAULT];
614         } else if (status == ExchangeStatus.ERROR) {
615             nextState = this.states[this.state][STATES_NEXT_ERROR];
616         } else if (status == ExchangeStatus.DONE) {
617             nextState = this.states[this.state][STATES_NEXT_DONE];
618         } else {
619             throw new IllegalStateException JavaDoc("unknown status");
620         }
621         if (nextState < 0 || nextState >= this.states.length) {
622             throw new IllegalStateException JavaDoc("next state is illegal");
623         }
624         this.state = nextState;
625     }
626
627     public MessageExchangeImpl getMirror() {
628         return mirror;
629     }
630
631     public int getSyncState() {
632         return syncState;
633     }
634
635     public void setSyncState(int syncState) {
636         this.syncState = syncState;
637     }
638     
639     /**
640      * @return the txState
641      */

642     public int getTxState() {
643         return txState;
644     }
645
646     /**
647      * @param txState the txState to set
648      */

649     public void setTxState(int txState) {
650         this.txState = txState;
651     }
652
653     public boolean isPushDelivery() {
654         return this.pushDeliver;
655     }
656     
657     public void setPushDeliver(boolean b) {
658         this.pushDeliver = true;
659     }
660     
661     /**
662      * @return the txLock
663      */

664     public Object JavaDoc getTxLock() {
665         return txLock;
666     }
667
668     /**
669      * @param txLock the txLock to set
670      */

671     public void setTxLock(Object JavaDoc txLock) {
672         this.txLock = txLock;
673     }
674
675     public String JavaDoc toString() {
676         try {
677             StringBuffer JavaDoc sb = new StringBuffer JavaDoc();
678             String JavaDoc name = getClass().getName();
679             name = name.substring(name.lastIndexOf('.') + 1, name.length() - 4);
680             sb.append(name);
681             sb.append("[\n");
682             sb.append(" id: ").append(getExchangeId()).append('\n');
683             sb.append(" status: ").append(getStatus()).append('\n');
684             sb.append(" role: ").append(getRole() == Role.CONSUMER ? "consumer" : "provider").append('\n');
685             if (getInterfaceName() != null) {
686                 sb.append(" interface: ").append(getInterfaceName()).append('\n');
687             }
688             if (getService() != null) {
689                 sb.append(" service: ").append(getService()).append('\n');
690             }
691             if (getEndpoint() != null) {
692                 sb.append(" endpoint: ").append(getEndpoint().getEndpointName()).append('\n');
693             }
694             if (getOperation() != null) {
695                 sb.append(" operation: ").append(getOperation()).append('\n');
696             }
697             SourceTransformer st = new SourceTransformer();
698             display("in", sb, st);
699             display("out", sb, st);
700             display("fault", sb, st);
701             if (getError() != null) {
702                 sb.append(" error: ");
703                 sb.append(getError());
704                 sb.append('\n');
705             }
706             sb.append("]");
707             return sb.toString();
708         } catch (Exception JavaDoc e) {
709             log.trace("Error caught in toString", e);
710             return super.toString();
711         }
712     }
713
714     public static final int maxMsgDisplaySize = 1500;
715
716     public static final boolean preserveContent = Boolean.getBoolean("org.apache.servicemix.preserveContent");
717
718     private void display(String JavaDoc msg, StringBuffer JavaDoc sb, SourceTransformer st) {
719         if (getMessage(msg) != null) {
720             sb.append(" ").append(msg).append(": ");
721             try {
722                 if (getMessage(msg).getContent() != null) {
723                     if (preserveContent) {
724                         sb.append(getMessage(msg).getContent().getClass());
725                     } else {
726                         Node JavaDoc node = st.toDOMNode(getMessage(msg).getContent());
727                         getMessage(msg).setContent(new DOMSource JavaDoc(node));
728                         String JavaDoc str = st.toString(node);
729                         if (str.length() > maxMsgDisplaySize) {
730                             sb.append(str.substring(0, maxMsgDisplaySize)).append("...");
731                         } else {
732                             sb.append(str);
733                         }
734                     }
735                 } else {
736                     sb.append("null");
737                 }
738             } catch (Exception JavaDoc e) {
739                 sb.append("Unable to display: ").append(e);
740             }
741             sb.append('\n');
742         }
743     }
744
745 }
// Popular Tags