1 5 package com.tc.net.core; 6 7 import com.tc.bytes.TCByteBuffer; 8 import com.tc.net.NIOWorkarounds; 9 import com.tc.net.TCSocketAddress; 10 import com.tc.net.core.event.TCConnectionEventListener; 11 import com.tc.net.protocol.TCNetworkMessage; 12 import com.tc.net.protocol.TCProtocolAdaptor; 13 import com.tc.util.Assert; 14 import com.tc.util.TCTimeoutException; 15 import com.tc.util.concurrent.SetOnceFlag; 16 17 import java.io.IOException ; 18 import java.net.InetSocketAddress ; 19 import java.net.Socket ; 20 import java.net.SocketException ; 21 import java.net.SocketTimeoutException ; 22 import java.nio.ByteBuffer ; 23 import java.nio.channels.GatheringByteChannel ; 24 import java.nio.channels.ScatteringByteChannel ; 25 import java.nio.channels.SocketChannel ; 26 import java.util.LinkedList ; 27 28 33 final class TCConnectionJDK14 extends AbstractTCConnection implements TCJDK14ChannelReader, TCJDK14ChannelWriter { 34 private static final long WARN_THRESHOLD = 0x400000L; private final SetOnceFlag closed = new SetOnceFlag(); 36 private final LinkedList writeContexts = new LinkedList (); 37 private final TCCommJDK14 comm; 38 private volatile SocketChannel channel; 39 40 TCConnectionJDK14(TCConnectionEventListener listener, TCCommJDK14 comm, TCProtocolAdaptor adaptor, 42 AbstractTCConnectionManager parent) { 43 this(listener, comm, adaptor, null, parent); 44 } 45 46 TCConnectionJDK14(TCConnectionEventListener listener, TCCommJDK14 comm, TCProtocolAdaptor adaptor, SocketChannel ch, 47 AbstractTCConnectionManager parent) { 48 super(listener, adaptor, parent); 49 50 Assert.assertNotNull(comm); 51 this.comm = comm; 52 this.channel = ch; 53 } 54 55 protected void closeImpl(Runnable callback) { 56 try { 57 if (channel != null) { 58 comm.cleanupChannel(channel, callback); 59 } else { 60 callback.run(); 61 } 62 } finally { 63 synchronized (writeContexts) { 64 closed.set(); 67 68 writeContexts.clear(); 69 } 70 } 71 } 72 73 protected void finishConnect() { 74 Assert.assertNotNull("channel", channel); 75 recordSocketAddress(channel.socket()); 76 super.finishConnect(); 77 } 78 79 protected void connectImpl(TCSocketAddress addr, int timeout) throws IOException , TCTimeoutException { 80 SocketChannel newSocket = createChannel(); 81 newSocket.configureBlocking(true); 82 83 InetSocketAddress inetAddr = new InetSocketAddress (addr.getAddress(), addr.getPort()); 84 try { 85 newSocket.socket().connect(inetAddr, timeout); 86 } catch (SocketTimeoutException ste) { 87 comm.cleanupChannel(newSocket, null); 88 throw new TCTimeoutException("Timout of " + timeout + "ms occured connecting to " + addr, ste); 89 } 90 91 channel = newSocket; 92 93 newSocket.configureBlocking(false); 94 comm.requestReadInterest(this, newSocket); 95 } 96 97 private SocketChannel createChannel() throws IOException , SocketException { 98 SocketChannel rv = SocketChannel.open(); 99 Socket s = rv.socket(); 100 101 s.setSendBufferSize(64 * 1024); 103 s.setReceiveBufferSize(64 * 1024); 104 s.setTcpNoDelay(true); 106 107 return rv; 108 } 109 110 protected Socket detachImpl() throws IOException { 111 comm.unregister(channel); 112 channel.configureBlocking(true); 113 return channel.socket(); 114 } 115 116 protected boolean asynchConnectImpl(TCSocketAddress address) throws IOException { 117 SocketChannel newSocket = createChannel(); 118 newSocket.configureBlocking(false); 119 120 InetSocketAddress inetAddr = new InetSocketAddress (address.getAddress(), address.getPort()); 121 final boolean connected = newSocket.connect(inetAddr); 122 setConnected(connected); 123 124 channel = newSocket; 125 126 if (!connected) { 127 comm.requestConnectInterest(this, newSocket); 128 } 129 130 return connected; 131 } 132 133 public void doRead(ScatteringByteChannel sbc) { 134 final boolean debug = logger.isDebugEnabled(); 135 final TCByteBuffer[] readBuffers = getReadBuffers(); 136 137 int bytesRead = 0; 138 boolean readEOF = false; 139 try { 140 for (int i = 0, n = readBuffers.length; i < n; i++) { 143 ByteBuffer buf = extractNioBuffer(readBuffers[i]); 144 145 if (buf.hasRemaining()) { 146 final int read = sbc.read(buf); 147 148 if (-1 == read) { 149 readEOF = true; 151 break; 152 } 153 154 if (0 == read) { 155 break; 156 } 157 158 bytesRead += read; 159 160 if (buf.hasRemaining()) { 161 break; 163 } 164 } 165 } 166 } catch (IOException ioe) { 167 if (logger.isInfoEnabled()) { 168 logger.info("error reading from channel " + channel.toString() + ": " + ioe.getMessage()); 169 } 170 171 fireErrorEvent(ioe, null); 172 return; 173 } 174 175 if (readEOF) { 176 if (bytesRead > 0) { 177 addNetworkData(readBuffers, bytesRead); 178 } 179 180 if (debug) logger.debug("EOF read on connection " + channel.toString()); 181 182 fireEndOfFileEvent(); 183 return; 184 } 185 186 Assert.eval(bytesRead >= 0); 187 188 if (debug) logger.debug("Read " + bytesRead + " bytes on connection " + channel.toString()); 189 190 addNetworkData(readBuffers, bytesRead); 191 } 192 193 public void doWrite(GatheringByteChannel gbc) { 194 final boolean debug = logger.isDebugEnabled(); 195 196 final WriteContext contextsToWrite[]; 199 synchronized (writeContexts) { 200 if (closed.isSet()) { return; } 201 contextsToWrite = (WriteContext[]) writeContexts.toArray(new WriteContext[writeContexts.size()]); 202 } 203 204 int contextsToRemove = 0; 205 for (int index = 0, n = contextsToWrite.length; index < n; index++) { 206 final WriteContext context = contextsToWrite[index]; 207 final ByteBuffer[] buffers = context.clonedData; 208 209 long bytesWritten = 0; 210 try { 211 for (int i = context.index, nn = buffers.length; i < nn; i++) { 214 final ByteBuffer buf = buffers[i]; 215 final int written = gbc.write(buf); 216 217 if (written == 0) { 218 break; 219 } 220 221 bytesWritten += written; 222 223 if (buf.hasRemaining()) { 224 break; 225 } else { 226 context.incrementIndex(); 227 } 228 } 229 } catch (IOException ioe) { 230 if (NIOWorkarounds.windowsWritevWorkaround(ioe)) { 231 break; 232 } 233 234 fireErrorEvent(ioe, context.message); 235 } 236 237 if (debug) logger.debug("Wrote " + bytesWritten + " bytes on connection " + channel.toString()); 238 239 if (context.done()) { 240 contextsToRemove++; 241 if (debug) logger.debug("Complete message sent on connection " + channel.toString()); 242 context.writeComplete(); 243 } else { 244 if (debug) logger.debug("Message not yet completely sent on connection " + channel.toString()); 245 break; 246 } 247 } 248 249 synchronized (writeContexts) { 250 if (closed.isSet()) { return; } 251 252 for (int i = 0; i < contextsToRemove; i++) { 253 writeContexts.removeFirst(); 254 } 255 256 if (writeContexts.isEmpty()) { 257 comm.removeWriteInterest(this, channel); 258 } 259 } 260 } 261 262 static private long bytesRemaining(ByteBuffer[] buffers) { 263 long rv = 0; 264 for (int i = 0, n = buffers.length; i < n; i++) { 265 rv += buffers[i].remaining(); 266 } 267 return rv; 268 } 269 270 static private ByteBuffer[] extractNioBuffers(TCByteBuffer[] src) { 271 ByteBuffer[] rv = new ByteBuffer[src.length]; 272 for (int i = 0, n = src.length; i < n; i++) { 273 rv[i] = (ByteBuffer) src[i].getNioBuffer(); 274 } 275 276 return rv; 277 } 278 279 static private ByteBuffer extractNioBuffer(TCByteBuffer buffer) { 280 return (ByteBuffer) buffer.getNioBuffer(); 281 } 282 283 protected void putMessageImpl(TCNetworkMessage message) { 284 final boolean debug = logger.isDebugEnabled(); 286 287 final WriteContext context = new WriteContext(message); 288 289 final long bytesToWrite = bytesRemaining(context.clonedData); 290 if (bytesToWrite >= TCConnectionJDK14.WARN_THRESHOLD) { 291 logger.warn("Warning: Attempting to send a messaage of size " + bytesToWrite + " bytes"); 292 } 293 294 final boolean newData; 296 final int msgCount; 297 synchronized (writeContexts) { 298 if (closed.isSet()) { return; } 299 300 writeContexts.addLast(context); 301 msgCount = writeContexts.size(); 302 newData = (msgCount == 1); 303 } 304 305 if (debug) { 306 logger.debug("Connection (" + channel.toString() + ") has " + msgCount + " messages queued"); 307 } 308 309 if (newData) { 310 if (debug) { 311 logger.debug("New message on connection, registering for write interest"); 312 } 313 314 320 326 comm.requestWriteInterest(this, channel); 327 } 328 } 329 330 private static class WriteContext { 331 private final TCNetworkMessage message; 332 private final ByteBuffer[] clonedData; 333 private int index = 0; 334 335 WriteContext(TCNetworkMessage message) { 336 this.message = message; 337 338 final ByteBuffer[] msgData = extractNioBuffers(message.getEntireMessageData()); 339 this.clonedData = new ByteBuffer[msgData.length]; 340 341 for (int i = 0; i < msgData.length; i++) { 342 clonedData[i] = msgData[i].duplicate().asReadOnlyBuffer(); 343 } 344 } 345 346 boolean done() { 347 for (int i = index, n = clonedData.length; i < n; i++) { 348 if (clonedData[i].hasRemaining()) { return false; } 349 } 350 351 return true; 352 } 353 354 void incrementIndex() { 355 clonedData[index] = null; 356 index++; 357 } 358 359 void writeComplete() { 360 this.message.wasSent(); 361 } 362 } 363 364 } 365 | Popular Tags |