1 22 package fr.dyade.aaa.util; 23 24 import java.io.*; 25 import java.util.*; 26 import java.net.*; 27 28 import org.objectweb.joram.shared.client.AbstractJmsMessage; 29 import org.objectweb.joram.shared.client.AbstractJmsReply; 30 import org.objectweb.joram.shared.stream.StreamUtil; 31 32 import org.objectweb.util.monolog.api.BasicLevel; 33 import org.objectweb.util.monolog.api.Logger; 34 35 public class ReliableTcpConnection { 36 37 public static final int INIT = 0; 38 public static final int CONNECT = 1; 39 public static final int CONNECTING = 2; 40 41 public static final String [] statusNames = 42 {"INIT", "CONNECT", "CONNECTING"}; 43 44 public static Logger logger = 45 Debug.getLogger("fr.dyade.aaa.util.ReliableTcpConnection"); 46 47 public static String WINDOW_SIZE_PROP_NAME = 48 "fr.dyade.aaa.util.ReliableTcpConnection.windowSize"; 49 50 public static int DEFAULT_WINDOW_SIZE = 100; 51 52 private int windowSize; 53 54 private volatile long inputCounter; 55 56 private long outputCounter; 57 58 private volatile int unackCounter; 59 60 private Vector pendingMessages; 61 62 private Socket sock; 63 64 private NetOutputStream nos; 65 66 private BufferedInputStream bis; 67 68 private Object inputLock; 69 70 private Object outputLock; 71 72 private int status; 73 74 private java.util.Timer timer; 75 76 public ReliableTcpConnection(java.util.Timer timer2) { 77 windowSize = Integer.getInteger( 78 WINDOW_SIZE_PROP_NAME, 79 DEFAULT_WINDOW_SIZE).intValue(); 80 if (logger.isLoggable(BasicLevel.INFO)) 81 logger.log(BasicLevel.INFO, 82 "ReliableTcpConnection.windowSize=" + windowSize); 83 timer = timer2; 84 inputCounter = -1; 85 outputCounter = 0; 86 unackCounter = 0; 87 pendingMessages = new Vector(); 88 inputLock = new Object (); 89 outputLock = new Object (); 90 91 setStatus(INIT); 92 } 93 94 private synchronized void setStatus(int status) { 95 if (logger.isLoggable(BasicLevel.DEBUG)) 96 logger.log(BasicLevel.DEBUG, 97 "ReliableTcpConnection.setStatus(" + statusNames[status] + ')'); 98 this.status = status; 99 } 100 101 private final synchronized int getStatus() { 102 return status; 103 } 104 105 public void init(Socket sock) throws IOException { 106 if (logger.isLoggable(BasicLevel.DEBUG)) 107 logger.log(BasicLevel.DEBUG, "ReliableTcpConnection.init()"); 108 synchronized (this) { 109 if (getStatus() != INIT) 110 throw new IOException("Already connected"); 111 setStatus(CONNECTING); 112 } 113 114 try { 115 this.sock = sock; 116 117 synchronized (outputLock) { 118 nos = new NetOutputStream(sock); 119 120 synchronized (pendingMessages) { 121 for (int i = 0; i < pendingMessages.size(); i++) { 122 TcpMessage pendingMsg = (TcpMessage) pendingMessages.elementAt(i); 123 doSend(pendingMsg.id, inputCounter, pendingMsg.object); 124 } 125 } 126 } 127 128 synchronized (inputLock) { 129 bis = new BufferedInputStream(sock.getInputStream()); 130 } 131 132 setStatus(CONNECT); 133 } catch (IOException exc) { 134 if (logger.isLoggable(BasicLevel.DEBUG)) 135 logger.log(BasicLevel.DEBUG, "", exc); 136 close(); 137 throw exc; 138 } 139 } 140 141 public void send(AbstractJmsMessage request) throws IOException { 142 if (logger.isLoggable(BasicLevel.DEBUG)) 143 logger.log(BasicLevel.DEBUG, 144 "ReliableTcpConnection.send(" + request + ')'); 145 146 if (getStatus() != CONNECT) 147 throw new IOException("Connection closed"); 148 try { 149 synchronized (outputLock) { 150 doSend(outputCounter, inputCounter, request); 151 addPendingMessage(new TcpMessage( 152 outputCounter, request)); 153 outputCounter++; 154 } 155 } catch (IOException exc) { 156 if (logger.isLoggable(BasicLevel.DEBUG)) 157 logger.log(BasicLevel.DEBUG, "ReliableTcpConnection.send()", exc); 158 close(); 159 throw exc; 160 } 161 } 162 163 private void doSend(long id, long ackId, AbstractJmsMessage msg) throws IOException { 164 if (logger.isLoggable(BasicLevel.DEBUG)) 165 logger.log(BasicLevel.DEBUG, 166 "ReliableTcpConnection.doSend(" + id + ',' + ackId + ',' + msg + ')'); 167 synchronized (outputLock) { 168 nos.send(id, ackId, msg); 169 unackCounter = 0; 170 } 171 } 172 173 static class NetOutputStream extends ByteArrayOutputStream { 174 private OutputStream os = null; 175 176 NetOutputStream(Socket sock) throws IOException { 177 super(1024); 178 reset(); 179 os = sock.getOutputStream(); 180 } 181 182 public void reset() { 183 count = 4; 184 } 185 186 void send(long id, long ackId, AbstractJmsMessage msg) throws IOException { 187 try { 188 StreamUtil.writeTo(id, this); 189 StreamUtil.writeTo(ackId, this); 190 AbstractJmsMessage.write(msg, this); 191 192 buf[0] = (byte) ((count -4) >>> 24); 193 buf[1] = (byte) ((count -4) >>> 16); 194 buf[2] = (byte) ((count -4) >>> 8); 195 buf[3] = (byte) ((count -4) >>> 0); 196 197 writeTo(os); 198 os.flush(); 199 } finally { 200 reset(); 201 } 202 } 203 } 204 205 private void addPendingMessage(TcpMessage msg) { 206 if (logger.isLoggable(BasicLevel.DEBUG)) 207 logger.log(BasicLevel.DEBUG, 208 "ReliableTcpConnection.addPendingMessage(" + msg + ')'); 209 synchronized (pendingMessages) { 210 pendingMessages.addElement(msg); 211 } 212 } 213 214 private void ackPendingMessages(long ackId) { 215 if (logger.isLoggable(BasicLevel.DEBUG)) 216 logger.log(BasicLevel.DEBUG, 217 "ReliableTcpConnection.ackPendingMessages(" + ackId + ')'); 218 synchronized (pendingMessages) { 219 while (pendingMessages.size() > 0) { 220 TcpMessage pendingMsg = 221 (TcpMessage)pendingMessages.elementAt(0); 222 if (ackId < pendingMsg.id) { 223 break; 225 } else { 226 pendingMessages.removeElementAt(0); 227 } 228 } 229 } 230 } 231 232 public AbstractJmsReply receive() throws Exception { 233 if (logger.isLoggable(BasicLevel.DEBUG)) 234 logger.log(BasicLevel.DEBUG, 235 "ReliableTcpConnection.receive()"); 236 if (getStatus() != CONNECT) 237 throw new IOException("Connection closed"); 238 loop: 239 while (true) { 240 try { 241 long messageId; 242 long ackId; 243 AbstractJmsReply obj; 244 245 synchronized (inputLock) { 246 int len = StreamUtil.readIntFrom(bis); 247 messageId = StreamUtil.readLongFrom(bis); 248 ackId = StreamUtil.readLongFrom(bis); 249 obj = (AbstractJmsReply) AbstractJmsMessage.read(bis); 250 } 251 if (logger.isLoggable(BasicLevel.DEBUG)) 252 logger.log(BasicLevel.DEBUG, " -> id = " + messageId); 253 ackPendingMessages(ackId); 254 if (obj != null) { 255 if (unackCounter < windowSize) { 256 if (logger.isLoggable(BasicLevel.DEBUG)) 257 logger.log(BasicLevel.DEBUG, " -> unackCounter++"); 258 unackCounter++; 259 } else { 260 if (logger.isLoggable(BasicLevel.DEBUG)) 261 logger.log(BasicLevel.DEBUG, " -> schedule"); 262 AckTimerTask ackTimertask = new AckTimerTask(); 263 timer.schedule(ackTimertask, 0); 264 } 265 if (messageId > inputCounter) { 266 inputCounter = messageId; 267 return obj; 268 } else if (logger.isLoggable(BasicLevel.DEBUG)) 269 logger.log(BasicLevel.DEBUG, 270 " -> already received message: " + messageId + " " + obj); 271 } 272 } catch (IOException exc) { 273 if (logger.isLoggable(BasicLevel.DEBUG)) 274 logger.log(BasicLevel.DEBUG, "", exc); 275 close(); 276 throw exc; 277 } 278 } 279 } 280 281 public void close() { 282 if (logger.isLoggable(BasicLevel.DEBUG)) 283 logger.log(BasicLevel.DEBUG, "ReliableTcpConnection.close()"); 284 if (getStatus() == INIT) 285 return; 286 try { 291 sock.getOutputStream().close(); 292 } catch (IOException exc) {} 293 try { 294 sock.close(); 295 } catch (IOException exc) {} 296 setStatus(INIT); 297 } 298 299 static class TcpMessage { 300 long id; 301 AbstractJmsMessage object; 302 303 TcpMessage(long id, AbstractJmsMessage object) { 304 this.id = id; 305 this.object = object; 306 } 307 308 public String toString() { 309 return '(' + super.toString() + 310 ",id=" + id + 311 ",object=" + object + ')'; 312 } 313 } 314 315 class AckTimerTask extends java.util.TimerTask { 316 public void run() { 317 if (logger.isLoggable(BasicLevel.DEBUG)) 318 logger.log(BasicLevel.DEBUG, "AckTimerTask.run()"); 319 try { 320 doSend(-1, inputCounter, null); 321 cancel(); 322 } catch (IOException exc) { 323 if (logger.isLoggable(BasicLevel.DEBUG)) 324 logger.log(BasicLevel.DEBUG, "", exc); 325 } 326 } 327 } 328 } 329 | Popular Tags |