KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > proxies > tcp > TcpConnectionListener


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2004 - 2006 ScalAgent Distributed Technologies
4  * Copyright (C) 2004 France Telecom R&D
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): ScalAgent Distributed Technologies
22  * Contributor(s):
23  */

24 package org.objectweb.joram.mom.proxies.tcp;
25
26 import java.io.InputStream JavaDoc;
27 import java.io.OutputStream JavaDoc;
28 import java.io.ByteArrayOutputStream JavaDoc;
29 import java.io.IOException JavaDoc;
30 import java.net.ServerSocket JavaDoc;
31 import java.net.Socket JavaDoc;
32
33 import org.objectweb.joram.mom.notifications.GetProxyIdNot;
34 import org.objectweb.joram.mom.proxies.AckedQueue;
35 import org.objectweb.joram.mom.proxies.GetConnectionNot;
36 import org.objectweb.joram.mom.proxies.OpenConnectionNot;
37 import org.objectweb.joram.mom.proxies.ReliableConnectionContext;
38 import org.objectweb.joram.shared.stream.StreamUtil;
39
40 import fr.dyade.aaa.agent.AgentId;
41 import fr.dyade.aaa.agent.AgentServer;
42 import fr.dyade.aaa.util.Daemon;
43
44 import org.objectweb.util.monolog.api.BasicLevel;
45 import org.objectweb.joram.shared.JoramTracing;
46
47 /**
48  * Listens to the TCP connections from the JMS clients.
49  * Creates a <code>TcpConnection</code> for each
50  * accepted TCP connection.
51  * Opens the <code>UserConnection</code> with the
52  * right user's proxy.
53  */

54 public class TcpConnectionListener extends Daemon {
55   /**
56    * The server socket listening to connections from the JMS clients.
57    */

58   private ServerSocket JavaDoc serverSocket;
59   
60   /**
61    * The TCP proxy service
62    */

63   private TcpProxyService proxyService;
64
65   private int timeout;
66
67   /**
68    * Creates a new connection listener
69    *
70    * @param serverSocket the server socket to listen to
71    * @param proxyService the TCP proxy service of this
72    * connection listener
73    */

74   public TcpConnectionListener(ServerSocket JavaDoc serverSocket,
75                                TcpProxyService proxyService,
76                                int timeout) {
77     super("TcpConnectionListener");
78     this.serverSocket = serverSocket;
79     this.proxyService = proxyService;
80     this.timeout = timeout;
81   }
82
83   public void run() {
84     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
85       JoramTracing.dbgProxy.log(
86         BasicLevel.DEBUG, "TcpConnectionListener.run()");
87
88     // Wait for the admin topic deployment.
89
// (a synchronization would be much better)
90
try {
91       Thread.sleep(2000);
92     } catch (InterruptedException JavaDoc exc) {
93       // continue
94
}
95
96     loop:
97     while (running) {
98       canStop = true;
99       if (serverSocket != null) {
100         try {
101           acceptConnection();
102         } catch (Exception JavaDoc exc) {
103           if (running) {
104             continue loop;
105           } else {
106             break loop;
107           }
108         }
109       }
110     }
111   }
112
113   static class NetOutputStream extends ByteArrayOutputStream JavaDoc {
114     private OutputStream JavaDoc os = null;
115
116     NetOutputStream(Socket JavaDoc sock) throws IOException JavaDoc {
117       super(1024);
118       reset();
119       os = sock.getOutputStream();
120     }
121
122     public void reset() {
123       count = 4;
124     }
125
126     public void send() throws IOException JavaDoc {
127       try {
128         buf[0] = (byte) ((count -4) >>> 24);
129         buf[1] = (byte) ((count -4) >>> 16);
130         buf[2] = (byte) ((count -4) >>> 8);
131         buf[3] = (byte) ((count -4) >>> 0);
132
133         writeTo(os);
134         os.flush();
135       } finally {
136         reset();
137       }
138     }
139   }
140
141   /**
142    * Accepts a TCP connection. Opens the <code>UserConnection</code> with the
143    * right user's proxy, creates and starts the <code>TcpConnection</code>.
144    */

145   private void acceptConnection() throws Exception JavaDoc {
146     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
147       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
148                                 "TcpConnectionListener.acceptConnection()");
149
150     Socket JavaDoc sock = serverSocket.accept();
151     String JavaDoc inaddr = sock.getInetAddress().getHostAddress();
152
153     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
154       JoramTracing.dbgProxy.log(BasicLevel.DEBUG, " -> accept connection");
155
156     try {
157       sock.setTcpNoDelay(true);
158
159       // Fix bug when the client doesn't
160
// use the right protocol (e.g. Telnet)
161
// and blocks this listener.
162
sock.setSoTimeout(timeout);
163
164       InputStream JavaDoc is = sock.getInputStream();
165       NetOutputStream nos = new NetOutputStream(sock);
166
167       int len = StreamUtil.readIntFrom(is);
168       String JavaDoc userName = StreamUtil.readStringFrom(is);
169       if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
170     JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
171                                   " -> read userName = " + userName);
172       String JavaDoc userPassword = StreamUtil.readStringFrom(is);
173       if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
174     JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
175                                   " -> read userPassword = " + userPassword);
176       int key = StreamUtil.readIntFrom(is);
177       if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
178     JoramTracing.dbgProxy.log(BasicLevel.DEBUG, " -> read key = " + key);
179       int heartBeat = 0;
180       if (key == -1) {
181     heartBeat = StreamUtil.readIntFrom(is);
182     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
183     JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
184                                   " -> read heartBeat = " + heartBeat);
185       }
186
187       GetProxyIdNot gpin = new GetProxyIdNot(userName, userPassword, inaddr);
188       AgentId proxyId;
189       try {
190         gpin.invoke(new AgentId(AgentServer.getServerId(),
191                                 AgentServer.getServerId(),
192                                 AgentId.JoramAdminStamp));
193         proxyId = gpin.getProxyId();
194       } catch (Exception JavaDoc exc) {
195         if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
196       JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "", exc);
197         StreamUtil.writeTo(1, nos);
198         StreamUtil.writeTo(exc.getMessage(), nos);
199         nos.send();
200         return;
201       }
202
203       IOControl ioctrl;
204       AckedQueue replyQueue;
205       if (key == -1) {
206         OpenConnectionNot ocn = new OpenConnectionNot(true, heartBeat);
207         ocn.invoke(proxyId);
208         StreamUtil.writeTo(0, nos);
209         ReliableConnectionContext ctx =
210           (ReliableConnectionContext)ocn.getConnectionContext();
211         key = ctx.getKey();
212         StreamUtil.writeTo(ctx.getKey(), nos);
213         nos.send();
214         replyQueue = (AckedQueue) ctx.getQueue();
215         ioctrl = new IOControl(sock);
216       } else {
217         GetConnectionNot gcn = new GetConnectionNot(key);
218         try {
219           gcn.invoke(proxyId);
220         } catch (Exception JavaDoc exc) {
221       if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
222             JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "", exc);
223           StreamUtil.writeTo(1, nos);
224           StreamUtil.writeTo(exc.getMessage(), nos);
225           nos.send();
226           return;
227         }
228         ReliableConnectionContext ctx =
229           (ReliableConnectionContext)gcn.getConnectionContext();
230         replyQueue = ctx.getQueue();
231         heartBeat = ctx.getHeartBeat();
232         StreamUtil.writeTo(0, nos);
233         nos.send();
234         ioctrl = new IOControl(sock, ctx.getInputCounter());
235
236         TcpConnection tcpConnection = proxyService.getConnection(proxyId, key);
237         if (tcpConnection != null) {
238           tcpConnection.close();
239         }
240       }
241
242       // Reset the timeout in order to enable the server to indefinitely
243
// wait for requests.
244
sock.setSoTimeout(0);
245
246       TcpConnection tcpConnection = new TcpConnection(ioctrl, proxyId,
247           replyQueue, key, proxyService, heartBeat == 0);
248       tcpConnection.start();
249     } catch (Exception JavaDoc exc) {
250       if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
251         JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "", exc);
252       sock.close();
253       throw exc;
254     }
255   }
256   
257   protected void shutdown() {
258     close();
259   }
260     
261   protected void close() {
262     try {
263       if (serverSocket != null)
264         serverSocket.close();
265     } catch (IOException JavaDoc exc) {}
266     serverSocket = null;
267   }
268 }
269
Popular Tags