1 18 package org.apache.activemq.transport.http; 19 20 import java.io.DataInputStream ; 21 import java.io.IOException ; 22 import java.io.InterruptedIOException ; 23 import java.net.URI ; 24 25 import org.apache.activemq.transport.FutureResponse; 26 import org.apache.activemq.transport.util.TextWireFormat; 27 import org.apache.activemq.util.ByteArrayInputStream; 28 import org.apache.activemq.util.IOExceptionSupport; 29 import org.apache.activemq.util.IdGenerator; 30 import org.apache.activemq.util.ServiceStopper; 31 import org.apache.commons.httpclient.HttpClient; 32 import org.apache.commons.httpclient.HttpMethod; 33 import org.apache.commons.httpclient.HttpStatus; 34 import org.apache.commons.httpclient.methods.GetMethod; 35 import org.apache.commons.httpclient.methods.HeadMethod; 36 import org.apache.commons.httpclient.methods.PostMethod; 37 import org.apache.commons.logging.Log; 38 import org.apache.commons.logging.LogFactory; 39 40 47 public class HttpClientTransport extends HttpTransportSupport { 48 private static final Log log = LogFactory.getLog(HttpClientTransport.class); 49 public static final int MAX_CLIENT_TIMEOUT = 30000; 50 51 private static final IdGenerator clientIdGenerator = new IdGenerator(); 52 53 private HttpClient sendHttpClient; 54 private HttpClient receiveHttpClient; 55 56 private final String clientID = clientIdGenerator.generateId(); 57 private boolean trace; 58 59 public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) { 60 super(wireFormat, remoteUrl); 61 } 62 63 public FutureResponse asyncRequest(Object command) throws IOException { 64 return null; 65 } 66 67 public void oneway(Object command) throws IOException { 68 69 if( isStopped() ) { 70 throw new IOException ("stopped."); 71 } 72 73 PostMethod httpMethod = new PostMethod(getRemoteUrl().toString()); 74 configureMethod(httpMethod); 75 String data = getTextWireFormat().marshalText(command); 76 byte[] bytes = data.getBytes("UTF-8"); 77 httpMethod.setRequestBody(new ByteArrayInputStream(bytes)); 78 79 try { 80 81 HttpClient client = getSendHttpClient(); 82 client.setTimeout(MAX_CLIENT_TIMEOUT); 83 int answer = client.executeMethod(httpMethod); 84 if (answer != HttpStatus.SC_OK) { 85 throw new IOException ("Failed to post command: " + command + " as response was: " + answer); 86 } 87 88 } catch (IOException e) { 90 throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e); 91 } finally { 92 httpMethod.getResponseBody(); 93 httpMethod.releaseConnection(); 94 } 95 } 96 97 public Object request(Object command) throws IOException { 98 return null; 99 } 100 101 public void run() { 102 103 log.trace("HTTP GET consumer thread starting: " + this); 104 HttpClient httpClient = getReceiveHttpClient(); 105 URI remoteUrl = getRemoteUrl(); 106 107 while ( !isStopped() && !isStopping() ) { 108 109 GetMethod httpMethod = new GetMethod(remoteUrl.toString()); 110 configureMethod(httpMethod); 111 112 try { 113 int answer = httpClient.executeMethod(httpMethod); 114 if (answer != HttpStatus.SC_OK) { 115 if (answer == HttpStatus.SC_REQUEST_TIMEOUT) { 116 log.debug("GET timed out"); 117 try { 118 Thread.sleep(1000); 119 } catch (InterruptedException e) { 120 onException(new InterruptedIOException ()); 121 break; 122 } 123 } 124 else { 125 onException(new IOException ("Failed to perform GET on: " + remoteUrl + " as response was: " + answer)); 126 break; 127 } 128 } 129 else { 130 DataInputStream stream = new DataInputStream (httpMethod.getResponseBodyAsStream()); 132 Object command = (Object ) getTextWireFormat().unmarshal(stream); 133 if (command == null) { 134 log.warn("Received null command from url: " + remoteUrl); 135 } else { 136 doConsume(command); 137 } 138 } 139 } 140 catch (IOException e) { 141 onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl+" Reason: "+e.getMessage(),e)); 142 break; 143 } finally { 144 httpMethod.getResponseBody(); 145 httpMethod.releaseConnection(); 146 } 147 } 148 } 149 150 public HttpClient getSendHttpClient() { 153 if (sendHttpClient == null) { 154 sendHttpClient = createHttpClient(); 155 } 156 return sendHttpClient; 157 } 158 159 public void setSendHttpClient(HttpClient sendHttpClient) { 160 this.sendHttpClient = sendHttpClient; 161 } 162 163 public HttpClient getReceiveHttpClient() { 164 if (receiveHttpClient == null) { 165 receiveHttpClient = createHttpClient(); 166 } 167 return receiveHttpClient; 168 } 169 170 public void setReceiveHttpClient(HttpClient receiveHttpClient) { 171 this.receiveHttpClient = receiveHttpClient; 172 } 173 174 protected void doStart() throws Exception { 177 178 log.trace("HTTP GET consumer thread starting: " + this); 179 HttpClient httpClient = getReceiveHttpClient(); 180 URI remoteUrl = getRemoteUrl(); 181 182 HeadMethod httpMethod = new HeadMethod(remoteUrl.toString()); 183 configureMethod(httpMethod); 184 185 int answer = httpClient.executeMethod(httpMethod); 186 if (answer != HttpStatus.SC_OK) { 187 throw new IOException ("Failed to perform GET on: " + remoteUrl + " as response was: " + answer); 188 } 189 190 super.doStart(); 191 } 192 193 protected void doStop(ServiceStopper stopper) throws Exception { 194 } 195 196 protected HttpClient createHttpClient() { 197 HttpClient client = new HttpClient(); 198 if (getProxyHost() != null) { 199 client.getHostConfiguration().setProxy(getProxyHost(), getProxyPort()); 200 } 201 return client; 202 } 203 204 protected void configureMethod(HttpMethod method) { 205 method.setRequestHeader("clientID", clientID); 206 } 207 208 public boolean isTrace() { 209 return trace; 210 } 211 212 public void setTrace(boolean trace) { 213 this.trace = trace; 214 } 215 216 228 } 229 | Popular Tags |