1 18 package org.apache.activemq.transport.http; 19 20 import java.io.BufferedReader ; 21 import java.io.DataOutputStream ; 22 import java.io.IOException ; 23 import java.util.HashMap ; 24 import java.util.Map ; 25 26 import javax.servlet.ServletException ; 27 import javax.servlet.http.HttpServlet ; 28 import javax.servlet.http.HttpServletRequest ; 29 import javax.servlet.http.HttpServletResponse ; 30 31 import org.apache.activemq.command.Command; 32 import org.apache.activemq.command.WireFormatInfo; 33 import org.apache.activemq.transport.TransportAcceptListener; 34 import org.apache.activemq.transport.util.TextWireFormat; 35 import org.apache.activemq.transport.xstream.XStreamWireFormat; 36 import org.apache.commons.logging.Log; 37 import org.apache.commons.logging.LogFactory; 38 39 import java.util.concurrent.ArrayBlockingQueue ; 40 import java.util.concurrent.TimeUnit ; 41 42 49 public class HttpTunnelServlet extends HttpServlet { 50 private static final long serialVersionUID = -3826714430767484333L; 51 private static final Log log = LogFactory.getLog(HttpTunnelServlet.class); 52 53 private TransportAcceptListener listener; 54 private TextWireFormat wireFormat; 55 private Map clients = new HashMap (); 56 private long requestTimeout = 30000L; 57 58 public void init() throws ServletException { 59 super.init(); 60 listener = (TransportAcceptListener) getServletContext().getAttribute("acceptListener"); 61 if (listener == null) { 62 throw new ServletException ("No such attribute 'acceptListener' available in the ServletContext"); 63 } 64 wireFormat = (TextWireFormat) getServletContext().getAttribute("wireFormat"); 65 if (wireFormat == null) { 66 wireFormat = createWireFormat(); 67 } 68 } 69 70 protected void doHead(HttpServletRequest request, HttpServletResponse response) throws ServletException , IOException { 71 createTransportChannel(request, response); 72 } 73 74 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException , IOException { 75 Command packet = null; 77 int count=0; 78 try { 79 BlockingQueueTransport transportChannel = getTransportChannel(request, response); 80 if (transportChannel == null) 81 return; 82 83 packet = (Command) transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS); 84 85 DataOutputStream stream = new DataOutputStream (response.getOutputStream()); 86 wireFormat.marshal(packet, stream); 88 count++; 89 92 } catch (InterruptedException ignore) { 93 } 94 if (count == 0) { 95 response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT); 96 } 97 } 98 99 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException , IOException { 100 101 Command command = (Command) wireFormat.unmarshalText(request.getReader()); 103 104 if (command instanceof WireFormatInfo) { 105 WireFormatInfo info = (WireFormatInfo) command; 106 if (!canProcessWireFormatVersion(info.getVersion())) { 107 response.sendError(HttpServletResponse.SC_NOT_FOUND, "Cannot process wire format of version: " + info.getVersion()); 108 } 109 110 } else { 111 112 BlockingQueueTransport transport = getTransportChannel(request, response); 113 if (transport == null) 114 return; 115 116 transport.doConsume(command); 117 } 118 } 119 120 private boolean canProcessWireFormatVersion(int version) { 121 return true; 123 } 124 125 protected String readRequestBody(HttpServletRequest request) throws IOException { 126 StringBuffer buffer = new StringBuffer (); 127 BufferedReader reader = request.getReader(); 128 while (true) { 129 String line = reader.readLine(); 130 if (line == null) { 131 break; 132 } 133 else { 134 buffer.append(line); 135 buffer.append("\n"); 136 } 137 } 138 return buffer.toString(); 139 } 140 141 protected BlockingQueueTransport getTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException { 142 String clientID = request.getHeader("clientID"); 143 if (clientID == null) { 144 response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified"); 145 log.warn("No clientID header specified"); 146 return null; 147 } 148 synchronized (this) { 149 BlockingQueueTransport answer = (BlockingQueueTransport) clients.get(clientID); 150 if (answer == null) { 151 log.warn("The clientID header specified is invalid. Client sesion has not yet been established for it: "+clientID); 152 return null; 153 } 154 return answer; 155 } 156 } 157 158 protected BlockingQueueTransport createTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException { 159 String clientID = request.getHeader("clientID"); 160 161 if (clientID == null) { 162 response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified"); 163 log.warn("No clientID header specified"); 164 return null; 165 } 166 167 synchronized (this) { 168 BlockingQueueTransport answer = (BlockingQueueTransport) clients.get(clientID); 169 if (answer != null) { 170 response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '"+clientID+"' has allready been established"); 171 log.warn("A session for clientID '"+clientID+"' has allready been established"); 172 return null; 173 } 174 175 answer = createTransportChannel(); 176 clients.put(clientID, answer); 177 listener.onAccept(answer); 178 return answer; 179 } 180 } 181 182 protected BlockingQueueTransport createTransportChannel() { 183 return new BlockingQueueTransport(new ArrayBlockingQueue (10)); 184 } 185 186 protected TextWireFormat createWireFormat() { 187 return new XStreamWireFormat(); 188 } 189 } 190 | Popular Tags |