KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > eip > support > AbstractContentBasedRouter


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17 package org.apache.servicemix.eip.support;
18
19 import javax.jbi.management.DeploymentException;
20 import javax.jbi.messaging.ExchangeStatus;
21 import javax.jbi.messaging.Fault;
22 import javax.jbi.messaging.MessageExchange;
23 import javax.jbi.messaging.NormalizedMessage;
24
25 import org.apache.servicemix.JbiConstants;
26 import org.apache.servicemix.eip.EIPEndpoint;
27 import org.apache.servicemix.jbi.util.MessageUtil;
28 import org.apache.servicemix.store.Store;
29
30 /**
31  * AbstractContentBasedRouter can be used as a base class for content-based routing.
32  * This component implements the
33  * <a HREF="http://www.enterpriseintegrationpatterns.com/ContentBasedRouter.html">Content-Based Router</a>
34  * pattern.
35  *
36  * @author gnodet
37  * @version $Revision: 376451 $
38  */

39 public abstract class AbstractContentBasedRouter extends EIPEndpoint {
40
41     /**
42      * The correlation property used by this component
43      */

44     private String JavaDoc correlation;
45     
46     /* (non-Javadoc)
47      * @see org.apache.servicemix.eip.EIPEndpoint#validate()
48      */

49     public void validate() throws DeploymentException {
50         super.validate();
51         // Create correlation property
52
correlation = "AbstractContentBasedRouter.Correlation." + getService() + "." + getEndpoint();
53     }
54
55     /* (non-Javadoc)
56      * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
57      */

58     protected void processSync(MessageExchange exchange) throws Exception JavaDoc {
59         // Create exchange for target
60
MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
61         // Now copy input to new exchange
62
// We need to read the message once for finding routing target
63
// so ensure we have a re-readable source
64
NormalizedMessage in = MessageUtil.copyIn(exchange);
65         MessageUtil.transferToIn(in, tme);
66         // Retrieve target
67
ExchangeTarget target = getDestination(tme);
68         target.configureTarget(tme, getContext());
69         // Send in to target
70
sendSync(tme);
71         // Send back the result
72
if (tme.getStatus() == ExchangeStatus.DONE) {
73             done(exchange);
74         } else if (tme.getStatus() == ExchangeStatus.ERROR) {
75             fail(exchange, tme.getError());
76         } else if (tme.getFault() != null) {
77             Fault fault = MessageUtil.copyFault(tme);
78             done(tme);
79             MessageUtil.transferToFault(fault, exchange);
80             sendSync(exchange);
81         } else if (tme.getMessage("out") != null) {
82             NormalizedMessage out = MessageUtil.copyOut(tme);
83             done(tme);
84             MessageUtil.transferToOut(out, exchange);
85             sendSync(exchange);
86         } else {
87             done(tme);
88             throw new IllegalStateException JavaDoc("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
89         }
90     }
91
92     /* (non-Javadoc)
93      * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
94      */

95     protected void processAsync(MessageExchange exchange) throws Exception JavaDoc {
96         if (exchange.getRole() == MessageExchange.Role.PROVIDER &&
97             exchange.getProperty(correlation) == null) {
98             // Create exchange for target
99
MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
100             if (store.hasFeature(Store.CLUSTERED)) {
101                 exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
102                 tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
103             }
104             // Set correlations
105
tme.setProperty(correlation, exchange.getExchangeId());
106             exchange.setProperty(correlation, tme.getExchangeId());
107             // Put exchange to store
108
store.store(exchange.getExchangeId(), exchange);
109             // Now copy input to new exchange
110
// We need to read the message once for finding routing target
111
// so ensure we have a re-readable source
112
NormalizedMessage in = MessageUtil.copyIn(exchange);
113             MessageUtil.transferToIn(in, tme);
114             // Retrieve target
115
ExchangeTarget target = getDestination(tme);
116             target.configureTarget(tme, getContext());
117             // Send in to target
118
send(tme);
119         // Mimic the exchange on the other side and send to needed listener
120
} else {
121             String JavaDoc id = (String JavaDoc) exchange.getProperty(correlation);
122             if (id == null) {
123                 throw new IllegalStateException JavaDoc(correlation + " property not found");
124             }
125             MessageExchange org = (MessageExchange) store.load(id);
126             if (org == null) {
127                 throw new IllegalStateException JavaDoc("Could not load original exchange with id " + id);
128             }
129             // Reproduce DONE status to the other side
130
if (exchange.getStatus() == ExchangeStatus.DONE) {
131                 done(org);
132             // Reproduce ERROR status to the other side
133
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
134                 fail(org, exchange.getError());
135             // Reproduce faults to the other side and listeners
136
} else if (exchange.getFault() != null) {
137                 store.store(exchange.getExchangeId(), exchange);
138                 MessageUtil.transferTo(exchange, org, "fault");
139                 send(org);
140             // Reproduce answers to the other side
141
} else if (exchange.getMessage("out") != null) {
142                 store.store(exchange.getExchangeId(), exchange);
143                 MessageUtil.transferTo(exchange, org, "out");
144                 send(org);
145             } else {
146                 throw new IllegalStateException JavaDoc("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
147             }
148         }
149     }
150     
151     /**
152      * Find the target destination for the given JBI exchange
153      * @param exchange
154      * @return the target for the given exchange
155      * @throws Exception
156      */

157     protected abstract ExchangeTarget getDestination(MessageExchange exchange) throws Exception JavaDoc;
158
159 }
160
Popular Tags