1 21 package com.presumo.jms.plugin.implementation.transport.tcp; 22 23 import java.net.Socket ; 24 import java.net.InetAddress ; 25 import java.io.InputStream ; 26 import java.io.OutputStream ; 27 import java.io.BufferedOutputStream ; 28 import java.io.BufferedInputStream ; 29 import java.io.DataInputStream ; 30 import java.io.DataOutputStream ; 31 import java.io.IOException ; 32 33 import com.presumo.jms.message.JmsMessage; 34 import com.presumo.jms.message.MessageEncoder; 35 import com.presumo.jms.resources.Resources; 36 import com.presumo.util.log.Logger; 37 import com.presumo.util.log.LoggerFactory; 38 39 import com.presumo.jms.plugin.transport.Transport; 40 41 42 45 public class TransportImpl implements Transport 46 { 47 private final static int CLOSE_SOCKET = -45; private final static int CLOSE_ACK = -46; 49 50 private final Socket socket; 51 private final DataInputStream in; 52 private final DataOutputStream out; 53 54 private final Object receiveLock = new String ("TransportImpl_Receive_Lock"); 55 56 private boolean closed = false; 57 58 private String host; 59 private int port; 60 61 65 public TransportImpl(String host, int port) 66 throws IOException 67 { 68 logger.entry("TransportImpl<init>", host, new Integer (port)); 69 70 this.host = host; 71 this.port = port; 72 InetAddress server = InetAddress.getByName(host); 73 this.socket = new Socket (server, port); 74 this.socket.setSoLinger(false, 0); 75 this.in = new DataInputStream (socket.getInputStream()); 76 this.out = new DataOutputStream (new BufferedOutputStream (socket.getOutputStream(), 77 5024)); 78 79 80 logger.exit("TransportImpl<init>"); 81 } 82 83 87 TransportImpl(Socket clientSocket) throws IOException 88 { 89 logger.entry("TransportImple<init>(Socket)"); 90 this.host = clientSocket.getInetAddress().getHostName(); 91 this.port = clientSocket.getPort(); 92 this.socket = clientSocket; 93 this.in = new DataInputStream (clientSocket.getInputStream()); 94 this.out = new DataOutputStream (new BufferedOutputStream (clientSocket.getOutputStream(), 95 5024)); 96 logger.info("PJMSI5002", host + ":" +port); 97 logger.exit("TransportImpl<init>(Socket"); 98 } 99 100 public String getRemoteID() 101 { 102 return host + ":" + port; 103 } 104 105 public synchronized void sendMessages(JmsMessage [] messages) 106 throws IOException 107 { 108 if (closed) 109 throw new IOException ("Socket closed"); 110 111 out.writeInt(messages.length); 112 MessageEncoder.encode(messages, out); 113 out.flush(); 114 } 115 116 public JmsMessage [] receiveMessages() 117 throws IOException 118 { 119 if (closed) 120 throw new IOException ("Socket closed"); 121 JmsMessage [] retval = null; 122 synchronized (receiveLock) { 123 try { 124 int size = in.readInt(); 125 if (size < 0) { 126 if (size == CLOSE_SOCKET) { 127 logger.debug("remote close socket request received"); 128 closeSocket(); 129 throw new IOException ("Socket closed"); 130 } if (size == CLOSE_ACK) { 131 logger.debug("close ACK received"); 132 closed = true; 133 socket.close(); 134 throw new IOException ("Socket closed"); 135 } 136 else 137 throw new IOException ("Input stream corrupted"); 138 } 139 retval = new JmsMessage[size]; 140 for (int i=0; i < size; ++i) 141 retval[i] = MessageEncoder.decode(in); 142 143 } catch (IOException ioe) { 144 if (!closed) { 145 closed = true; 146 try { socket.close(); } catch (IOException ioe2) {} 147 } 148 throw ioe; 149 } 150 } 151 return retval; 152 } 153 154 155 158 public synchronized void close() 159 { 160 if (! closed) { 161 closed = true; 162 logger.debug("Initiating close"); 163 try { 164 out.writeInt(CLOSE_SOCKET); 165 out.flush(); 166 logger.debug("Waiting for close acknowledgement"); 167 waitForAckAndClose(); 168 } catch (IOException ioe) { 169 ioe.printStackTrace(); 170 } 171 } 172 } 173 174 private void waitForAckAndClose() 175 { 176 try { 177 while (true) 178 receiveMessages(); 179 } catch (IOException ioe) {} 180 } 181 182 private void closeSocket() throws IOException 183 { 184 logger.debug("Sending close acknowledgement"); 185 out.writeInt(CLOSE_ACK); 186 out.flush(); 187 int bytesread = in.read(); 188 while (bytesread != -1) { 189 logger.debug("Did not reach end of stream yet"); 190 bytesread = in.read(); 191 } 192 closed = true; 193 logger.debug("Closing the socket after sending ACK"); 194 socket.close(); 195 } 196 197 private static Logger logger = 199 LoggerFactory.getLogger(TransportImpl.class, Resources.getBundle()); 200 } 202 | Popular Tags |