package com.tc.net.core;

import com.tc.logging.TCLogger;
import com.tc.logging.TCLogging;
import com.tc.net.TCSocketAddress;
import com.tc.net.core.event.TCConnectionErrorEvent;
import com.tc.net.core.event.TCConnectionEvent;
import com.tc.net.core.event.TCConnectionEventListener;
import com.tc.net.core.event.TCListenerEvent;
import com.tc.net.core.event.TCListenerEventListener;
import com.tc.net.protocol.ProtocolAdaptorFactory;
import com.tc.net.protocol.TCProtocolAdaptor;
import com.tc.util.concurrent.SetOnceFlag;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
 * Skeleton {@link TCConnectionManager} that keeps track of the connections and listeners it hands out and leaves the
 * actual construction of those objects to subclasses via {@link #createListenerImpl} and
 * {@link #createConnectionImpl}.
 * <p>
 * The two bookkeeping sets are each guarded by synchronizing on the set itself; the public create/close/shutdown
 * entry points additionally synchronize on the manager instance.
 */
abstract class AbstractTCConnectionManager implements TCConnectionManager {

  /** Zero-length array used as the type token for {@code toArray} on the connection set. */
  protected static final TCConnection[] EMPTY_CONNECTION_ARRAY = new TCConnection[0];

  /** Zero-length array used as the type token for {@code toArray} on the listener set. */
  protected static final TCListener[]   EMPTY_LISTENER_ARRAY   = new TCListener[0];

  protected static final TCLogger       logger                 = TCLogging.getLogger(TCConnectionManager.class);

  /** Comm instance that every created listener is registered with. */
  protected final AbstractTCComm        comm;

  // Raw collections: this file predates generics. Access only while holding the respective set's monitor.
  private final Set                     connections            = new HashSet();
  private final Set                     listeners              = new HashSet();

  private final SetOnceFlag             shutdown               = new SetOnceFlag();

  /** True when the constructor was given no comm and obtained one itself; such a comm is stopped on shutdown. */
  private final boolean                 privateComm;

  private final ConnectionEvents        connEvents;
  private final ListenerEvents          listenerEvents;

  /**
   * @param comm the comm instance to use, or <code>null</code> to have this manager obtain its own from
   *        {@link TCCommFactory}. A non-null argument must be an {@link AbstractTCComm}.
   */
  AbstractTCConnectionManager(TCComm comm) {
    this.privateComm = (comm == null);
    this.comm = this.privateComm ? (AbstractTCComm) new TCCommFactory().getInstance(true) : (AbstractTCComm) comm;
    this.connEvents = new ConnectionEvents();
    this.listenerEvents = new ListenerEvents();
  }

  /**
   * @return a snapshot array of the connections currently tracked by this manager
   */
  public TCConnection[] getAllConnections() {
    return snapshotConnections();
  }

  /**
   * @return a snapshot array of the listeners currently tracked by this manager
   */
  public TCListener[] getAllListeners() {
    return snapshotListeners();
  }

  /**
   * Creates a listener with {@link Constants#DEFAULT_ACCEPT_QUEUE_DEPTH} as the backlog and address reuse enabled.
   *
   * @throws IOException propagated from listener creation
   * @throws IllegalStateException if this manager has been shut down
   */
  public final synchronized TCListener createListener(TCSocketAddress addr, ProtocolAdaptorFactory factory)
      throws IOException {
    return createListener(addr, factory, Constants.DEFAULT_ACCEPT_QUEUE_DEPTH, true);
  }

  /**
   * Creates a listener through {@link #createListenerImpl}, subscribes this manager's listener-event handler and the
   * comm instance to it, announces it to the comm instance, and starts tracking it.
   *
   * @throws IOException propagated from {@link #createListenerImpl}
   * @throws IllegalStateException if this manager has been shut down
   */
  public final synchronized TCListener createListener(TCSocketAddress addr, ProtocolAdaptorFactory factory,
                                                      int backlog, boolean reuseAddr) throws IOException {
    checkShutdown();

    final TCListener created = createListenerImpl(addr, factory, backlog, reuseAddr);
    created.addEventListener(listenerEvents);
    created.addEventListener(comm);
    comm.listenerAdded(created);

    synchronized (listeners) {
      listeners.add(created);
    }

    return created;
  }

  /**
   * Creates a connection through {@link #createConnectionImpl}, wired to this manager's connection-event handler, and
   * registers it via {@link #newConnection(TCConnection)}.
   *
   * @throws IllegalStateException if this manager has been shut down
   */
  public final synchronized TCConnection createConnection(TCProtocolAdaptor adaptor) {
    checkShutdown();

    final TCConnection created = createConnectionImpl(adaptor, connEvents);
    newConnection(created);
    return created;
  }

  /**
   * Closes every tracked connection with <code>close(timeout)</code>; failures are logged and do not stop the sweep.
   */
  public synchronized void closeAllConnections(long timeout) {
    closeAllConnections(false, timeout);
  }

  /**
   * Requests an asynchronous close of every tracked connection; failures are logged and do not stop the sweep.
   */
  public synchronized void asynchCloseAllConnections() {
    closeAllConnections(true, 0);
  }

  /**
   * Stops every tracked listener; failures are logged and do not stop the sweep.
   */
  public synchronized void closeAllListeners() {
    // Work on a copy so the set's monitor is not held while stop() runs.
    final TCListener[] targets = snapshotListeners();

    int idx = 0;
    while (idx < targets.length) {
      final TCListener target = targets[idx++];
      try {
        target.stop();
      } catch (Exception e) {
        logger.error("Exception trying to close " + target, e);
      }
    }
  }

  /**
   * Shuts this manager down. Only the call for which the shutdown flag's <code>attemptSet()</code> returns true does
   * any work: it stops all listeners, asynchronously closes all connections and, if this manager obtained its own
   * comm instance, stops that too.
   */
  public final synchronized void shutdown() {
    if (!shutdown.attemptSet()) { return; }

    closeAllListeners();
    asynchCloseAllConnections();

    if (privateComm) {
      comm.stop();
    }
  }

  /** Stops tracking the given connection. */
  void connectionClosed(TCConnection conn) {
    synchronized (connections) {
      connections.remove(conn);
    }
  }

  /** Starts tracking the given connection. */
  void newConnection(TCConnection conn) {
    synchronized (connections) {
      connections.add(conn);
    }
  }

  /** Stops tracking the given connection. */
  void removeConnection(AbstractTCConnection connection) {
    synchronized (connections) {
      connections.remove(connection);
    }
  }

  /**
   * @return the connection-event handler that this manager attaches to connections it creates
   */
  protected TCConnectionEventListener getConnectionListener() {
    return connEvents;
  }

  /** Builds the concrete listener; invoked from {@link #createListener} after the shutdown check. */
  protected abstract TCListener createListenerImpl(TCSocketAddress addr, ProtocolAdaptorFactory factory, int backlog,
                                                   boolean reuseAddr) throws IOException;

  /** Builds the concrete connection; invoked from {@link #createConnection} after the shutdown check. */
  protected abstract TCConnection createConnectionImpl(TCProtocolAdaptor adaptor, TCConnectionEventListener listener);

  /** Copies the tracked connections into an array while holding the set's monitor. */
  private TCConnection[] snapshotConnections() {
    synchronized (connections) {
      return (TCConnection[]) connections.toArray(EMPTY_CONNECTION_ARRAY);
    }
  }

  /** Copies the tracked listeners into an array while holding the set's monitor. */
  private TCListener[] snapshotListeners() {
    synchronized (listeners) {
      return (TCListener[]) listeners.toArray(EMPTY_LISTENER_ARRAY);
    }
  }

  /** Closes a snapshot of the tracked connections, either asynchronously or with the given timeout. */
  private void closeAllConnections(boolean async, long timeout) {
    // Work on a copy so the set's monitor is not held while connections are being closed.
    final TCConnection[] targets = snapshotConnections();

    int idx = 0;
    while (idx < targets.length) {
      closeAndLogFailure(targets[idx++], async, timeout);
    }
  }

  /** Closes a single connection, logging (not propagating) any exception it throws. */
  private void closeAndLogFailure(TCConnection target, boolean async, long timeout) {
    try {
      if (async) {
        target.asynchClose();
      } else {
        target.close(timeout);
      }
    } catch (Exception e) {
      logger.error("Exception trying to close " + target, e);
    }
  }

  /** Rejects further use once the shutdown flag has been set. */
  private void checkShutdown() {
    if (shutdown.isSet()) { throw new IllegalStateException("connection manager shutdown"); }
  }

  /**
   * Connection-event handler attached to every connection this manager creates. Connect and close events are only
   * debug-logged; error and end-of-file events additionally trigger an asynchronous close of the source connection.
   */
  private class ConnectionEvents implements TCConnectionEventListener {
    public final void connectEvent(TCConnectionEvent event) {
      if (!logger.isDebugEnabled()) { return; }
      logger.debug("connect event: " + event.toString());
    }

    public final void closeEvent(TCConnectionEvent event) {
      if (!logger.isDebugEnabled()) { return; }
      logger.debug("close event: " + event.toString());
    }

    public final void errorEvent(TCConnectionErrorEvent event) {
      try {
        final Throwable cause = event.getException();

        if (cause == null) { return; }

        if (!(cause instanceof IOException)) {
          logger.error(cause);
          return;
        }

        // IOExceptions are reported at info level with the message only.
        if (logger.isInfoEnabled()) {
          logger.info("error event on connection " + event.getSource() + ": " + cause.getMessage());
        }
      } finally {
        // Runs on every path above, including the early returns.
        event.getSource().asynchClose();
      }
    }

    public final void endOfFileEvent(TCConnectionEvent event) {
      if (logger.isDebugEnabled()) {
        logger.debug("EOF event: " + event.toString());
      }

      event.getSource().asynchClose();
    }
  }

  /**
   * Listener-event handler attached to every listener this manager creates; drops a listener from the tracked set
   * when its close event arrives.
   */
  private class ListenerEvents implements TCListenerEventListener {
    public void closeEvent(TCListenerEvent event) {
      final Object closed = event.getSource();
      synchronized (listeners) {
        listeners.remove(closed);
      }
    }
  }

}