KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > network > ForwardingBridge


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.network;
19
20 import java.io.IOException JavaDoc;
21
22 import org.apache.activemq.Service;
23 import org.apache.activemq.command.ActiveMQQueue;
24 import org.apache.activemq.command.ActiveMQTopic;
25 import org.apache.activemq.command.BrokerId;
26 import org.apache.activemq.command.BrokerInfo;
27 import org.apache.activemq.command.Command;
28 import org.apache.activemq.command.ConnectionId;
29 import org.apache.activemq.command.ConnectionInfo;
30 import org.apache.activemq.command.ConsumerInfo;
31 import org.apache.activemq.command.ExceptionResponse;
32 import org.apache.activemq.command.Message;
33 import org.apache.activemq.command.MessageAck;
34 import org.apache.activemq.command.MessageDispatch;
35 import org.apache.activemq.command.ProducerInfo;
36 import org.apache.activemq.command.Response;
37 import org.apache.activemq.command.SessionInfo;
38 import org.apache.activemq.command.ShutdownInfo;
39 import org.apache.activemq.transport.DefaultTransportListener;
40 import org.apache.activemq.transport.FutureResponse;
41 import org.apache.activemq.transport.ResponseCallback;
42 import org.apache.activemq.transport.Transport;
43 import org.apache.activemq.util.IdGenerator;
44 import org.apache.activemq.util.ServiceStopper;
45 import org.apache.activemq.util.ServiceSupport;
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48
49 /**
50  * Forwards all messages from the local broker to the remote broker.
51  *
52  * @org.apache.xbean.XBean
53  *
54  * @version $Revision$
55  */

56 public class ForwardingBridge implements Service{
57     
58     static final private Log log = LogFactory.getLog(ForwardingBridge.class);
59
60     private final Transport localBroker;
61     private final Transport remoteBroker;
62     
63     IdGenerator idGenerator = new IdGenerator();
64     ConnectionInfo connectionInfo;
65     SessionInfo sessionInfo;
66     ProducerInfo producerInfo;
67     ConsumerInfo queueConsumerInfo;
68     ConsumerInfo topicConsumerInfo;
69     
70     private String JavaDoc clientId;
71     private int prefetchSize=1000;
72     private boolean dispatchAsync;
73     private String JavaDoc destinationFilter = ">";
74     
75     private int queueDispatched;
76     private int topicDispatched;
77     
78     BrokerId localBrokerId;
79     BrokerId remoteBrokerId;
80     private NetworkBridgeFailedListener bridgeFailedListener;
81
82     public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
83         this.localBroker = localBroker;
84         this.remoteBroker = remoteBroker;
85     }
86
87     public void start() throws Exception JavaDoc {
88         log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established.");
89
90         localBroker.setTransportListener(new DefaultTransportListener(){
91             public void onCommand(Object JavaDoc o) {
92                 Command command = (Command) o;
93                 serviceLocalCommand(command);
94             }
95             public void onException(IOException JavaDoc error) {
96                 serviceLocalException(error);
97             }
98         });
99         
100         remoteBroker.setTransportListener(new DefaultTransportListener(){
101             public void onCommand(Object JavaDoc o) {
102                 Command command = (Command) o;
103                 serviceRemoteCommand(command);
104             }
105             public void onException(IOException JavaDoc error) {
106                 serviceRemoteException(error);
107             }
108         });
109         
110         localBroker.start();
111         remoteBroker.start();
112     }
113
114     protected void triggerStartBridge() throws IOException JavaDoc {
115         Thread JavaDoc thead = new Thread JavaDoc() {
116             public void run() {
117                 try {
118                     startBridge();
119                 }
120                 catch (IOException JavaDoc e) {
121                     log.error("Failed to start network bridge: " + e, e);
122                 }
123             }
124         };
125         thead.start();
126     }
127
128     /**
129      * @throws IOException
130      */

131     private void startBridge() throws IOException JavaDoc {
132         connectionInfo = new ConnectionInfo();
133         connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
134         connectionInfo.setClientId(clientId);
135         localBroker.oneway(connectionInfo);
136         remoteBroker.oneway(connectionInfo);
137
138         sessionInfo=new SessionInfo(connectionInfo, 1);
139         localBroker.oneway(sessionInfo);
140         remoteBroker.oneway(sessionInfo);
141         
142         queueConsumerInfo = new ConsumerInfo(sessionInfo, 1);
143         queueConsumerInfo.setDispatchAsync(dispatchAsync);
144         queueConsumerInfo.setDestination(new ActiveMQQueue(destinationFilter));
145         queueConsumerInfo.setPrefetchSize(prefetchSize);
146         queueConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
147         localBroker.oneway(queueConsumerInfo);
148         
149         producerInfo = new ProducerInfo(sessionInfo, 1);
150         producerInfo.setResponseRequired(false);
151         remoteBroker.oneway(producerInfo);
152         
153         if( connectionInfo.getClientId()!=null ) {
154             topicConsumerInfo = new ConsumerInfo(sessionInfo, 2);
155             topicConsumerInfo.setDispatchAsync(dispatchAsync);
156             topicConsumerInfo.setSubscriptionName("topic-bridge");
157             topicConsumerInfo.setRetroactive(true);
158             topicConsumerInfo.setDestination(new ActiveMQTopic(destinationFilter));
159             topicConsumerInfo.setPrefetchSize(prefetchSize);
160             topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
161             localBroker.oneway(topicConsumerInfo);
162         }
163         log.info("Network connection between " + localBroker + " and " + remoteBroker + " has been established.");
164     }
165     
166     public void stop() throws Exception JavaDoc {
167         try {
168             if( connectionInfo!=null ) {
169                 localBroker.request(connectionInfo.createRemoveCommand());
170                 remoteBroker.request(connectionInfo.createRemoveCommand());
171             }
172             localBroker.setTransportListener(null);
173             remoteBroker.setTransportListener(null);
174             localBroker.oneway(new ShutdownInfo());
175             remoteBroker.oneway(new ShutdownInfo());
176         } finally {
177             ServiceStopper ss = new ServiceStopper();
178             ss.stop(localBroker);
179             ss.stop(remoteBroker);
180             ss.throwFirstException();
181         }
182     }
183     
184     public void serviceRemoteException(Throwable JavaDoc error) {
185         log.info("Unexpected remote exception: "+error);
186         log.debug("Exception trace: ", error);
187     }
188     
189     protected void serviceRemoteCommand(Command command) {
190         try {
191             if(command.isBrokerInfo() ) {
192                 synchronized( this ) {
193                     remoteBrokerId = ((BrokerInfo)command).getBrokerId();
194                     if( localBrokerId !=null) {
195                         if( localBrokerId.equals(remoteBrokerId) ) {
196                             log.info("Disconnecting loop back connection.");
197                             ServiceSupport.dispose(this);
198                         } else {
199                             triggerStartBridge();
200                         }
201                     }
202                 }
203             } else {
204                 log.warn("Unexpected remote command: "+command);
205             }
206         } catch (IOException JavaDoc e) {
207             serviceLocalException(e);
208         }
209     }
210
211     public void serviceLocalException(Throwable JavaDoc error) {
212         log.info("Unexpected local exception: "+error);
213         log.debug("Exception trace: ", error);
214         fireBridgeFailed();
215     }
216     protected void serviceLocalCommand(Command command) {
217         try {
218             if( command.isMessageDispatch() ) {
219                 final MessageDispatch md = (MessageDispatch) command;
220                 Message message = md.getMessage();
221                 message.setProducerId(producerInfo.getProducerId());
222                 
223                 if( message.getOriginalTransactionId()==null )
224                     message.setOriginalTransactionId(message.getTransactionId());
225                 message.setTransactionId(null);
226
227                 
228                 if( !message.isResponseRequired() ) {
229                     
230                     // If the message was originally sent using async send, we will preserve that QOS
231
// by bridging it using an async send (small chance of message loss).
232
remoteBroker.oneway(message);
233                     localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
234                     
235                 } else {
236                     
237                     // The message was not sent using async send, so we should only ack the local
238
// broker when we get confirmation that the remote broker has received the message.
239
ResponseCallback callback = new ResponseCallback() {
240                         public void onCompletion(FutureResponse future) {
241                             try {
242                                 Response response = future.getResult();
243                                 if(response.isException()){
244                                     ExceptionResponse er=(ExceptionResponse) response;
245                                     serviceLocalException(er.getException());
246                                 } else {
247                                     localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
248                                 }
249                             } catch (IOException JavaDoc e) {
250                                 serviceLocalException(e);
251                             }
252                         }
253                     };
254
255                     remoteBroker.asyncRequest(message, callback);
256                 }
257                 
258                                 
259                 // Ack on every message since we don't know if the broker is blocked due to memory
260
// usage and is waiting for an Ack to un-block him.
261

262                 // Acking a range is more efficient, but also more prone to locking up a server
263
// Perhaps doing something like the following should be policy based.
264
// if( md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) ) {
265
// queueDispatched++;
266
// if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2) ) {
267
// localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, queueDispatched));
268
// queueDispatched=0;
269
// }
270
// } else {
271
// topicDispatched++;
272
// if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) ) {
273
// localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, topicDispatched));
274
// topicDispatched=0;
275
// }
276
// }
277
} else if(command.isBrokerInfo() ) {
278                 synchronized( this ) {
279                     localBrokerId = ((BrokerInfo)command).getBrokerId();
280                     if( remoteBrokerId !=null) {
281                         if( remoteBrokerId.equals(localBrokerId) ) {
282                             log.info("Disconnecting loop back connection.");
283                             ServiceSupport.dispose(this);
284                         } else {
285                             triggerStartBridge();
286                         }
287                     }
288                 }
289             } else {
290                 log.debug("Unexpected local command: "+command);
291             }
292         } catch (IOException JavaDoc e) {
293             serviceLocalException(e);
294         }
295     }
296
297     public String JavaDoc getClientId() {
298         return clientId;
299     }
300     public void setClientId(String JavaDoc clientId) {
301         this.clientId = clientId;
302     }
303
304     public int getPrefetchSize() {
305         return prefetchSize;
306     }
307     public void setPrefetchSize(int prefetchSize) {
308         this.prefetchSize = prefetchSize;
309     }
310
311     public boolean isDispatchAsync() {
312         return dispatchAsync;
313     }
314     public void setDispatchAsync(boolean dispatchAsync) {
315         this.dispatchAsync = dispatchAsync;
316     }
317
318     public String JavaDoc getDestinationFilter() {
319         return destinationFilter;
320     }
321     public void setDestinationFilter(String JavaDoc destinationFilter) {
322         this.destinationFilter = destinationFilter;
323     }
324
325    
326     public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){
327       this.bridgeFailedListener=listener;
328     }
329     
330     private void fireBridgeFailed() {
331         NetworkBridgeFailedListener l = this.bridgeFailedListener;
332         if (l!=null) {
333             l.bridgeFailed();
334         }
335     }
336 }
337
Popular Tags