KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > StubConnection


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18 package org.apache.activemq.broker;
19
20 import java.io.IOException JavaDoc;
21
22 import org.apache.activemq.Service;
23 import org.apache.activemq.command.Command;
24 import org.apache.activemq.command.ExceptionResponse;
25 import org.apache.activemq.command.Message;
26 import org.apache.activemq.command.Response;
27 import org.apache.activemq.command.ShutdownInfo;
28 import org.apache.activemq.transport.DefaultTransportListener;
29 import org.apache.activemq.transport.Transport;
30 import org.apache.activemq.transport.TransportFactory;
31 import org.apache.activemq.util.JMSExceptionSupport;
32 import org.apache.activemq.util.ServiceSupport;
33
34 import java.util.concurrent.BlockingQueue JavaDoc;
35 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
36
37 public class StubConnection implements Service {
38
39     private final BlockingQueue JavaDoc dispatchQueue = new LinkedBlockingQueue JavaDoc();
40     private Connection connection;
41     private Transport transport;
42     boolean shuttingDown = false;
43     
44     protected void dispatch(Object JavaDoc command) throws InterruptedException JavaDoc, IOException JavaDoc {
45         dispatchQueue.put(command);
46     }
47
48     public StubConnection(BrokerService broker) throws Exception JavaDoc {
49         this(TransportFactory.connect(broker.getVmConnectorURI()));
50     }
51
52     public StubConnection(Connection connection) {
53         this.connection = connection;
54     }
55
56     public StubConnection(Transport transport) throws Exception JavaDoc {
57         this.transport = transport;
58         transport.setTransportListener(new DefaultTransportListener() {
59             public void onCommand(Object JavaDoc command) {
60                 try {
61                     if (command.getClass() == ShutdownInfo.class) {
62                         shuttingDown = true;
63                     }
64                     StubConnection.this.dispatch(command);
65                 }
66                 catch (Exception JavaDoc e) {
67                     onException(new IOException JavaDoc("" + e));
68                 }
69             }
70
71             public void onException(IOException JavaDoc error) {
72                 if (!shuttingDown) {
73                     error.printStackTrace();
74                 }
75             }
76         });
77         transport.start();
78     }
79
80     public BlockingQueue JavaDoc getDispatchQueue() {
81         return dispatchQueue;
82     }
83
84     public void send(Command command) throws Exception JavaDoc {
85         if( command instanceof Message ) {
86             Message message = (Message) command;
87             message.setProducerId(message.getMessageId().getProducerId());
88         }
89         command.setResponseRequired(false);
90         if (connection != null) {
91             Response response = connection.service(command);
92             if (response != null && response.isException()) {
93                 ExceptionResponse er = (ExceptionResponse) response;
94                 throw JMSExceptionSupport.create(er.getException());
95             }
96         }
97         else if (transport != null) {
98             transport.oneway(command);
99         }
100     }
101
102     public Response request(Command command) throws Exception JavaDoc {
103         if( command instanceof Message ) {
104             Message message = (Message) command;
105             message.setProducerId(message.getMessageId().getProducerId());
106         }
107         command.setResponseRequired(true);
108         if (connection != null) {
109             Response response = connection.service(command);
110             if (response != null && response.isException()) {
111                 ExceptionResponse er = (ExceptionResponse) response;
112                 throw JMSExceptionSupport.create(er.getException());
113             }
114             return response;
115         }
116         else if (transport != null) {
117             Response response = (Response) transport.request(command);
118             if (response != null && response.isException()) {
119                 ExceptionResponse er = (ExceptionResponse) response;
120                 throw JMSExceptionSupport.create(er.getException());
121             }
122             return response;
123         }
124         return null;
125     }
126
127     public Connection getConnection() {
128         return connection;
129     }
130
131     public Transport getTransport() {
132         return transport;
133     }
134
135     public void start() throws Exception JavaDoc {
136     }
137
138     public void stop() throws Exception JavaDoc {
139         shuttingDown = true;
140         if (transport != null) {
141             try {
142                 transport.oneway(new ShutdownInfo());
143             }
144             catch (IOException JavaDoc e) {
145             }
146             ServiceSupport.dispose(transport);
147         }
148     }
149 }
150
Popular Tags