1 4 package com.tc.net.core; 5 6 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList; 7 import EDU.oswego.cs.dl.util.concurrent.Latch; 8 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 9 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; 10 11 import com.tc.bytes.TCByteBuffer; 12 import com.tc.logging.TCLogger; 13 import com.tc.logging.TCLogging; 14 import com.tc.net.TCSocketAddress; 15 import com.tc.net.core.event.TCConnectionErrorEvent; 16 import com.tc.net.core.event.TCConnectionEvent; 17 import com.tc.net.core.event.TCConnectionEventListener; 18 import com.tc.net.protocol.TCNetworkMessage; 19 import com.tc.net.protocol.TCProtocolAdaptor; 20 import com.tc.util.Assert; 21 import com.tc.util.TCTimeoutException; 22 import com.tc.util.concurrent.SetOnceFlag; 23 import com.tc.util.concurrent.SetOnceRef; 24 25 import java.io.IOException ; 26 import java.net.Socket ; 27 import java.util.Date ; 28 import java.util.Iterator ; 29 import java.util.List ; 30 31 36 abstract class AbstractTCConnection implements TCConnection { 37 38 AbstractTCConnection(TCConnectionEventListener listener, TCProtocolAdaptor adaptor, AbstractTCConnectionManager parent) { 39 Assert.assertNotNull(parent); 40 Assert.assertNotNull(adaptor); 41 42 this.parent = parent; 43 this.protocolAdaptor = adaptor; 44 45 if (listener != null) addListener(listener); 46 47 staticEvent = new TCConnectionEvent() { 48 public TCConnection getSource() { 49 return AbstractTCConnection.this; 50 } 51 52 public String toString() { 53 return AbstractTCConnection.this.toString(); 54 } 55 }; 56 57 eventFlags[CONNECT] = new SetOnceFlag(); 58 eventFlags[EOF] = new SetOnceFlag(); 59 eventFlags[ERROR] = new SetOnceFlag(); 60 eventFlags[CLOSE] = new SetOnceFlag(); 61 62 Assert.assertNoNullElements(eventFlags); 63 } 64 65 public final void asynchClose() { 66 if (closed.attemptSet()) { 67 closeImpl(createCloseCallback(null)); 68 } 69 } 70 71 public final boolean close(final long timeout) { 72 if (timeout <= 0) { throw new IllegalArgumentException ("timeout cannot be less than or equal to zero"); } 73 74 if (closed.attemptSet()) { 75 final Latch latch = new Latch(); 76 closeImpl(createCloseCallback(latch)); 77 try { 78 return latch.attempt(timeout); 79 } catch (InterruptedException e) { 80 logger.warn("close interrupted"); 81 return isConnected(); 82 } 83 } 84 85 return isClosed(); 86 } 87 88 private Runnable createCloseCallback(final Latch latch) { 89 final boolean fireClose = isConnected(); 90 91 return new Runnable () { 92 public void run() { 93 setConnected(false); 94 parent.connectionClosed(AbstractTCConnection.this); 95 96 if (fireClose) { 97 fireCloseEvent(); 98 } 99 100 if (latch != null) latch.release(); 101 } 102 }; 103 } 104 105 public boolean isClosed() { 106 return closed.isSet(); 107 } 108 109 public boolean isConnected() { 110 return connected.get(); 111 } 112 113 public String toString() { 114 StringBuffer buf = new StringBuffer (); 115 116 buf.append(getClass().getName()).append('@').append(hashCode()).append(":"); 117 118 buf.append(" connected: ").append(isConnected()); 119 buf.append(", closed: ").append(isClosed()); 120 121 if (isSocketEndpoint.get()) { 122 buf.append(" local="); 123 if (localSocketAddress.isSet()) { 124 buf.append(((TCSocketAddress) localSocketAddress.get()).getStringForm()); 125 } else { 126 buf.append("[unknown]"); 127 } 128 129 buf.append(" remote="); 130 if (remoteSocketAddress.isSet()) { 131 buf.append(((TCSocketAddress) remoteSocketAddress.get()).getStringForm()); 132 } else { 133 buf.append("[unknown"); 134 } 135 } 136 137 buf.append(" connect=["); 138 final long connect = getConnectTime(); 139 140 if (connect != NO_CONNECT_TIME) { 141 buf.append(new Date (connect)); 142 } else { 143 buf.append("no connect time"); 144 } 145 buf.append(']'); 146 147 buf.append(" idle=").append(getIdleTime()).append("ms"); 148 149 return buf.toString(); 150 } 151 152 public final void addListener(TCConnectionEventListener listener) { 153 if (listener == null) { return; } 154 eventListeners.add(listener); } 156 157 public final void removeListener(TCConnectionEventListener listener) { 158 if (listener == null) { return; } 159 eventListeners.remove(listener); } 161 162 public final long getConnectTime() { 163 return connectTime.get(); 164 } 165 166 public final long getIdleTime() { 167 return System.currentTimeMillis() - lastActivityTime.get(); 168 } 169 170 public final synchronized void connect(TCSocketAddress addr, int timeout) throws IOException , TCTimeoutException { 171 if (closed.isSet() || connected.get()) { throw new IllegalStateException ("Connection closed or already connected"); } 172 connectImpl(addr, timeout); 173 finishConnect(); 174 } 175 176 public final synchronized boolean asynchConnect(TCSocketAddress addr) throws IOException { 177 if (closed.isSet() || connected.get()) { throw new IllegalStateException ("Connection closed or already connected"); } 178 179 boolean rv = asynchConnectImpl(addr); 180 181 if (rv) { 182 finishConnect(); 183 } 184 185 return rv; 186 } 187 188 public final void putMessage(TCNetworkMessage message) { 189 lastActivityTime.set(System.currentTimeMillis()); 190 191 196 putMessageImpl(message); 197 } 198 199 public final TCSocketAddress getLocalAddress() { 200 return (TCSocketAddress) localSocketAddress.get(); 201 } 202 203 public final TCSocketAddress getRemoteAddress() { 204 return (TCSocketAddress) remoteSocketAddress.get(); 205 } 206 207 abstract protected void closeImpl(Runnable callback); 208 209 abstract protected void putMessageImpl(TCNetworkMessage message); 210 211 abstract protected void connectImpl(TCSocketAddress addr, int timeout) throws IOException , TCTimeoutException; 212 213 abstract protected boolean asynchConnectImpl(TCSocketAddress addr) throws IOException ; 214 215 protected void setConnected(boolean connected) { 216 if (connected) { 217 this.connectTime.set(System.currentTimeMillis()); 218 } 219 this.connected.set(connected); 220 } 221 222 protected void recordSocketAddress(Socket socket) { 223 if (socket != null) { 224 isSocketEndpoint.set(true); 225 localSocketAddress.set(new TCSocketAddress(socket.getLocalAddress(), socket.getLocalPort())); 226 remoteSocketAddress.set(new TCSocketAddress(socket.getInetAddress(), socket.getPort())); 227 } 228 } 229 230 protected void addNetworkData(TCByteBuffer[] data, int length) { 231 lastActivityTime.set(System.currentTimeMillis()); 232 233 try { 234 protocolAdaptor.addReadData(this, data, length); 235 } catch (Exception e) { 236 fireErrorEvent(e, null); 237 return; 238 } 239 } 240 241 protected TCByteBuffer[] getReadBuffers() { 242 245 247 return protocolAdaptor.getReadBuffers(); 248 } 249 250 protected void fireErrorEvent(String message) { 251 fireErrorEvent(new Exception (message), null); 252 } 253 254 protected void fireErrorEvent(String message, TCNetworkMessage context) { 255 fireErrorEvent(new Exception (message), context); 256 } 257 258 protected void fireErrorEvent(final Exception exception, final TCNetworkMessage context) { 259 final TCConnectionErrorEvent event = new TCConnectionErrorEvent() { 260 public Exception getException() { 261 return exception; 262 } 263 264 public TCConnection getSource() { 265 return AbstractTCConnection.this; 266 } 267 268 public TCNetworkMessage getMessageContext() { 269 return context; 270 } 271 272 public String toString() { 273 return AbstractTCConnection.this + ", exception: " 274 + ((exception != null) ? exception.toString() : "[null exception]") + ", message context: " 275 + ((context != null) ? context.toString() : "[no message context]"); 276 } 277 }; 278 279 fireEvent(ERROR, event); 280 } 281 282 protected void fireConnectEvent() { 283 fireEvent(CONNECT, staticEvent); 284 } 285 286 protected void fireEndOfFileEvent() { 287 fireEvent(EOF, staticEvent); 288 } 289 290 protected void fireCloseEvent() { 291 fireEvent(CLOSE, staticEvent); 292 } 293 294 protected void finishConnect() { 295 setConnected(true); 296 fireConnectEvent(); 297 } 298 299 public final Socket detach() throws IOException { 300 this.parent.removeConnection(this); 301 return detachImpl(); 302 } 303 304 protected abstract Socket detachImpl() throws IOException ; 305 306 private void fireEvent(final int type, final TCConnectionEvent event) { 307 final SetOnceFlag flag = eventFlags[type]; 308 Assert.assertNotNull("event flag for type " + type, flag); 309 310 if (flag.attemptSet()) { 311 for (Iterator iter = eventListeners.iterator(); iter.hasNext();) { 312 TCConnectionEventListener listener = (TCConnectionEventListener) iter.next(); 313 Assert.assertNotNull("listener", listener); 314 try { 315 switch (type) { 316 case EOF: { 317 listener.endOfFileEvent(event); 318 break; 319 } 320 case CLOSE: { 321 listener.closeEvent(event); 322 break; 323 } 324 case ERROR: { 325 listener.errorEvent((TCConnectionErrorEvent) event); 326 break; 327 } 328 case CONNECT: { 329 listener.connectEvent(event); 330 break; 331 } 332 default: { 333 throw new InternalError ("unknown event type " + type); 334 } 335 } 336 } catch (Exception e) { 337 logger.error("Unhandled exception in event handler", e); 338 } 339 } 340 } 341 } 342 343 public static final long NO_CONNECT_TIME = -1L; 344 protected static final TCLogger logger = TCLogging.getLogger(TCConnection.class); 345 346 private static final int CONNECT = 0; 347 private static final int EOF = 1; 348 private static final int ERROR = 2; 349 private static final int CLOSE = 3; 350 351 private final AbstractTCConnectionManager parent; 352 private final SetOnceFlag[] eventFlags = new SetOnceFlag[4]; 353 private final SynchronizedLong lastActivityTime = new SynchronizedLong(System.currentTimeMillis()); 354 private final SynchronizedLong connectTime = new SynchronizedLong(NO_CONNECT_TIME); 355 private final TCConnectionEvent staticEvent; 356 private final List eventListeners = new CopyOnWriteArrayList(); 357 private final TCProtocolAdaptor protocolAdaptor; 358 private final SynchronizedBoolean isSocketEndpoint = new SynchronizedBoolean(false); 359 private final SetOnceFlag closed = new SetOnceFlag(); 360 private final SynchronizedBoolean connected = new SynchronizedBoolean(false); 361 private final SetOnceRef localSocketAddress = new SetOnceRef(); 362 private final SetOnceRef remoteSocketAddress = new SetOnceRef(); 363 364 } | Popular Tags |