KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > http > HttpTransport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.http;
19
20 import java.io.IOException JavaDoc;
21 import java.io.InputStream JavaDoc;
22 import java.io.OutputStreamWriter JavaDoc;
23 import java.io.Writer JavaDoc;
24 import java.net.HttpURLConnection JavaDoc;
25 import java.net.MalformedURLException JavaDoc;
26 import java.net.URI JavaDoc;
27 import java.net.URL JavaDoc;
28
29 import org.apache.activemq.command.Command;
30 import org.apache.activemq.command.ConnectionInfo;
31 import org.apache.activemq.transport.util.TextWireFormat;
32 import org.apache.activemq.util.ByteArrayOutputStream;
33 import org.apache.activemq.util.ByteSequence;
34 import org.apache.activemq.util.Callback;
35 import org.apache.activemq.util.IOExceptionSupport;
36 import org.apache.activemq.util.ServiceStopper;
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39
40 /**
41  * @version $Revision$
42  */

43 public class HttpTransport extends HttpTransportSupport {
44     private static final Log log = LogFactory.getLog(HttpTransport.class);
45     private HttpURLConnection JavaDoc sendConnection;
46     private HttpURLConnection JavaDoc receiveConnection;
47     private URL JavaDoc url;
48     private String JavaDoc clientID;
49 // private String sessionID;
50

51     public HttpTransport(TextWireFormat wireFormat, URI JavaDoc remoteUrl) throws MalformedURLException JavaDoc {
52         super(wireFormat, remoteUrl);
53         url = new URL JavaDoc(remoteUrl.toString());
54     }
55
56     public void oneway(Object JavaDoc o) throws IOException JavaDoc {
57         final Command command = (Command) o;
58         try {
59             if (command.getDataStructureType()==ConnectionInfo.DATA_STRUCTURE_TYPE) {
60                 boolean startGetThread = clientID==null;
61                 clientID=((ConnectionInfo)command).getClientId();
62                 if( startGetThread && isStarted() ) {
63                     try {
64                         super.doStart();
65                     } catch (Exception JavaDoc e) {
66                         throw IOExceptionSupport.create(e);
67                     }
68                 }
69             }
70             
71             HttpURLConnection JavaDoc connection = getSendConnection();
72             String JavaDoc text = getTextWireFormat().marshalText(command);
73             Writer JavaDoc writer = new OutputStreamWriter JavaDoc(connection.getOutputStream());
74             writer.write(text);
75             writer.flush();
76             int answer = connection.getResponseCode();
77             if (answer != HttpURLConnection.HTTP_OK) {
78                 throw new IOException JavaDoc("Failed to post command: " + command + " as response was: " + answer);
79             }
80 // checkSession(connection);
81
}
82         catch (IOException JavaDoc e) {
83             throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
84         }
85     }
86
87     public void run() {
88         log.trace("HTTP GET consumer thread starting for transport: " + this);
89         URI JavaDoc remoteUrl = getRemoteUrl();
90         while (!isStopped()) {
91             try {
92                 HttpURLConnection JavaDoc connection = getReceiveConnection();
93                 int answer = connection.getResponseCode();
94                 if (answer != HttpURLConnection.HTTP_OK) {
95                     if (answer == HttpURLConnection.HTTP_CLIENT_TIMEOUT) {
96                         log.trace("GET timed out");
97                     }
98                     else {
99                         log.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
100                     }
101                 }
102                 else {
103 // checkSession(connection);
104

105                     // Create a String for the UTF content
106
InputStream JavaDoc is = connection.getInputStream();
107                     ByteArrayOutputStream baos = new ByteArrayOutputStream(connection.getContentLength()>0?connection.getContentLength():1024);
108                     int c=0;
109                     while( (c=is.read())>= 0 ) {
110                         baos.write(c);
111                     }
112                     ByteSequence sequence = baos.toByteSequence();
113                     String JavaDoc data = new String JavaDoc(sequence.data, sequence.offset, sequence.length, "UTF-8");
114                     
115                     Command command = (Command) getTextWireFormat().unmarshalText(data);
116                     
117                     if (command == null) {
118                         log.warn("Received null packet from url: " + remoteUrl);
119                     }
120                     else {
121                         doConsume(command);
122                     }
123                 }
124             }
125             catch (Throwable JavaDoc e) {
126                 if (!isStopped()) {
127                     log.error("Failed to perform GET on: " + remoteUrl + " due to: " + e, e);
128                 }
129                 else {
130                     log.trace("Caught error after closed: " + e, e);
131                 }
132             } finally {
133                 safeClose(receiveConnection);
134                 receiveConnection=null;
135             }
136         }
137     }
138  
139
140     // Implementation methods
141
// -------------------------------------------------------------------------
142
protected HttpURLConnection JavaDoc createSendConnection() throws IOException JavaDoc {
143         HttpURLConnection JavaDoc conn = (HttpURLConnection JavaDoc) getRemoteURL().openConnection();
144         conn.setDoOutput(true);
145         conn.setRequestMethod("POST");
146         configureConnection(conn);
147         conn.connect();
148         return conn;
149     }
150
151     protected HttpURLConnection JavaDoc createReceiveConnection() throws IOException JavaDoc {
152         HttpURLConnection JavaDoc conn = (HttpURLConnection JavaDoc) getRemoteURL().openConnection();
153         conn.setDoOutput(false);
154         conn.setDoInput(true);
155         conn.setRequestMethod("GET");
156         configureConnection(conn);
157         conn.connect();
158         return conn;
159     }
160
161 // protected void checkSession(HttpURLConnection connection)
162
// {
163
// String set_cookie=connection.getHeaderField("Set-Cookie");
164
// if (set_cookie!=null && set_cookie.startsWith("JSESSIONID="))
165
// {
166
// String[] bits=set_cookie.split("[=;]");
167
// sessionID=bits[1];
168
// }
169
// }
170

171     protected void configureConnection(HttpURLConnection JavaDoc connection) {
172 // if (sessionID !=null) {
173
// connection.addRequestProperty("Cookie", "JSESSIONID="+sessionID);
174
// }
175
// else
176
if (clientID != null) {
177             connection.setRequestProperty("clientID", clientID);
178         }
179     }
180
181     protected URL JavaDoc getRemoteURL() {
182         return url;
183     }
184
185     protected HttpURLConnection JavaDoc getSendConnection() throws IOException JavaDoc {
186         setSendConnection(createSendConnection());
187         return sendConnection;
188     }
189
190     protected HttpURLConnection JavaDoc getReceiveConnection() throws IOException JavaDoc {
191         setReceiveConnection(createReceiveConnection());
192         return receiveConnection;
193     }
194
195     protected void setSendConnection(HttpURLConnection JavaDoc conn) {
196         safeClose(sendConnection);
197         sendConnection = conn;
198     }
199
200     protected void setReceiveConnection(HttpURLConnection JavaDoc conn) {
201         safeClose(receiveConnection);
202         receiveConnection = conn;
203     }
204
205     protected void doStart() throws Exception JavaDoc {
206         // Don't start the background thread until the clientId has been established.
207
if( clientID != null ) {
208             super.doStart();
209         }
210     }
211     
212     protected void doStop(ServiceStopper stopper) throws Exception JavaDoc {
213         stopper.run(new Callback() {
214             public void execute() throws Exception JavaDoc {
215                 safeClose(sendConnection);
216             }
217         });
218         sendConnection = null;
219         stopper.run(new Callback() {
220             public void execute() {
221                 safeClose(receiveConnection);
222             }
223         });
224     }
225     
226     /**
227      * @param connection TODO
228      *
229      */

230     private void safeClose(HttpURLConnection JavaDoc connection) {
231         if( connection!=null ) {
232             connection.disconnect();
233         }
234     }
235
236 }
237
Popular Tags