KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > net > core > TCConnectionJDK14


/*
 * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
 * notice. All rights reserved.
 */

package com.tc.net.core;

import com.tc.bytes.TCByteBuffer;
import com.tc.net.NIOWorkarounds;
import com.tc.net.TCSocketAddress;
import com.tc.net.core.event.TCConnectionEventListener;
import com.tc.net.protocol.TCNetworkMessage;
import com.tc.net.protocol.TCProtocolAdaptor;
import com.tc.util.Assert;
import com.tc.util.TCTimeoutException;
import com.tc.util.concurrent.SetOnceFlag;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;

28 /**
29  * JDK14 (nio) implementation of TCConnection
30  *
31  * @author teck
32  */

33 final class TCConnectionJDK14 extends AbstractTCConnection implements TCJDK14ChannelReader, TCJDK14ChannelWriter {
34   private static final long WARN_THRESHOLD = 0x400000L; // 4MB
35
private final SetOnceFlag closed = new SetOnceFlag();
36   private final LinkedList JavaDoc writeContexts = new LinkedList JavaDoc();
37   private final TCCommJDK14 comm;
38   private volatile SocketChannel JavaDoc channel;
39
40   // for creating unconnected client connections
41
TCConnectionJDK14(TCConnectionEventListener listener, TCCommJDK14 comm, TCProtocolAdaptor adaptor,
42                     AbstractTCConnectionManager parent) {
43     this(listener, comm, adaptor, null, parent);
44   }
45
46   TCConnectionJDK14(TCConnectionEventListener listener, TCCommJDK14 comm, TCProtocolAdaptor adaptor, SocketChannel JavaDoc ch,
47                     AbstractTCConnectionManager parent) {
48     super(listener, adaptor, parent);
49
50     Assert.assertNotNull(comm);
51     this.comm = comm;
52     this.channel = ch;
53   }
54
55   protected void closeImpl(Runnable JavaDoc callback) {
56     try {
57       if (channel != null) {
58         comm.cleanupChannel(channel, callback);
59       } else {
60         callback.run();
61       }
62     } finally {
63       synchronized (writeContexts) {
64         // this will blow up if it is set more than once. Super class should not allow this closeImpl() to
65
// be called more than once
66
closed.set();
67
68         writeContexts.clear();
69       }
70     }
71   }
72
73   protected void finishConnect() {
74     Assert.assertNotNull("channel", channel);
75     recordSocketAddress(channel.socket());
76     super.finishConnect();
77   }
78
79   protected void connectImpl(TCSocketAddress addr, int timeout) throws IOException JavaDoc, TCTimeoutException {
80     SocketChannel JavaDoc newSocket = createChannel();
81     newSocket.configureBlocking(true);
82
83     InetSocketAddress JavaDoc inetAddr = new InetSocketAddress JavaDoc(addr.getAddress(), addr.getPort());
84     try {
85       newSocket.socket().connect(inetAddr, timeout);
86     } catch (SocketTimeoutException JavaDoc ste) {
87       comm.cleanupChannel(newSocket, null);
88       throw new TCTimeoutException("Timout of " + timeout + "ms occured connecting to " + addr, ste);
89     }
90
91     channel = newSocket;
92
93     newSocket.configureBlocking(false);
94     comm.requestReadInterest(this, newSocket);
95   }
96
97   private SocketChannel JavaDoc createChannel() throws IOException JavaDoc, SocketException JavaDoc {
98     SocketChannel JavaDoc rv = SocketChannel.open();
99     Socket JavaDoc s = rv.socket();
100
101     // TODO: provide config options for setting any and all socket options
102
s.setSendBufferSize(64 * 1024);
103     s.setReceiveBufferSize(64 * 1024);
104     // s.setReuseAddress(true);
105
s.setTcpNoDelay(true);
106
107     return rv;
108   }
109
110   protected Socket JavaDoc detachImpl() throws IOException JavaDoc {
111     comm.unregister(channel);
112     channel.configureBlocking(true);
113     return channel.socket();
114   }
115
116   protected boolean asynchConnectImpl(TCSocketAddress address) throws IOException JavaDoc {
117     SocketChannel JavaDoc newSocket = createChannel();
118     newSocket.configureBlocking(false);
119
120     InetSocketAddress JavaDoc inetAddr = new InetSocketAddress JavaDoc(address.getAddress(), address.getPort());
121     final boolean connected = newSocket.connect(inetAddr);
122     setConnected(connected);
123
124     channel = newSocket;
125
126     if (!connected) {
127       comm.requestConnectInterest(this, newSocket);
128     }
129
130     return connected;
131   }
132
133   public void doRead(ScatteringByteChannel JavaDoc sbc) {
134     final boolean debug = logger.isDebugEnabled();
135     final TCByteBuffer[] readBuffers = getReadBuffers();
136
137     int bytesRead = 0;
138     boolean readEOF = false;
139     try {
140       // Do the read in a loop, instead of calling read(ByteBuffer[]).
141
// This seems to avoid memory leaks on sun's 1.4.2 JDK
142
for (int i = 0, n = readBuffers.length; i < n; i++) {
143         ByteBuffer buf = extractNioBuffer(readBuffers[i]);
144
145         if (buf.hasRemaining()) {
146           final int read = sbc.read(buf);
147
148           if (-1 == read) {
149             // Normal EOF
150
readEOF = true;
151             break;
152           }
153
154           if (0 == read) {
155             break;
156           }
157
158           bytesRead += read;
159
160           if (buf.hasRemaining()) {
161             // don't move on to the next buffer if we didn't fill the current one
162
break;
163           }
164         }
165       }
166     } catch (IOException JavaDoc ioe) {
167       if (logger.isInfoEnabled()) {
168         logger.info("error reading from channel " + channel.toString() + ": " + ioe.getMessage());
169       }
170
171       fireErrorEvent(ioe, null);
172       return;
173     }
174
175     if (readEOF) {
176       if (bytesRead > 0) {
177         addNetworkData(readBuffers, bytesRead);
178       }
179
180       if (debug) logger.debug("EOF read on connection " + channel.toString());
181
182       fireEndOfFileEvent();
183       return;
184     }
185
186     Assert.eval(bytesRead >= 0);
187
188     if (debug) logger.debug("Read " + bytesRead + " bytes on connection " + channel.toString());
189
190     addNetworkData(readBuffers, bytesRead);
191   }
192
193   public void doWrite(GatheringByteChannel JavaDoc gbc) {
194     final boolean debug = logger.isDebugEnabled();
195
196     // get a copy of the current write contexts. Since we call out to event/error handlers in the write
197
// loop below, we don't want to be holding the lock on the writeContexts queue
198
final WriteContext contextsToWrite[];
199     synchronized (writeContexts) {
200       if (closed.isSet()) { return; }
201       contextsToWrite = (WriteContext[]) writeContexts.toArray(new WriteContext[writeContexts.size()]);
202     }
203
204     int contextsToRemove = 0;
205     for (int index = 0, n = contextsToWrite.length; index < n; index++) {
206       final WriteContext context = contextsToWrite[index];
207       final ByteBuffer[] buffers = context.clonedData;
208
209       long bytesWritten = 0;
210       try {
211         // Do the write in a loop, instead of calling write(ByteBuffer[]).
212
// This seems to avoid memory leaks on sun's 1.4.2 JDK
213
for (int i = context.index, nn = buffers.length; i < nn; i++) {
214           final ByteBuffer buf = buffers[i];
215           final int written = gbc.write(buf);
216
217           if (written == 0) {
218             break;
219           }
220
221           bytesWritten += written;
222
223           if (buf.hasRemaining()) {
224             break;
225           } else {
226             context.incrementIndex();
227           }
228         }
229       } catch (IOException JavaDoc ioe) {
230         if (NIOWorkarounds.windowsWritevWorkaround(ioe)) {
231           break;
232         }
233
234         fireErrorEvent(ioe, context.message);
235       }
236
237       if (debug) logger.debug("Wrote " + bytesWritten + " bytes on connection " + channel.toString());
238
239       if (context.done()) {
240         contextsToRemove++;
241         if (debug) logger.debug("Complete message sent on connection " + channel.toString());
242         context.writeComplete();
243       } else {
244         if (debug) logger.debug("Message not yet completely sent on connection " + channel.toString());
245         break;
246       }
247     }
248
249     synchronized (writeContexts) {
250       if (closed.isSet()) { return; }
251
252       for (int i = 0; i < contextsToRemove; i++) {
253         writeContexts.removeFirst();
254       }
255
256       if (writeContexts.isEmpty()) {
257         comm.removeWriteInterest(this, channel);
258       }
259     }
260   }
261
262   static private long bytesRemaining(ByteBuffer[] buffers) {
263     long rv = 0;
264     for (int i = 0, n = buffers.length; i < n; i++) {
265       rv += buffers[i].remaining();
266     }
267     return rv;
268   }
269
270   static private ByteBuffer[] extractNioBuffers(TCByteBuffer[] src) {
271     ByteBuffer[] rv = new ByteBuffer[src.length];
272     for (int i = 0, n = src.length; i < n; i++) {
273       rv[i] = (ByteBuffer) src[i].getNioBuffer();
274     }
275
276     return rv;
277   }
278
279   static private ByteBuffer extractNioBuffer(TCByteBuffer buffer) {
280     return (ByteBuffer) buffer.getNioBuffer();
281   }
282
283   protected void putMessageImpl(TCNetworkMessage message) {
284     // ??? Does the message queue and the WriteContext belong in the base connection class?
285
final boolean debug = logger.isDebugEnabled();
286
287     final WriteContext context = new WriteContext(message);
288
289     final long bytesToWrite = bytesRemaining(context.clonedData);
290     if (bytesToWrite >= TCConnectionJDK14.WARN_THRESHOLD) {
291       logger.warn("Warning: Attempting to send a messaage of size " + bytesToWrite + " bytes");
292     }
293
294     // TODO: outgoing queue should not be unbounded size!
295
final boolean newData;
296     final int msgCount;
297     synchronized (writeContexts) {
298       if (closed.isSet()) { return; }
299
300       writeContexts.addLast(context);
301       msgCount = writeContexts.size();
302       newData = (msgCount == 1);
303     }
304
305     if (debug) {
306       logger.debug("Connection (" + channel.toString() + ") has " + msgCount + " messages queued");
307     }
308
309     if (newData) {
310       if (debug) {
311         logger.debug("New message on connection, registering for write interest");
312       }
313
314       // NOTE: this might be the very first message on the socket and
315
// given the current implementation, it isn't necessarily
316
// safe to assume one can write to the channel. Long story
317
// short, always enqueue the message and wait until it is selected
318
// for write interest.
319

320       // If you're trying to optimize for performance by letting the calling thread do the
321
// write, we need to add more logic to connection setup. Specifically, you need register
322
// for, as well as actually be selected for, write interest immediately
323
// after finishConnect(). Only after this selection occurs it is always safe to try
324
// to write.
325

326       comm.requestWriteInterest(this, channel);
327     }
328   }
329
330   private static class WriteContext {
331     private final TCNetworkMessage message;
332     private final ByteBuffer[] clonedData;
333     private int index = 0;
334
335     WriteContext(TCNetworkMessage message) {
336       this.message = message;
337
338       final ByteBuffer[] msgData = extractNioBuffers(message.getEntireMessageData());
339       this.clonedData = new ByteBuffer[msgData.length];
340
341       for (int i = 0; i < msgData.length; i++) {
342         clonedData[i] = msgData[i].duplicate().asReadOnlyBuffer();
343       }
344     }
345
346     boolean done() {
347       for (int i = index, n = clonedData.length; i < n; i++) {
348         if (clonedData[i].hasRemaining()) { return false; }
349       }
350
351       return true;
352     }
353
354     void incrementIndex() {
355       clonedData[index] = null;
356       index++;
357     }
358
359     void writeComplete() {
360       this.message.wasSent();
361     }
362   }
363
364 }
365
Popular Tags