KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > net > core > AbstractTCConnection


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
3  */

4 package com.tc.net.core;
5
6 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
7 import EDU.oswego.cs.dl.util.concurrent.Latch;
8 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
9 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
10
11 import com.tc.bytes.TCByteBuffer;
12 import com.tc.logging.TCLogger;
13 import com.tc.logging.TCLogging;
14 import com.tc.net.TCSocketAddress;
15 import com.tc.net.core.event.TCConnectionErrorEvent;
16 import com.tc.net.core.event.TCConnectionEvent;
17 import com.tc.net.core.event.TCConnectionEventListener;
18 import com.tc.net.protocol.TCNetworkMessage;
19 import com.tc.net.protocol.TCProtocolAdaptor;
20 import com.tc.util.Assert;
21 import com.tc.util.TCTimeoutException;
22 import com.tc.util.concurrent.SetOnceFlag;
23 import com.tc.util.concurrent.SetOnceRef;
24
25 import java.io.IOException JavaDoc;
26 import java.net.Socket JavaDoc;
27 import java.util.Date JavaDoc;
28 import java.util.Iterator JavaDoc;
29 import java.util.List JavaDoc;
30
31 /**
32  * TCConnection implementation that is <b>not </b> specific to a particular IO library and/or JDK release
33  *
34  * @author teck
35  */

36 abstract class AbstractTCConnection implements TCConnection {
37
38   AbstractTCConnection(TCConnectionEventListener listener, TCProtocolAdaptor adaptor, AbstractTCConnectionManager parent) {
39     Assert.assertNotNull(parent);
40     Assert.assertNotNull(adaptor);
41
42     this.parent = parent;
43     this.protocolAdaptor = adaptor;
44
45     if (listener != null) addListener(listener);
46
47     staticEvent = new TCConnectionEvent() {
48       public TCConnection getSource() {
49         return AbstractTCConnection.this;
50       }
51
52       public String JavaDoc toString() {
53         return AbstractTCConnection.this.toString();
54       }
55     };
56
57     eventFlags[CONNECT] = new SetOnceFlag();
58     eventFlags[EOF] = new SetOnceFlag();
59     eventFlags[ERROR] = new SetOnceFlag();
60     eventFlags[CLOSE] = new SetOnceFlag();
61
62     Assert.assertNoNullElements(eventFlags);
63   }
64
65   public final void asynchClose() {
66     if (closed.attemptSet()) {
67       closeImpl(createCloseCallback(null));
68     }
69   }
70
71   public final boolean close(final long timeout) {
72     if (timeout <= 0) { throw new IllegalArgumentException JavaDoc("timeout cannot be less than or equal to zero"); }
73
74     if (closed.attemptSet()) {
75       final Latch latch = new Latch();
76       closeImpl(createCloseCallback(latch));
77       try {
78         return latch.attempt(timeout);
79       } catch (InterruptedException JavaDoc e) {
80         logger.warn("close interrupted");
81         return isConnected();
82       }
83     }
84
85     return isClosed();
86   }
87
88   private Runnable JavaDoc createCloseCallback(final Latch latch) {
89     final boolean fireClose = isConnected();
90
91     return new Runnable JavaDoc() {
92       public void run() {
93         setConnected(false);
94         parent.connectionClosed(AbstractTCConnection.this);
95
96         if (fireClose) {
97           fireCloseEvent();
98         }
99
100         if (latch != null) latch.release();
101       }
102     };
103   }
104
105   public boolean isClosed() {
106     return closed.isSet();
107   }
108
109   public boolean isConnected() {
110     return connected.get();
111   }
112
113   public String JavaDoc toString() {
114     StringBuffer JavaDoc buf = new StringBuffer JavaDoc();
115
116     buf.append(getClass().getName()).append('@').append(hashCode()).append(":");
117
118     buf.append(" connected: ").append(isConnected());
119     buf.append(", closed: ").append(isClosed());
120
121     if (isSocketEndpoint.get()) {
122       buf.append(" local=");
123       if (localSocketAddress.isSet()) {
124         buf.append(((TCSocketAddress) localSocketAddress.get()).getStringForm());
125       } else {
126         buf.append("[unknown]");
127       }
128
129       buf.append(" remote=");
130       if (remoteSocketAddress.isSet()) {
131         buf.append(((TCSocketAddress) remoteSocketAddress.get()).getStringForm());
132       } else {
133         buf.append("[unknown");
134       }
135     }
136
137     buf.append(" connect=[");
138     final long connect = getConnectTime();
139
140     if (connect != NO_CONNECT_TIME) {
141       buf.append(new Date JavaDoc(connect));
142     } else {
143       buf.append("no connect time");
144     }
145     buf.append(']');
146
147     buf.append(" idle=").append(getIdleTime()).append("ms");
148
149     return buf.toString();
150   }
151
152   public final void addListener(TCConnectionEventListener listener) {
153     if (listener == null) { return; }
154     eventListeners.add(listener); // don't need sync
155
}
156
157   public final void removeListener(TCConnectionEventListener listener) {
158     if (listener == null) { return; }
159     eventListeners.remove(listener); // don't need sync
160
}
161
162   public final long getConnectTime() {
163     return connectTime.get();
164   }
165
166   public final long getIdleTime() {
167     return System.currentTimeMillis() - lastActivityTime.get();
168   }
169
170   public final synchronized void connect(TCSocketAddress addr, int timeout) throws IOException JavaDoc, TCTimeoutException {
171     if (closed.isSet() || connected.get()) { throw new IllegalStateException JavaDoc("Connection closed or already connected"); }
172     connectImpl(addr, timeout);
173     finishConnect();
174   }
175
176   public final synchronized boolean asynchConnect(TCSocketAddress addr) throws IOException JavaDoc {
177     if (closed.isSet() || connected.get()) { throw new IllegalStateException JavaDoc("Connection closed or already connected"); }
178
179     boolean rv = asynchConnectImpl(addr);
180
181     if (rv) {
182       finishConnect();
183     }
184
185     return rv;
186   }
187
188   public final void putMessage(TCNetworkMessage message) {
189     lastActivityTime.set(System.currentTimeMillis());
190
191     // if (!isConnected() || isClosed()) {
192
// logger.warn("Ignoring message sent to non-connected connection");
193
// return;
194
// }
195

196     putMessageImpl(message);
197   }
198
199   public final TCSocketAddress getLocalAddress() {
200     return (TCSocketAddress) localSocketAddress.get();
201   }
202
203   public final TCSocketAddress getRemoteAddress() {
204     return (TCSocketAddress) remoteSocketAddress.get();
205   }
206
207   abstract protected void closeImpl(Runnable JavaDoc callback);
208
209   abstract protected void putMessageImpl(TCNetworkMessage message);
210
211   abstract protected void connectImpl(TCSocketAddress addr, int timeout) throws IOException JavaDoc, TCTimeoutException;
212
213   abstract protected boolean asynchConnectImpl(TCSocketAddress addr) throws IOException JavaDoc;
214
215   protected void setConnected(boolean connected) {
216     if (connected) {
217       this.connectTime.set(System.currentTimeMillis());
218     }
219     this.connected.set(connected);
220   }
221
222   protected void recordSocketAddress(Socket JavaDoc socket) {
223     if (socket != null) {
224       isSocketEndpoint.set(true);
225       localSocketAddress.set(new TCSocketAddress(socket.getLocalAddress(), socket.getLocalPort()));
226       remoteSocketAddress.set(new TCSocketAddress(socket.getInetAddress(), socket.getPort()));
227     }
228   }
229
230   protected void addNetworkData(TCByteBuffer[] data, int length) {
231     lastActivityTime.set(System.currentTimeMillis());
232
233     try {
234       protocolAdaptor.addReadData(this, data, length);
235     } catch (Exception JavaDoc e) {
236       fireErrorEvent(e, null);
237       return;
238     }
239   }
240
241   protected TCByteBuffer[] getReadBuffers() {
242     // TODO: Hook in some form of read throttle. To throttle how much data is read from the network,
243
// only return a subset of the buffers that the protocolAdaptor advises to be used.
244

245     // TODO: should also support a way to de-register read interest temporarily
246

247     return protocolAdaptor.getReadBuffers();
248   }
249
250   protected void fireErrorEvent(String JavaDoc message) {
251     fireErrorEvent(new Exception JavaDoc(message), null);
252   }
253
254   protected void fireErrorEvent(String JavaDoc message, TCNetworkMessage context) {
255     fireErrorEvent(new Exception JavaDoc(message), context);
256   }
257
258   protected void fireErrorEvent(final Exception JavaDoc exception, final TCNetworkMessage context) {
259     final TCConnectionErrorEvent event = new TCConnectionErrorEvent() {
260       public Exception JavaDoc getException() {
261         return exception;
262       }
263
264       public TCConnection getSource() {
265         return AbstractTCConnection.this;
266       }
267
268       public TCNetworkMessage getMessageContext() {
269         return context;
270       }
271
272       public String JavaDoc toString() {
273         return AbstractTCConnection.this + ", exception: "
274                + ((exception != null) ? exception.toString() : "[null exception]") + ", message context: "
275                + ((context != null) ? context.toString() : "[no message context]");
276       }
277     };
278
279     fireEvent(ERROR, event);
280   }
281
282   protected void fireConnectEvent() {
283     fireEvent(CONNECT, staticEvent);
284   }
285
286   protected void fireEndOfFileEvent() {
287     fireEvent(EOF, staticEvent);
288   }
289
290   protected void fireCloseEvent() {
291     fireEvent(CLOSE, staticEvent);
292   }
293
294   protected void finishConnect() {
295     setConnected(true);
296     fireConnectEvent();
297   }
298
299   public final Socket JavaDoc detach() throws IOException JavaDoc {
300     this.parent.removeConnection(this);
301     return detachImpl();
302   }
303
304   protected abstract Socket JavaDoc detachImpl() throws IOException JavaDoc;
305
306   private void fireEvent(final int type, final TCConnectionEvent event) {
307     final SetOnceFlag flag = eventFlags[type];
308     Assert.assertNotNull("event flag for type " + type, flag);
309
310     if (flag.attemptSet()) {
311       for (Iterator JavaDoc iter = eventListeners.iterator(); iter.hasNext();) {
312         TCConnectionEventListener listener = (TCConnectionEventListener) iter.next();
313         Assert.assertNotNull("listener", listener);
314         try {
315           switch (type) {
316             case EOF: {
317               listener.endOfFileEvent(event);
318               break;
319             }
320             case CLOSE: {
321               listener.closeEvent(event);
322               break;
323             }
324             case ERROR: {
325               listener.errorEvent((TCConnectionErrorEvent) event);
326               break;
327             }
328             case CONNECT: {
329               listener.connectEvent(event);
330               break;
331             }
332             default: {
333               throw new InternalError JavaDoc("unknown event type " + type);
334             }
335           }
336         } catch (Exception JavaDoc e) {
337           logger.error("Unhandled exception in event handler", e);
338         }
339       }
340     }
341   }
342
343   public static final long NO_CONNECT_TIME = -1L;
344   protected static final TCLogger logger = TCLogging.getLogger(TCConnection.class);
345
346   private static final int CONNECT = 0;
347   private static final int EOF = 1;
348   private static final int ERROR = 2;
349   private static final int CLOSE = 3;
350
351   private final AbstractTCConnectionManager parent;
352   private final SetOnceFlag[] eventFlags = new SetOnceFlag[4];
353   private final SynchronizedLong lastActivityTime = new SynchronizedLong(System.currentTimeMillis());
354   private final SynchronizedLong connectTime = new SynchronizedLong(NO_CONNECT_TIME);
355   private final TCConnectionEvent staticEvent;
356   private final List eventListeners = new CopyOnWriteArrayList();
357   private final TCProtocolAdaptor protocolAdaptor;
358   private final SynchronizedBoolean isSocketEndpoint = new SynchronizedBoolean(false);
359   private final SetOnceFlag closed = new SetOnceFlag();
360   private final SynchronizedBoolean connected = new SynchronizedBoolean(false);
361   private final SetOnceRef localSocketAddress = new SetOnceRef();
362   private final SetOnceRef remoteSocketAddress = new SetOnceRef();
363
364 }
Popular Tags