1 24 25 package org.objectweb.tribe.channel.tcp; 26 27 import java.io.BufferedInputStream ; 28 import java.io.BufferedOutputStream ; 29 import java.io.DataInputStream ; 30 import java.io.DataOutputStream ; 31 import java.io.IOException ; 32 import java.net.InetSocketAddress ; 33 import java.net.Socket ; 34 35 import org.objectweb.tribe.channel.AbstractReliableFifoChannel; 36 import org.objectweb.tribe.common.Address; 37 import org.objectweb.tribe.common.IpAddress; 38 import org.objectweb.tribe.exceptions.ChannelException; 39 import org.objectweb.tribe.exceptions.NotConnectedException; 40 import org.objectweb.tribe.messages.ChannelMessage; 41 42 48 public class TcpChannel extends AbstractReliableFifoChannel 49 { 50 private Socket socket; 52 private IpAddress destinationAddress = null; 53 private IpAddress sourceAddress = null; 54 private DataInputStream inStream; 55 private DataOutputStream outStream; 56 private boolean isClosed; 57 58 64 public TcpChannel() throws IOException 65 { 66 socket = new Socket (); 67 sourceAddress = new IpAddress(socket.getLocalAddress(), socket 68 .getLocalPort()); 69 initializeStreams(); 70 } 71 72 79 public TcpChannel(IpAddress sourceAddress) throws IOException 80 { 81 socket = new Socket (sourceAddress.getAddress(), sourceAddress.getPort(), 82 true); 83 this.sourceAddress = sourceAddress; 84 initializeStreams(); 85 } 86 87 93 public TcpChannel(Socket socket) throws IOException 94 { 95 this.socket = socket; 96 sourceAddress = new IpAddress(socket.getLocalAddress(), socket 97 .getLocalPort()); 98 destinationAddress = new IpAddress(socket.getInetAddress(), socket 99 .getPort()); 100 initializeStreams(); 101 } 102 103 109 private void initializeStreams() throws IOException 110 { 111 socket.setTcpNoDelay(false); 112 isClosed = false; 113 } 114 115 118 public synchronized void send(ChannelMessage msg) throws ChannelException, 119 NotConnectedException 120 { 121 if (isClosed || (socket == null)) 123 throw new NotConnectedException(); 124 if (outStream == null) 125 { try 127 { 128 outStream = new DataOutputStream (new BufferedOutputStream (socket 129 .getOutputStream())); 130 } 131 catch (IOException e1) 132 { 133 throw new ChannelException("Unable to create output stream", e1); 134 } 135 } 136 137 try 139 { 140 byte[] msgInBytes = msg.getByteArray(); 141 outStream.writeInt(msgInBytes.length); 144 outStream.write(msgInBytes); 145 outStream.flush(); 146 } 147 catch (IOException e) 148 { 149 throw new ChannelException("Error while sending message on socket", e); 150 } 151 } 152 153 156 public void close() throws ChannelException 157 { 158 if (isClosed) 159 return; 160 try 161 { 162 socket.close(); 163 } 164 catch (IOException e) 165 { 166 throw new ChannelException("Error while closing the socket", e); 167 } 168 finally 169 { 170 isClosed = true; 171 } 172 } 173 174 177 public void connect(Address destination) throws ChannelException 178 { 179 if (!(destination instanceof IpAddress)) 180 throw new ChannelException("TCP Channels require IP addresses."); 181 destinationAddress = (IpAddress) destination; 182 try 183 { 184 socket.connect(new InetSocketAddress (destinationAddress.getAddress(), 185 destinationAddress.getPort())); 186 } 187 catch (IOException e) 188 { 189 inStream = null; 190 outStream = null; 191 throw new ChannelException("Error while connecting the socket", e); 192 } 193 } 194 195 198 public Address getDestinationAddress() 199 { 200 return destinationAddress; 201 } 202 203 206 public Address getSourceAddress() 207 { 208 return sourceAddress; 209 } 210 211 217 protected DataInputStream getInStream() 218 { 219 if (socket == null) 220 return null; 221 if (inStream == null) 222 try 223 { 224 inStream = new DataInputStream (new BufferedInputStream (socket 225 .getInputStream())); 226 } 227 catch (IOException e) 228 { 229 return null; 230 } 231 232 return inStream; 233 } 234 } | Popular Tags |