1 23 package org.objectweb.joram.mom.proxies.tcp; 24 25 import java.io.*; 26 import java.util.*; 27 import java.net.*; 28 29 import org.objectweb.joram.shared.client.AbstractJmsMessage; 30 import org.objectweb.joram.shared.client.AbstractJmsRequest; 31 import org.objectweb.joram.mom.proxies.*; 32 import org.objectweb.joram.shared.stream.StreamUtil; 33 34 import fr.dyade.aaa.util.Debug; 35 import org.objectweb.util.monolog.api.BasicLevel; 36 import org.objectweb.util.monolog.api.Logger; 37 38 public class IOControl { 39 public static Logger logger = Debug.getLogger(IOControl.class.getName()); 40 41 private long inputCounter; 42 43 private Socket sock; 44 45 private NetOutputStream nos; 46 47 private BufferedInputStream bis; 48 49 private int windowSize; 50 51 private int unackCounter; 52 53 public IOControl(Socket sock) throws IOException { 54 this(sock, -1); 55 } 56 57 public IOControl(Socket sock, 58 long inputCounter) throws IOException { 59 windowSize = Integer.getInteger( 60 fr.dyade.aaa.util.ReliableTcpConnection.WINDOW_SIZE_PROP_NAME, 61 fr.dyade.aaa.util.ReliableTcpConnection.DEFAULT_WINDOW_SIZE).intValue(); 62 unackCounter = 0; 63 this.inputCounter = inputCounter; 64 this.sock = sock; 65 66 nos = new NetOutputStream(sock); 67 bis = new BufferedInputStream(sock.getInputStream()); 68 } 69 70 public synchronized void send(ProxyMessage msg) throws IOException { 71 if (logger.isLoggable(BasicLevel.DEBUG)) 72 logger.log(BasicLevel.DEBUG, "IOControl.send:" + msg); 73 74 try { 75 nos.send(msg.getId(), msg.getAckId(), msg.getObject()); 76 unackCounter = 0; 77 } catch (IOException exc) { 78 if (logger.isLoggable(BasicLevel.DEBUG)) 79 logger.log(BasicLevel.DEBUG, "IOControl.send", exc); 80 close(); 81 throw exc; 82 } 83 } 84 85 static class NetOutputStream extends ByteArrayOutputStream { 86 private OutputStream os = null; 87 88 NetOutputStream(Socket sock) throws IOException { 89 super(1024); 90 reset(); 91 os = sock.getOutputStream(); 92 } 93 94 public void reset() { 95 count = 4; 96 } 97 98 void send(long id, long ackId, AbstractJmsMessage msg) throws IOException { 99 try { 100 StreamUtil.writeTo(id, this); 101 StreamUtil.writeTo(ackId, this); 102 AbstractJmsMessage.write(msg, this); 103 104 buf[0] = (byte) ((count -4) >>> 24); 105 buf[1] = (byte) ((count -4) >>> 16); 106 buf[2] = (byte) ((count -4) >>> 8); 107 buf[3] = (byte) ((count -4) >>> 0); 108 109 writeTo(os); 110 os.flush(); 111 } finally { 112 reset(); 113 } 114 } 115 } 116 117 public ProxyMessage receive() throws Exception { 118 if (logger.isLoggable(BasicLevel.DEBUG)) 119 logger.log(BasicLevel.DEBUG, "IOControl.receive()"); 120 121 try { 122 while (true) { 123 int len = StreamUtil.readIntFrom(bis); 124 long messageId = StreamUtil.readLongFrom(bis); 125 long ackId = StreamUtil.readLongFrom(bis); 126 AbstractJmsRequest obj = (AbstractJmsRequest) AbstractJmsMessage.read(bis); 127 128 if (messageId > inputCounter) { 129 inputCounter = messageId; 130 synchronized (this) { 131 if (unackCounter < windowSize) { 132 unackCounter++; 133 } else { 134 send(new ProxyMessage(-1, messageId, null)); 135 } 136 } 137 return new ProxyMessage(messageId, ackId, obj); 138 } else { 139 logger.log(BasicLevel.DEBUG, 140 "IOControl.receive: already received message: " + messageId + " -> " + obj); 141 } 142 } 143 } catch (IOException exc) { 144 if (logger.isLoggable(BasicLevel.DEBUG)) 145 logger.log(BasicLevel.DEBUG, "IOControl.receive", exc); 146 close(); 147 throw exc; 148 } 149 } 150 151 public void close() { 152 if (logger.isLoggable(BasicLevel.DEBUG)) 153 logger.log(BasicLevel.DEBUG, "IOControl.close()"); 154 155 try { 156 if (bis != null) bis.close(); 157 bis = null; 158 } catch (IOException exc) {} 159 try { 160 if (sock != null) sock.getOutputStream().close(); 161 } catch (IOException exc) {} 162 try { 163 if (sock != null) sock.close(); 164 sock = null; 165 } catch (IOException exc) {} 166 } 167 } 168 | Popular Tags |