KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > eip > patterns > MessageFilter


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17 package org.apache.servicemix.eip.patterns;
18
19 import javax.jbi.management.DeploymentException;
20 import javax.jbi.messaging.ExchangeStatus;
21 import javax.jbi.messaging.Fault;
22 import javax.jbi.messaging.InOnly;
23 import javax.jbi.messaging.MessageExchange;
24 import javax.jbi.messaging.NormalizedMessage;
25 import javax.jbi.messaging.RobustInOnly;
26
27 import org.apache.servicemix.eip.EIPEndpoint;
28 import org.apache.servicemix.eip.support.ExchangeTarget;
29 import org.apache.servicemix.eip.support.Predicate;
30 import org.apache.servicemix.jbi.util.MessageUtil;
31
32 /**
33  * MessageFilter allows filtering incoming JBI exchanges.
34  * This component implements the
35  * <a HREF="http://www.enterpriseintegrationpatterns.com/Filter.html">Message Filter</a>
36  * pattern.
37  *
38  * @author gnodet
39  * @version $Revision: 376451 $
40  * @org.apache.xbean.XBean element="message-filter"
41  * description="A Message Filter"
42  */

43 public class MessageFilter extends EIPEndpoint {
44
45     /**
46      * The main target destination which will receive the exchange
47      */

48     private ExchangeTarget target;
49     /**
50      * The filter to use on incoming messages
51      */

52     private Predicate filter;
53     /**
54      * The correlation property used by this component
55      */

56     private String JavaDoc correlation;
57     /**
58      * Indicates if faults and errors from recipients should be sent
59      * back to the consumer. In such a case, only the first fault or
60      * error received will be reported.
61      * Note that if the consumer is synchronous, it will be blocked
62      * until all recipients successfully acked the exchange, or
63      * a fault or error is reported, and the exchange will be kept in the
64      * store for recovery.
65      */

66     private boolean reportErrors;
67     
68     /**
69      * @return Returns the target.
70      */

71     public ExchangeTarget getTarget() {
72         return target;
73     }
74
75     /**
76      * @param target The target to set.
77      */

78     public void setTarget(ExchangeTarget target) {
79         this.target = target;
80     }
81
82     /**
83      * @return Returns the filter.
84      */

85     public Predicate getFilter() {
86         return filter;
87     }
88
89     /**
90      * @param filter The filter to set.
91      */

92     public void setFilter(Predicate filter) {
93         this.filter = filter;
94     }
95
96     /**
97      * @return Returns the reportErrors.
98      */

99     public boolean isReportErrors() {
100         return reportErrors;
101     }
102
103     /**
104      * @param reportErrors The reportErrors to set.
105      */

106     public void setReportErrors(boolean reportErrors) {
107         this.reportErrors = reportErrors;
108     }
109
110     /* (non-Javadoc)
111      * @see org.apache.servicemix.eip.EIPEndpoint#validate()
112      */

113     public void validate() throws DeploymentException {
114         super.validate();
115         // Check target
116
if (target == null) {
117             throw new IllegalArgumentException JavaDoc("target should be set to a valid ExchangeTarget");
118         }
119         // Check filter
120
if (filter == null) {
121             throw new IllegalArgumentException JavaDoc("filter property should be set");
122         }
123         // Create correlation property
124
correlation = "MessageFilter.Correlation." + getService() + "." + getEndpoint();
125     }
126
127     /* (non-Javadoc)
128      * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
129      */

130     protected void processSync(MessageExchange exchange) throws Exception JavaDoc {
131         if (exchange instanceof InOnly == false &&
132             exchange instanceof RobustInOnly == false) {
133             fail(exchange, new UnsupportedOperationException JavaDoc("Use an InOnly or RobustInOnly MEP"));
134         } else {
135             NormalizedMessage in = MessageUtil.copyIn(exchange);
136             MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
137             target.configureTarget(me, getContext());
138             MessageUtil.transferToIn(in, me);
139             if (filter.matches(me)) {
140                 sendSync(me);
141                 if (me.getStatus() == ExchangeStatus.ERROR && reportErrors) {
142                     fail(exchange, me.getError());
143                 } else if (me.getStatus() == ExchangeStatus.DONE) {
144                     done(exchange);
145                 } else if (me.getFault() != null && reportErrors) {
146                     Fault fault = MessageUtil.copyFault(me);
147                     done(me);
148                     MessageUtil.transferToFault(fault, exchange);
149                     sendSync(exchange);
150                 }
151             } else {
152                 done(exchange);
153             }
154         }
155     }
156
157     /* (non-Javadoc)
158      * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
159      */

160     protected void processAsync(MessageExchange exchange) throws Exception JavaDoc {
161         // If we need to report errors, the behavior is really different,
162
// as we need to keep the incoming exchange in the store until
163
// all acks have been received
164
if (reportErrors) {
165             // TODO: implement this
166
throw new UnsupportedOperationException JavaDoc("Not implemented");
167         // We are in a simple fire-and-forget behaviour.
168
// This implementation is really efficient as we do not use
169
// the store at all.
170
} else {
171             if (exchange.getStatus() == ExchangeStatus.DONE) {
172                 return;
173             } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
174                 return;
175             } else if (exchange instanceof InOnly == false &&
176                        exchange instanceof RobustInOnly == false) {
177                 fail(exchange, new UnsupportedOperationException JavaDoc("Use an InOnly or RobustInOnly MEP"));
178             } else if (exchange.getFault() != null) {
179                 done(exchange);
180             } else {
181                 NormalizedMessage in = MessageUtil.copyIn(exchange);
182                 MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
183                 target.configureTarget(me, getContext());
184                 MessageUtil.transferToIn(in, me);
185                 if (filter.matches(me)) {
186                     send(me);
187                 }
188                 done(exchange);
189             }
190         }
191     }
192
193 }
194
Popular Tags