1 24 package org.objectweb.joram.client.jms.tcp; 25 26 import java.io.*; 27 import java.net.*; 28 import java.util.*; 29 import java.util.Timer ; 30 31 import javax.jms.*; 32 import javax.jms.IllegalStateException ; 33 34 import org.objectweb.joram.shared.client.AbstractJmsMessage; 35 import org.objectweb.joram.client.jms.FactoryParameters; 36 import org.objectweb.joram.shared.stream.StreamUtil; 37 38 import fr.dyade.aaa.util.ReliableTcpConnection; 39 40 import fr.dyade.aaa.util.Debug; 41 import org.objectweb.util.monolog.api.BasicLevel; 42 import org.objectweb.util.monolog.api.Logger; 43 44 public class ReliableTcpClient { 45 public static Logger logger = Debug.getLogger(ReliableTcpClient.class.getName()); 46 47 public static final int INIT = 0; 48 public static final int CONNECT = 1; 49 public static final int CLOSE = 2; 50 51 public static final String [] statusNames = 52 {"INIT", "CONNECT", "CLOSE"}; 53 54 private FactoryParameters params; 55 56 private String name; 57 58 private String password; 59 60 private int key; 61 62 private ReliableTcpConnection connection; 63 64 private volatile int status; 65 66 private Vector addresses; 67 72 private boolean reconnect; 73 77 private int reconnectTimeout = 0; 78 79 private Timer timer; 80 81 public ReliableTcpClient() {} 82 83 public void setTimer(Timer timer2) { 84 timer = timer2; 85 } 86 87 public void init(FactoryParameters params, 88 String name, 89 String password, 90 boolean reconnect) { 91 if (logger.isLoggable(BasicLevel.DEBUG)) 92 logger.log(BasicLevel.DEBUG, 93 "ReliableTcpClient.init(" + params + ',' + name + ',' + password + ',' + reconnect + ')'); 94 95 this.params = params; 96 this.name = name; 97 this.password = password; 98 this.reconnect = reconnect; 99 if (params.cnxPendingTimer > 0) 100 this.reconnectTimeout = 101 Math.max(2*params.cnxPendingTimer, 102 (params.connectingTimer*1000)+params.cnxPendingTimer); 103 addresses = new Vector(); 104 key = -1; 105 setStatus(INIT); 106 } 107 108 private void setStatus(int status) { 109 if (logger.isLoggable(BasicLevel.DEBUG)) 110 logger.log(BasicLevel.DEBUG, 111 "ReliableTcpClient[" + name + ',' + key + "].setStatus(" + statusNames[status] + ')'); 112 this.status = status; 113 } 114 115 public void connect() throws JMSException { 116 connect(false); 117 } 118 119 public synchronized void connect(boolean reconnect) throws JMSException { 120 if (logger.isLoggable(BasicLevel.DEBUG)) 121 logger.log(BasicLevel.DEBUG, 122 "ReliableTcpClient[" + name + ',' + key + "].connect(" + reconnect + ')'); 123 124 if (status != INIT) 125 throw new IllegalStateException ("Connect: state error"); 126 127 long startTime = System.currentTimeMillis(); 128 long endTime = startTime; 129 if (addresses.size() > 1) { 130 endTime = Long.MAX_VALUE; 132 } else { 133 if (reconnect) { 134 endTime += reconnectTimeout; 135 } else { 136 endTime += params.connectingTimer * 1000L; 137 } 138 } 139 140 int attemptsC = 0; 141 long nextSleep = 100; 142 while (true) { 143 if (status == CLOSE) 144 throw new IllegalStateException ("Closed connection"); 145 attemptsC++; 146 for (int i = 0; i < addresses.size(); i++) { 147 ServerAddress sa = (ServerAddress)addresses.elementAt(i); 148 try { 149 doConnect(sa.hostName, sa.port); 150 setStatus(CONNECT); 151 return; 152 } catch (JMSSecurityException exc) { 153 throw exc; 154 } catch (UnknownHostException uhe) { 155 if (logger.isLoggable(BasicLevel.DEBUG)) 156 logger.log(BasicLevel.DEBUG, "ReliableTcpClient.connect", uhe); 157 IllegalStateException jmsExc = 158 new IllegalStateException ("Server's host is unknown: " + sa.hostName); 159 jmsExc.setLinkedException(uhe); 160 throw jmsExc; 161 } catch (IOException ioe) { 162 if (logger.isLoggable(BasicLevel.DEBUG)) 163 logger.log(BasicLevel.DEBUG, "ReliableTcpClient.connect", ioe); 164 } catch (JMSException jmse) { 166 if (logger.isLoggable(BasicLevel.DEBUG)) 167 logger.log(BasicLevel.DEBUG, "ReliableTcpClient.connect", jmse); 168 } catch (Exception e) { 170 if (logger.isLoggable(BasicLevel.DEBUG)) 171 logger.log(BasicLevel.DEBUG, "ReliableTcpClient.connect", e); 172 } 174 } 175 long currentTime = System.currentTimeMillis(); 176 177 if (logger.isLoggable(BasicLevel.DEBUG)) 178 logger.log(BasicLevel.DEBUG, 179 " -> currentTime = " + currentTime + ",endTime = " + endTime); 180 181 if (currentTime < endTime) { 183 if (logger.isLoggable(BasicLevel.DEBUG)) 184 logger.log(BasicLevel.DEBUG, 185 " -> retry connection " + name + ',' + key); 186 187 if (currentTime + nextSleep > endTime) { 188 nextSleep = endTime - currentTime; 189 } 190 191 try { 193 wait(nextSleep); 194 } catch (InterruptedException intExc) { 195 IllegalStateException jmsExc = 196 new IllegalStateException ("Could not open the connection with " 197 + addresses + ": interrupted"); 198 } 199 200 nextSleep = nextSleep * 2; 202 } else { 203 if (logger.isLoggable(BasicLevel.DEBUG)) 204 logger.log(BasicLevel.DEBUG, 205 " -> close connection " + name + ',' + key); 206 207 long attemptsT = (System.currentTimeMillis() - startTime) / 1000; 209 IllegalStateException jmsExc = 210 new IllegalStateException ("Could not connect to JMS server with " 211 + addresses 212 + " after " + attemptsC 213 + " attempts during " 214 + attemptsT + " secs: server is" 215 + " not listening" ); 216 throw jmsExc; 217 } 218 } 219 } 220 221 protected Socket createSocket(String hostName, int port) 222 throws Exception { 223 return new Socket(hostName, port); 224 } 225 226 private void doConnect(String hostName, int port) 227 throws Exception , JMSException { 228 if (logger.isLoggable(BasicLevel.DEBUG)) 229 logger.log(BasicLevel.DEBUG, 230 "ReliableTcpClient[" + name + ',' + key + "].doConnect(" + hostName + ',' + port + ')'); 231 232 Socket socket = createSocket(hostName, port); 233 socket.setTcpNoDelay(true); 234 socket.setSoTimeout(0); 235 socket.setSoLinger(true, 1000); 236 237 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 238 OutputStream os = socket.getOutputStream(); 239 InputStream is = socket.getInputStream(); 240 241 if (logger.isLoggable(BasicLevel.DEBUG)) 242 logger.log(BasicLevel.DEBUG, " -> write name = " + name); 243 StreamUtil.writeTo(name, baos); 244 if (logger.isLoggable(BasicLevel.DEBUG)) 245 logger.log(BasicLevel.DEBUG, " -> write password = " + password); 246 StreamUtil.writeTo(password, baos); 247 if (logger.isLoggable(BasicLevel.DEBUG)) 248 logger.log(BasicLevel.DEBUG, " -> write key = " + key); 249 StreamUtil.writeTo(key, baos); 250 251 if (key == -1) { 252 if (logger.isLoggable(BasicLevel.DEBUG)) 253 logger.log(BasicLevel.DEBUG, " -> open new connection"); 254 StreamUtil.writeTo(reconnectTimeout, baos); 255 StreamUtil.writeTo(baos.size(), os); 256 baos.writeTo(os); 257 os.flush(); 258 259 int len = StreamUtil.readIntFrom(is); 260 int res = StreamUtil.readIntFrom(is); 261 if (res > 0) { 262 String info = StreamUtil.readStringFrom(is); 263 throwSecurityError(info); 264 } 265 266 key = StreamUtil.readIntFrom(is); 267 if (logger.isLoggable(BasicLevel.DEBUG)) 268 logger.log(BasicLevel.DEBUG, " -> key = " + name + ',' + key); 269 connection = new ReliableTcpConnection(timer); 270 if (logger.isLoggable(BasicLevel.DEBUG)) 271 logger.log(BasicLevel.DEBUG, " -> init reliable connection"); 272 } else { 273 if (logger.isLoggable(BasicLevel.DEBUG)) 274 logger.log(BasicLevel.DEBUG, " -> reopen connection " + name + ',' + key); 275 StreamUtil.writeTo(baos.size(), os); 276 baos.writeTo(os); 277 os.flush(); 278 279 int len = StreamUtil.readIntFrom(is); 280 int res = StreamUtil.readIntFrom(is); 281 if (logger.isLoggable(BasicLevel.DEBUG)) 282 logger.log(BasicLevel.DEBUG, " -> read res = " + res); 283 if (res > 0) { 284 String info = StreamUtil.readStringFrom(is); 285 throwSecurityError(info); 286 } 287 288 if (logger.isLoggable(BasicLevel.DEBUG)) 289 logger.log(BasicLevel.DEBUG, " -> reset reliable connection"); 290 } 291 292 connection.init(socket); 293 } 294 295 private void throwSecurityError(String info) 296 throws JMSSecurityException { 297 JMSSecurityException jmsExc = 298 new JMSSecurityException("Can't open the connection with the server " + 299 params.getHost() + " on port " + 300 params.getPort() + ": " + info); 301 throw jmsExc; 302 } 303 304 public void send(AbstractJmsMessage request) 305 throws Exception { 306 if (logger.isLoggable(BasicLevel.DEBUG)) 307 logger.log( BasicLevel.DEBUG, 308 "ReliableTcpClient[" + name + ',' + key + "].send(" + request + ')'); 309 if (status == CLOSE) throw new IOException("Closed connection"); 310 if (status != CONNECT) { 311 if (reconnect) waitForReconnection(); 312 else throw new IOException("Closed connection"); 313 } 314 while (true) { 315 try { 316 connection.send(request); 317 return; 318 } catch (IOException exc) { 319 if (logger.isLoggable(BasicLevel.DEBUG)) 320 logger.log(BasicLevel.DEBUG, 321 "ReliableTcpClient[" + name + ',' + key + "]", exc); 322 if (reconnect) { 323 waitForReconnection(); 324 } else { 325 close(); 326 throw exc; 327 } 328 } 329 } 330 } 331 332 public Object receive() 333 throws Exception { 334 if (logger.isLoggable(BasicLevel.DEBUG)) 335 logger.log(BasicLevel.DEBUG, 336 "ReliableTcpClient[" + name + ',' + key + "].receive()"); 337 while (true) { 338 try { 339 return connection.receive(); 340 } catch (IOException exc) { 341 if (logger.isLoggable(BasicLevel.DEBUG)) 342 logger.log(BasicLevel.DEBUG, 343 "ReliableTcpClient[" + name + ',' + key + "]", exc); 344 if (reconnect) { 345 reconnect(); 346 } else { 347 close(); 348 throw exc; 349 } 350 } 351 } 352 } 353 354 private synchronized void waitForReconnection() throws Exception { 355 if (logger.isLoggable(BasicLevel.DEBUG)) 356 logger.log(BasicLevel.DEBUG, 357 "ReliableTcpClient[" + name + ',' + key + "].waitForReconnection()"); 358 while (status == INIT) { 359 try { 360 wait(); 361 } catch (InterruptedException exc) { 362 } 364 } 365 switch (status) { 366 case CONNECT: 367 break; 368 case CLOSE: 369 throw new Exception ("Connection closed"); 370 } 371 } 372 373 private synchronized void reconnect() throws Exception { 374 if (logger.isLoggable(BasicLevel.DEBUG)) 375 logger.log(BasicLevel.DEBUG, 376 "ReliableTcpClient[" + name + ',' + key + "].reconnect()"); 377 switch (status) { 378 case CONNECT: 379 setStatus(INIT); 380 case INIT: 381 try { 382 connect(true); 383 } catch (JMSException exc) { 384 close(); 385 throw exc; 386 } finally { 387 notifyAll(); 388 } 389 break; 390 case CLOSE: 391 throw new Exception ("Connection closed"); 392 default: 393 throw new Error ("State error"); 394 } 395 } 396 397 public synchronized void close() { 398 if (logger.isLoggable(BasicLevel.DEBUG)) 399 logger.log(BasicLevel.DEBUG, 400 "ReliableTcpClient[" + name + ',' + key + "].close()"); 401 if (status != CLOSE) { 402 setStatus(CLOSE); 403 connection.close(); 404 } 405 } 406 407 public void addServerAddress(String host, int port) { 408 addresses.addElement(new ServerAddress(host, port)); 409 } 410 411 public String toString() { 412 return '(' + super.toString() + ",params=" + params + ",name=" + name + 413 ",password=" + password + ",key=" + key + ",connection=" + connection + 414 ",status=" + statusNames[status] + ",addresses=" + addresses + ')'; 415 } 416 417 static class ServerAddress { 418 String hostName; 419 int port; 420 421 public ServerAddress(String hostName, int port) { 422 this.hostName = hostName; 423 this.port = port; 424 } 425 426 public String toString() { 427 return "(hostName=" + hostName + ",port=" + port + ')'; 428 } 429 } 430 } 431 | Popular Tags |