1 18 package org.apache.activemq.transport.http; 19 20 import java.io.IOException ; 21 import java.io.InputStream ; 22 import java.io.OutputStreamWriter ; 23 import java.io.Writer ; 24 import java.net.HttpURLConnection ; 25 import java.net.MalformedURLException ; 26 import java.net.URI ; 27 import java.net.URL ; 28 29 import org.apache.activemq.command.Command; 30 import org.apache.activemq.command.ConnectionInfo; 31 import org.apache.activemq.transport.util.TextWireFormat; 32 import org.apache.activemq.util.ByteArrayOutputStream; 33 import org.apache.activemq.util.ByteSequence; 34 import org.apache.activemq.util.Callback; 35 import org.apache.activemq.util.IOExceptionSupport; 36 import org.apache.activemq.util.ServiceStopper; 37 import org.apache.commons.logging.Log; 38 import org.apache.commons.logging.LogFactory; 39 40 43 public class HttpTransport extends HttpTransportSupport { 44 private static final Log log = LogFactory.getLog(HttpTransport.class); 45 private HttpURLConnection sendConnection; 46 private HttpURLConnection receiveConnection; 47 private URL url; 48 private String clientID; 49 51 public HttpTransport(TextWireFormat wireFormat, URI remoteUrl) throws MalformedURLException { 52 super(wireFormat, remoteUrl); 53 url = new URL (remoteUrl.toString()); 54 } 55 56 public void oneway(Object o) throws IOException { 57 final Command command = (Command) o; 58 try { 59 if (command.getDataStructureType()==ConnectionInfo.DATA_STRUCTURE_TYPE) { 60 boolean startGetThread = clientID==null; 61 clientID=((ConnectionInfo)command).getClientId(); 62 if( startGetThread && isStarted() ) { 63 try { 64 super.doStart(); 65 } catch (Exception e) { 66 throw IOExceptionSupport.create(e); 67 } 68 } 69 } 70 71 HttpURLConnection connection = getSendConnection(); 72 String text = getTextWireFormat().marshalText(command); 73 Writer writer = new OutputStreamWriter (connection.getOutputStream()); 74 writer.write(text); 75 writer.flush(); 76 int answer = connection.getResponseCode(); 77 if (answer != HttpURLConnection.HTTP_OK) { 78 throw new IOException ("Failed to post command: " + command + " as response was: " + answer); 79 } 80 } 82 catch (IOException e) { 83 throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e); 84 } 85 } 86 87 public void run() { 88 log.trace("HTTP GET consumer thread starting for transport: " + this); 89 URI remoteUrl = getRemoteUrl(); 90 while (!isStopped()) { 91 try { 92 HttpURLConnection connection = getReceiveConnection(); 93 int answer = connection.getResponseCode(); 94 if (answer != HttpURLConnection.HTTP_OK) { 95 if (answer == HttpURLConnection.HTTP_CLIENT_TIMEOUT) { 96 log.trace("GET timed out"); 97 } 98 else { 99 log.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer); 100 } 101 } 102 else { 103 105 InputStream is = connection.getInputStream(); 107 ByteArrayOutputStream baos = new ByteArrayOutputStream(connection.getContentLength()>0?connection.getContentLength():1024); 108 int c=0; 109 while( (c=is.read())>= 0 ) { 110 baos.write(c); 111 } 112 ByteSequence sequence = baos.toByteSequence(); 113 String data = new String (sequence.data, sequence.offset, sequence.length, "UTF-8"); 114 115 Command command = (Command) getTextWireFormat().unmarshalText(data); 116 117 if (command == null) { 118 log.warn("Received null packet from url: " + remoteUrl); 119 } 120 else { 121 doConsume(command); 122 } 123 } 124 } 125 catch (Throwable e) { 126 if (!isStopped()) { 127 log.error("Failed to perform GET on: " + remoteUrl + " due to: " + e, e); 128 } 129 else { 130 log.trace("Caught error after closed: " + e, e); 131 } 132 } finally { 133 safeClose(receiveConnection); 134 receiveConnection=null; 135 } 136 } 137 } 138 139 140 protected HttpURLConnection createSendConnection() throws IOException { 143 HttpURLConnection conn = (HttpURLConnection ) getRemoteURL().openConnection(); 144 conn.setDoOutput(true); 145 conn.setRequestMethod("POST"); 146 configureConnection(conn); 147 conn.connect(); 148 return conn; 149 } 150 151 protected HttpURLConnection createReceiveConnection() throws IOException { 152 HttpURLConnection conn = (HttpURLConnection ) getRemoteURL().openConnection(); 153 conn.setDoOutput(false); 154 conn.setDoInput(true); 155 conn.setRequestMethod("GET"); 156 configureConnection(conn); 157 conn.connect(); 158 return conn; 159 } 160 161 171 protected void configureConnection(HttpURLConnection connection) { 172 if (clientID != null) { 177 connection.setRequestProperty("clientID", clientID); 178 } 179 } 180 181 protected URL getRemoteURL() { 182 return url; 183 } 184 185 protected HttpURLConnection getSendConnection() throws IOException { 186 setSendConnection(createSendConnection()); 187 return sendConnection; 188 } 189 190 protected HttpURLConnection getReceiveConnection() throws IOException { 191 setReceiveConnection(createReceiveConnection()); 192 return receiveConnection; 193 } 194 195 protected void setSendConnection(HttpURLConnection conn) { 196 safeClose(sendConnection); 197 sendConnection = conn; 198 } 199 200 protected void setReceiveConnection(HttpURLConnection conn) { 201 safeClose(receiveConnection); 202 receiveConnection = conn; 203 } 204 205 protected void doStart() throws Exception { 206 if( clientID != null ) { 208 super.doStart(); 209 } 210 } 211 212 protected void doStop(ServiceStopper stopper) throws Exception { 213 stopper.run(new Callback() { 214 public void execute() throws Exception { 215 safeClose(sendConnection); 216 } 217 }); 218 sendConnection = null; 219 stopper.run(new Callback() { 220 public void execute() { 221 safeClose(receiveConnection); 222 } 223 }); 224 } 225 226 230 private void safeClose(HttpURLConnection connection) { 231 if( connection!=null ) { 232 connection.disconnect(); 233 } 234 } 235 236 } 237 | Popular Tags |