KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > eip > patterns > WireTap


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.eip.patterns;
18
19 import javax.jbi.management.DeploymentException;
20 import javax.jbi.messaging.ExchangeStatus;
21 import javax.jbi.messaging.InOnly;
22 import javax.jbi.messaging.MessageExchange;
23 import javax.jbi.messaging.NormalizedMessage;
24
25 import org.apache.servicemix.JbiConstants;
26 import org.apache.servicemix.eip.EIPEndpoint;
27 import org.apache.servicemix.eip.support.ExchangeTarget;
28 import org.apache.servicemix.jbi.util.MessageUtil;
29 import org.apache.servicemix.store.Store;
30
31 /**
32  *
33  * A WireTap component can be used to forward a copy of the input message to a listener.
34  * This component implements the
35  * <a HREF="http://www.enterpriseintegrationpatterns.com/WireTap.html">WireTap</a>
36  * pattern.
37  * It can handle all 4 standard MEPs, but will only send an In-Only MEP to the listener.
38  * In addition, this component is fully asynchronous and uses an exchange store to provide
39  * full HA and recovery for clustered / persistent flows.
40  *
41  * @author gnodet
42  * @version $Revision: 376451 $
43  * @org.apache.xbean.XBean element="wire-tap"
44  * description="A WireTap"
45  */

46 public class WireTap extends EIPEndpoint {
47
48     /**
49      * The main target destination which will receive the exchange
50      */

51     private ExchangeTarget target;
52     /**
53      * The listener destination for in messages
54      */

55     private ExchangeTarget inListener;
56     /**
57      * The listener destination for out messages
58      */

59     private ExchangeTarget outListener;
60     /**
61      * The listener destination for fault messages
62      */

63     private ExchangeTarget faultListener;
64     /**
65      * The correlation property used by this component
66      */

67     private String JavaDoc correlation;
68     
69     /**
70      * @return Returns the target.
71      */

72     public ExchangeTarget getTarget() {
73         return target;
74     }
75
76     /**
77      * @param target The target to set.
78      */

79     public void setTarget(ExchangeTarget target) {
80         this.target = target;
81         this.wsdlExchangeTarget = target;
82     }
83
84     /**
85      * @return Returns the faultListener.
86      */

87     public ExchangeTarget getFaultListener() {
88         return faultListener;
89     }
90
91     /**
92      * @param faultListener The faultListener to set.
93      */

94     public void setFaultListener(ExchangeTarget faultListener) {
95         this.faultListener = faultListener;
96     }
97
98     /**
99      * @return Returns the inListener.
100      */

101     public ExchangeTarget getInListener() {
102         return inListener;
103     }
104
105     /**
106      * @param inListener The inListener to set.
107      */

108     public void setInListener(ExchangeTarget inListener) {
109         this.inListener = inListener;
110     }
111
112     /**
113      * @return Returns the outListener.
114      */

115     public ExchangeTarget getOutListener() {
116         return outListener;
117     }
118
119     /**
120      * @param outListener The outListener to set.
121      */

122     public void setOutListener(ExchangeTarget outListener) {
123         this.outListener = outListener;
124     }
125
126     /* (non-Javadoc)
127      * @see org.apache.servicemix.eip.EIPEndpoint#validate()
128      */

129     public void validate() throws DeploymentException {
130         super.validate();
131         // Check target
132
if (target == null) {
133             throw new IllegalArgumentException JavaDoc("target should be set to a valid ExchangeTarget");
134         }
135         // Create correlation property
136
correlation = "WireTap.Correlation." + getService() + "." + getEndpoint();
137     }
138
139     /* (non-Javadoc)
140      * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
141      */

142     protected void processSync(MessageExchange exchange) throws Exception JavaDoc {
143         // Create exchange for target
144
MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
145         target.configureTarget(tme, getContext());
146         sendSyncToListenerAndTarget(exchange, tme, inListener, "in");
147         if (tme.getStatus() == ExchangeStatus.DONE) {
148             done(exchange);
149         } else if (tme.getStatus() == ExchangeStatus.ERROR) {
150             fail(exchange, tme.getError());
151         } else if (tme.getFault() != null) {
152             sendSyncToListenerAndTarget(tme, exchange, faultListener, "fault");
153             done(tme);
154         } else if (tme.getMessage("out") != null) {
155             sendSyncToListenerAndTarget(tme, exchange, outListener, "out");
156             done(tme);
157         } else {
158             done(tme);
159             throw new IllegalStateException JavaDoc("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
160         }
161     }
162     
163     /* (non-Javadoc)
164      * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
165      */

166     protected void processAsync(MessageExchange exchange) throws Exception JavaDoc {
167         if (exchange.getRole() == MessageExchange.Role.PROVIDER &&
168             exchange.getProperty(correlation) == null) {
169             // Create exchange for target
170
MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
171             if (store.hasFeature(Store.CLUSTERED)) {
172                 exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
173                 tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
174             }
175             target.configureTarget(tme, getContext());
176             // Set correlations
177
exchange.setProperty(correlation, tme.getExchangeId());
178             tme.setProperty(correlation, exchange.getExchangeId());
179             // Put exchange to store
180
store.store(exchange.getExchangeId(), exchange);
181             // Send in to listener and target
182
sendToListenerAndTarget(exchange, tme, inListener, "in");
183         // Mimic the exchange on the other side and send to needed listener
184
} else {
185             String JavaDoc id = (String JavaDoc) exchange.getProperty(correlation);
186             if (id == null) {
187                 if (exchange.getRole() == MessageExchange.Role.CONSUMER &&
188                     exchange.getStatus() != ExchangeStatus.ACTIVE) {
189                     // This must be a listener status, so ignore
190
return;
191                 }
192                 throw new IllegalStateException JavaDoc(correlation + " property not found");
193             }
194             MessageExchange org = (MessageExchange) store.load(id);
195             if (org == null) {
196                 throw new IllegalStateException JavaDoc("Could not load original exchange with id " + id);
197             }
198             // Reproduce DONE status to the other side
199
if (exchange.getStatus() == ExchangeStatus.DONE) {
200                 done(org);
201             // Reproduce ERROR status to the other side
202
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
203                 fail(org, exchange.getError());
204             // Reproduce faults to the other side and listeners
205
} else if (exchange.getFault() != null) {
206                 store.store(exchange.getExchangeId(), exchange);
207                 sendToListenerAndTarget(exchange, org, faultListener, "fault");
208             // Reproduce answers to the other side
209
} else if (exchange.getMessage("out") != null) {
210                 store.store(exchange.getExchangeId(), exchange);
211                 sendToListenerAndTarget(exchange, org, outListener, "out");
212             } else {
213                 throw new IllegalStateException JavaDoc("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
214             }
215         }
216     }
217     
218     private void sendToListenerAndTarget(MessageExchange source,
219                                          MessageExchange dest,
220                                          ExchangeTarget listener,
221                                          String JavaDoc message) throws Exception JavaDoc {
222         if (listener != null) {
223             NormalizedMessage msg = MessageUtil.copy(source.getMessage(message));
224             InOnly lme = exchangeFactory.createInOnlyExchange();
225             if (store.hasFeature(Store.CLUSTERED)) {
226                 lme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
227             }
228             listener.configureTarget(lme, getContext());
229             MessageUtil.transferToIn(msg, lme);
230             send(lme);
231             MessageUtil.transferTo(msg, dest, message);
232             send(dest);
233         } else {
234             MessageUtil.transferTo(source, dest, message);
235             send(dest);
236         }
237     }
238
239     private void sendSyncToListenerAndTarget(MessageExchange source,
240                                              MessageExchange dest,
241                                              ExchangeTarget listener,
242                                              String JavaDoc message) throws Exception JavaDoc {
243         if (listener != null) {
244             NormalizedMessage msg = MessageUtil.copy(source.getMessage(message));
245             InOnly lme = exchangeFactory.createInOnlyExchange();
246             if (store.hasFeature(Store.CLUSTERED)) {
247                 lme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
248             }
249             listener.configureTarget(lme, getContext());
250             MessageUtil.transferToIn(msg, lme);
251             sendSync(lme);
252             MessageUtil.transferTo(msg, dest, message);
253             sendSync(dest);
254         } else {
255             MessageUtil.transferTo(source, dest, message);
256             sendSync(dest);
257         }
258     }
259     
260 }
261
Popular Tags