package org.objectweb.joram.mom.proxies.tcp;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

import org.objectweb.joram.mom.notifications.GetProxyIdNot;
import org.objectweb.joram.mom.proxies.AckedQueue;
import org.objectweb.joram.mom.proxies.GetConnectionNot;
import org.objectweb.joram.mom.proxies.OpenConnectionNot;
import org.objectweb.joram.mom.proxies.ReliableConnectionContext;
import org.objectweb.joram.shared.stream.StreamUtil;

import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.AgentServer;
import fr.dyade.aaa.util.Daemon;

import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.joram.shared.JoramTracing;

/**
 * Accepts TCP connections on a server socket, performs the opening
 * handshake with the client (user name, password, connection key and
 * optional heart beat), and starts a {@link TcpConnection} for every
 * handshake that succeeds.
 */
public class TcpConnectionListener extends Daemon {

  /**
   * The server socket connections are accepted from. Set to
   * <code>null</code> once {@link #close()} has been called.
   */
  private ServerSocket serverSocket;

  /**
   * The proxy service handed to every created {@link TcpConnection} and
   * queried for an already existing connection when a client reconnects.
   */
  private TcpProxyService proxyService;

  /**
   * Socket read timeout, in milliseconds, applied while the handshake is
   * being read. It is reset to 0 (no timeout) once the handshake is done.
   */
  private int timeout;

  /**
   * Creates a new connection listener.
   *
   * @param serverSocket  the server socket to accept connections from
   * @param proxyService  the proxy service owning the created connections
   * @param timeout       the socket read timeout, in milliseconds, used
   *                      during the handshake
   */
  public TcpConnectionListener(ServerSocket serverSocket,
                               TcpProxyService proxyService,
                               int timeout) {
    super("TcpConnectionListener");
    this.serverSocket = serverSocket;
    this.proxyService = proxyService;
    this.timeout = timeout;
  }

  /**
   * Accepts connections for as long as the daemon is running. A failure
   * while accepting or setting up one connection does not stop the
   * listener: the loop simply goes on with the next connection.
   */
  public void run() {
    if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
      JoramTracing.dbgProxy.log(
        BasicLevel.DEBUG, "TcpConnectionListener.run()");

    // NOTE(review): the reason for this 2 second startup delay is not
    // visible in this file.
    try {
      Thread.sleep(2000);
    } catch (InterruptedException exc) {
      // Being interrupted only shortens the delay; the loop condition
      // below decides whether the listener goes on.
    }

    loop:
    while (running) {
      canStop = true;
      if (serverSocket != null) {
        try {
          acceptConnection();
        } catch (Exception exc) {
          if (running) {
            continue loop;
          } else {
            break loop;
          }
        }
      }
    }
  }

  /**
   * Output stream that buffers a message behind a 4 byte header and, on
   * {@link #send()}, fills the header with the payload length before
   * writing the whole buffer to the socket.
   */
  static class NetOutputStream extends ByteArrayOutputStream {
    /** The output stream of the socket the frames are written to. */
    private OutputStream os = null;

    /**
     * Creates a stream writing to the given socket.
     *
     * @param sock  the socket whose output stream receives the frames
     * @throws IOException if the socket output stream cannot be obtained
     */
    NetOutputStream(Socket sock) throws IOException {
      super(1024);
      reset();
      os = sock.getOutputStream();
    }

    /**
     * Empties the payload while keeping the first 4 bytes of the buffer
     * reserved for the length header.
     */
    public void reset() {
      count = 4;
    }

    /**
     * Writes the payload length (most significant byte first) into the
     * header, sends header and payload to the socket and flushes it. The
     * buffer is reset afterwards, whether or not the write succeeded.
     *
     * @throws IOException if writing to or flushing the socket fails
     */
    public void send() throws IOException {
      try {
        // The header itself is not part of the announced length.
        buf[0] = (byte) ((count - 4) >>> 24);
        buf[1] = (byte) ((count - 4) >>> 16);
        buf[2] = (byte) ((count - 4) >>> 8);
        buf[3] = (byte) ((count - 4) >>> 0);

        writeTo(os);
        os.flush();
      } finally {
        reset();
      }
    }
  }

  /**
   * Accepts one connection and runs the handshake on it: reads the user
   * name, the password and the connection key, resolves the user proxy,
   * then either opens a new connection context (key -1) or looks up the
   * existing one, and finally starts a {@link TcpConnection} on the
   * socket.
   * <p>
   * When the proxy or the connection context cannot be obtained, an error
   * reply is sent to the client and the socket is closed. On any other
   * failure the socket is closed and the exception is rethrown.
   *
   * @throws Exception if accepting the socket or the handshake fails
   */
  private void acceptConnection() throws Exception {
    if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
      JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
                                "TcpConnectionListener.acceptConnection()");

    Socket sock = serverSocket.accept();

    if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
      JoramTracing.dbgProxy.log(BasicLevel.DEBUG, " -> accept connection");

    try {
      // Resolved inside the try block so that the socket is closed if
      // this fails.
      String inaddr = sock.getInetAddress().getHostAddress();

      sock.setTcpNoDelay(true);

      // Bound the time a client may take to complete the handshake.
      sock.setSoTimeout(timeout);

      InputStream is = sock.getInputStream();
      NetOutputStream nos = new NetOutputStream(sock);

      // The leading int is read and discarded; presumably it is the
      // length header of the request frame -- TODO confirm.
      StreamUtil.readIntFrom(is);
      String userName = StreamUtil.readStringFrom(is);
      if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
        JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
                                  " -> read userName = " + userName);
      String userPassword = StreamUtil.readStringFrom(is);
      // The password value is deliberately kept out of the log.
      if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
        JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
                                  " -> read userPassword = <hidden>");
      int key = StreamUtil.readIntFrom(is);
      if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
        JoramTracing.dbgProxy.log(BasicLevel.DEBUG, " -> read key = " + key);
      int heartBeat = 0;
      if (key == -1) {
        // A key of -1 asks for a new connection; only then does the
        // client send a heart beat value.
        heartBeat = StreamUtil.readIntFrom(is);
        if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
          JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
                                    " -> read heartBeat = " + heartBeat);
      }

      GetProxyIdNot gpin = new GetProxyIdNot(userName, userPassword, inaddr);
      AgentId proxyId;
      try {
        gpin.invoke(new AgentId(AgentServer.getServerId(),
                                AgentServer.getServerId(),
                                AgentId.JoramAdminStamp));
        proxyId = gpin.getProxyId();
      } catch (Exception exc) {
        rejectConnection(sock, nos, exc);
        return;
      }

      IOControl ioctrl;
      AckedQueue replyQueue;
      if (key == -1) {
        // New connection: open a context and send its key to the client.
        OpenConnectionNot ocn = new OpenConnectionNot(true, heartBeat);
        ocn.invoke(proxyId);
        StreamUtil.writeTo(0, nos);
        ReliableConnectionContext ctx =
          (ReliableConnectionContext)ocn.getConnectionContext();
        key = ctx.getKey();
        StreamUtil.writeTo(ctx.getKey(), nos);
        nos.send();
        replyQueue = (AckedQueue) ctx.getQueue();
        ioctrl = new IOControl(sock);
      } else {
        // Reconnection: reuse the context registered under the key.
        GetConnectionNot gcn = new GetConnectionNot(key);
        try {
          gcn.invoke(proxyId);
        } catch (Exception exc) {
          rejectConnection(sock, nos, exc);
          return;
        }
        ReliableConnectionContext ctx =
          (ReliableConnectionContext)gcn.getConnectionContext();
        replyQueue = ctx.getQueue();
        heartBeat = ctx.getHeartBeat();
        StreamUtil.writeTo(0, nos);
        nos.send();
        ioctrl = new IOControl(sock, ctx.getInputCounter());

        // Close the connection previously bound to this key, if any.
        TcpConnection tcpConnection = proxyService.getConnection(proxyId, key);
        if (tcpConnection != null) {
          tcpConnection.close();
        }
      }

      // Handshake done: reads on the socket no longer time out.
      sock.setSoTimeout(0);

      // NOTE(review): the meaning of the last constructor argument
      // (heartBeat == 0) is defined by TcpConnection, not visible here.
      TcpConnection tcpConnection = new TcpConnection(ioctrl, proxyId,
        replyQueue, key, proxyService, heartBeat == 0);
      tcpConnection.start();
    } catch (Exception exc) {
      if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
        JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "", exc);
      // Closed quietly so that a failing close cannot replace the
      // exception that is being reported.
      closeQuietly(sock);
      throw exc;
    }
  }

  /**
   * Refuses a connection: logs the cause, sends the client an error reply
   * (status 1 followed by the message of the cause) and closes the socket,
   * even if the reply could not be sent.
   *
   * @param sock   the socket of the refused connection
   * @param nos    the framed output stream bound to the socket
   * @param cause  the failure that led to the refusal
   * @throws Exception if the error reply cannot be written
   */
  private void rejectConnection(Socket sock,
                                NetOutputStream nos,
                                Exception cause) throws Exception {
    if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
      JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "", cause);
    try {
      StreamUtil.writeTo(1, nos);
      StreamUtil.writeTo(cause.getMessage(), nos);
      nos.send();
    } finally {
      closeQuietly(sock);
    }
  }

  /**
   * Closes the socket, ignoring any error raised by the close itself.
   *
   * @param sock  the socket to close
   */
  private static void closeQuietly(Socket sock) {
    try {
      sock.close();
    } catch (IOException ignored) {
      // Nothing useful can be done if closing fails.
    }
  }

  /**
   * Releases the listener resources by closing the server socket.
   */
  protected void shutdown() {
    close();
  }

  /**
   * Closes the server socket, ignoring any error, and clears the
   * reference to it.
   */
  protected void close() {
    try {
      if (serverSocket != null)
        serverSocket.close();
    } catch (IOException exc) {
      // Best effort: the reference is dropped below in any case.
    }
    serverSocket = null;
  }
}