KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > http > HttpTunnelServlet


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.http;
19
20 import java.io.BufferedReader JavaDoc;
21 import java.io.DataOutputStream JavaDoc;
22 import java.io.IOException JavaDoc;
23 import java.util.HashMap JavaDoc;
24 import java.util.Map JavaDoc;
25
26 import javax.servlet.ServletException JavaDoc;
27 import javax.servlet.http.HttpServlet JavaDoc;
28 import javax.servlet.http.HttpServletRequest JavaDoc;
29 import javax.servlet.http.HttpServletResponse JavaDoc;
30
31 import org.apache.activemq.command.Command;
32 import org.apache.activemq.command.WireFormatInfo;
33 import org.apache.activemq.transport.TransportAcceptListener;
34 import org.apache.activemq.transport.util.TextWireFormat;
35 import org.apache.activemq.transport.xstream.XStreamWireFormat;
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38
39 import java.util.concurrent.ArrayBlockingQueue JavaDoc;
40 import java.util.concurrent.TimeUnit JavaDoc;
41
42 /**
43  * A servlet which handles server side HTTP transport, delegating to the
44  * ActiveMQ broker. This servlet is designed for being embedded inside an
45  * ActiveMQ Broker using an embedded Jetty or Tomcat instance.
46  *
47  * @version $Revision$
48  */

49 public class HttpTunnelServlet extends HttpServlet JavaDoc {
50     private static final long serialVersionUID = -3826714430767484333L;
51     private static final Log log = LogFactory.getLog(HttpTunnelServlet.class);
52
53     private TransportAcceptListener listener;
54     private TextWireFormat wireFormat;
55     private Map JavaDoc clients = new HashMap JavaDoc();
56     private long requestTimeout = 30000L;
57
58     public void init() throws ServletException JavaDoc {
59         super.init();
60         listener = (TransportAcceptListener) getServletContext().getAttribute("acceptListener");
61         if (listener == null) {
62             throw new ServletException JavaDoc("No such attribute 'acceptListener' available in the ServletContext");
63         }
64         wireFormat = (TextWireFormat) getServletContext().getAttribute("wireFormat");
65         if (wireFormat == null) {
66             wireFormat = createWireFormat();
67         }
68     }
69     
70     protected void doHead(HttpServletRequest JavaDoc request, HttpServletResponse JavaDoc response) throws ServletException JavaDoc, IOException JavaDoc {
71         createTransportChannel(request, response);
72     }
73     
74     protected void doGet(HttpServletRequest JavaDoc request, HttpServletResponse JavaDoc response) throws ServletException JavaDoc, IOException JavaDoc {
75         // lets return the next response
76
Command packet = null;
77         int count=0;
78         try {
79             BlockingQueueTransport transportChannel = getTransportChannel(request, response);
80             if (transportChannel == null)
81                 return;
82             
83             packet = (Command) transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS);
84             
85             DataOutputStream JavaDoc stream = new DataOutputStream JavaDoc(response.getOutputStream());
86 // while( packet !=null ) {
87
wireFormat.marshal(packet, stream);
88                 count++;
89 // packet = (Command) transportChannel.getQueue().poll(0, TimeUnit.MILLISECONDS);
90
// }
91

92         } catch (InterruptedException JavaDoc ignore) {
93         }
94         if (count == 0) {
95             response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT);
96         }
97     }
98
99     protected void doPost(HttpServletRequest JavaDoc request, HttpServletResponse JavaDoc response) throws ServletException JavaDoc, IOException JavaDoc {
100
101         // Read the command directly from the reader
102
Command command = (Command) wireFormat.unmarshalText(request.getReader());
103
104         if (command instanceof WireFormatInfo) {
105             WireFormatInfo info = (WireFormatInfo) command;
106             if (!canProcessWireFormatVersion(info.getVersion())) {
107                 response.sendError(HttpServletResponse.SC_NOT_FOUND, "Cannot process wire format of version: " + info.getVersion());
108             }
109
110         } else {
111
112             BlockingQueueTransport transport = getTransportChannel(request, response);
113             if (transport == null)
114                 return;
115             
116             transport.doConsume(command);
117         }
118     }
119
120     private boolean canProcessWireFormatVersion(int version) {
121         // TODO:
122
return true;
123     }
124
125     protected String JavaDoc readRequestBody(HttpServletRequest JavaDoc request) throws IOException JavaDoc {
126         StringBuffer JavaDoc buffer = new StringBuffer JavaDoc();
127         BufferedReader JavaDoc reader = request.getReader();
128         while (true) {
129             String JavaDoc line = reader.readLine();
130             if (line == null) {
131                 break;
132             }
133             else {
134                 buffer.append(line);
135                 buffer.append("\n");
136             }
137         }
138         return buffer.toString();
139     }
140
141     protected BlockingQueueTransport getTransportChannel(HttpServletRequest JavaDoc request, HttpServletResponse JavaDoc response) throws IOException JavaDoc {
142         String JavaDoc clientID = request.getHeader("clientID");
143         if (clientID == null) {
144             response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
145             log.warn("No clientID header specified");
146             return null;
147         }
148         synchronized (this) {
149             BlockingQueueTransport answer = (BlockingQueueTransport) clients.get(clientID);
150             if (answer == null) {
151                 log.warn("The clientID header specified is invalid. Client sesion has not yet been established for it: "+clientID);
152                 return null;
153             }
154             return answer;
155         }
156     }
157     
158     protected BlockingQueueTransport createTransportChannel(HttpServletRequest JavaDoc request, HttpServletResponse JavaDoc response) throws IOException JavaDoc {
159         String JavaDoc clientID = request.getHeader("clientID");
160         
161         if (clientID == null) {
162             response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
163             log.warn("No clientID header specified");
164             return null;
165         }
166         
167         synchronized (this) {
168             BlockingQueueTransport answer = (BlockingQueueTransport) clients.get(clientID);
169             if (answer != null) {
170                 response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '"+clientID+"' has allready been established");
171                 log.warn("A session for clientID '"+clientID+"' has allready been established");
172                 return null;
173             }
174             
175             answer = createTransportChannel();
176             clients.put(clientID, answer);
177             listener.onAccept(answer);
178             return answer;
179         }
180     }
181
182     protected BlockingQueueTransport createTransportChannel() {
183         return new BlockingQueueTransport(new ArrayBlockingQueue JavaDoc(10));
184     }
185
186     protected TextWireFormat createWireFormat() {
187         return new XStreamWireFormat();
188     }
189 }
190
Popular Tags