KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > proxy > ProxyConnection


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.proxy;
19
20 import java.io.IOException JavaDoc;
21 import org.apache.activemq.Service;
22 import org.apache.activemq.command.ShutdownInfo;
23 import org.apache.activemq.transport.DefaultTransportListener;
24 import org.apache.activemq.transport.Transport;
25 import org.apache.activemq.transport.TransportListener;
26 import org.apache.activemq.util.IOExceptionSupport;
27 import org.apache.activemq.util.ServiceStopper;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
31
32 class ProxyConnection implements Service {
33
34     static final private Log log = LogFactory.getLog(ProxyConnection.class);
35     
36     private final Transport localTransport;
37     private final Transport remoteTransport;
38     private AtomicBoolean JavaDoc shuttingDown = new AtomicBoolean JavaDoc(false);
39     private AtomicBoolean JavaDoc running = new AtomicBoolean JavaDoc(false);
40
41     public ProxyConnection(Transport localTransport, Transport remoteTransport) {
42         this.localTransport = localTransport;
43         this.remoteTransport = remoteTransport;
44     }
45
46     public void onFailure(IOException JavaDoc e) {
47         if( !shuttingDown.get() ) {
48             log.debug("Transport error: "+e,e);
49             try {
50                 stop();
51             } catch (Exception JavaDoc ignore) {
52             }
53         }
54     }
55
56     public void start() throws Exception JavaDoc {
57         if( !running.compareAndSet(false, true) ) {
58             return;
59         }
60             
61         this.localTransport.setTransportListener(new DefaultTransportListener() {
62             public void onCommand(Object JavaDoc command) {
63                 boolean shutdown=false;
64                 if( command.getClass() == ShutdownInfo.class ) {
65                     shuttingDown.set(true);
66                     shutdown=true;
67                 }
68                 try {
69                     remoteTransport.oneway(command);
70                     if( shutdown )
71                         stop();
72                 } catch (IOException JavaDoc error) {
73                     onFailure(error);
74                 } catch (Exception JavaDoc error) {
75                     onFailure(IOExceptionSupport.create(error));
76                 }
77             }
78             public void onException(IOException JavaDoc error) {
79                 onFailure(error);
80             }
81         });
82         
83         this.remoteTransport.setTransportListener(new DefaultTransportListener() {
84             public void onCommand(Object JavaDoc command) {
85                 try {
86                     localTransport.oneway(command);
87                 } catch (IOException JavaDoc error) {
88                     onFailure(error);
89                 }
90             }
91             public void onException(IOException JavaDoc error) {
92                 onFailure(error);
93             }
94         });
95         
96         localTransport.start();
97         remoteTransport.start();
98     }
99     
100     public void stop() throws Exception JavaDoc {
101         if( !running.compareAndSet(true, false) ) {
102             return;
103         }
104         shuttingDown.set(true);
105         ServiceStopper ss = new ServiceStopper();
106         ss.stop(localTransport);
107         ss.stop(remoteTransport);
108         ss.throwFirstException();
109     }
110     
111 }
112
Popular Tags