1 4 package com.tc.net.protocol.transport; 5 6 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 7 8 import com.tc.logging.TCLogger; 9 import com.tc.net.MaxConnectionsExceededException; 10 import com.tc.net.TCSocketAddress; 11 import com.tc.net.core.ConnectionAddressProvider; 12 import com.tc.net.core.TCConnection; 13 import com.tc.net.core.TCConnectionManager; 14 import com.tc.util.Assert; 15 import com.tc.util.TCTimeoutException; 16 import com.tc.util.concurrent.NoExceptionLinkedQueue; 17 18 import java.io.IOException ; 19 import java.net.InetAddress ; 20 import java.net.UnknownHostException ; 21 22 25 class ClientConnectionEstablisher implements Runnable , MessageTransportListener { 26 27 private static final long CONNECT_RETRY_INTERVAL = 1000; 28 29 private static final Object RECONNECT = new Object (); 30 private static final Object QUIT = new Object (); 31 32 private final ClientMessageTransport transport; 33 private final String desc; 34 private final TCLogger logger; 35 private final int maxReconnectTries; 36 private final int timeout; 37 private final ConnectionAddressProvider connAddressProvider; 38 private final TCConnectionManager connManager; 39 40 private final SynchronizedBoolean connecting = new SynchronizedBoolean(false); 41 42 private Thread connectionEstablisher; 43 44 private NoExceptionLinkedQueue reconnectRequest = new NoExceptionLinkedQueue(); 45 46 ClientConnectionEstablisher(ClientMessageTransport transport, TCConnectionManager connManager, 47 ConnectionAddressProvider connAddressProvider, TCLogger logger, int maxReconnectTries, 48 int timeout) { 49 this.transport = transport; 50 this.connManager = connManager; 51 this.logger = logger; 52 this.connAddressProvider = connAddressProvider; 53 this.maxReconnectTries = maxReconnectTries; 54 this.timeout = timeout; 55 56 if (maxReconnectTries == 0) desc = "none"; 57 else if (maxReconnectTries < 0) desc = "unlimited"; 58 else desc = "" + maxReconnectTries; 59 60 this.transport.addTransportListener(this); 61 } 62 63 71 public void open() throws TCTimeoutException, IOException { 72 synchronized (connecting) { 73 Assert.eval("Can't call open() concurrently", !connecting.get()); 74 connecting.set(true); 75 76 try { 77 connectTryAllOnce(); 78 } finally { 79 connecting.set(false); 80 } 81 } 82 } 83 84 private void connectTryAllOnce() throws TCTimeoutException, IOException { 85 connAddressProvider.setPolicy(ConnectionAddressProvider.LINEAR); 86 do { 87 try { 88 connect(); 89 return; 90 } catch (TCTimeoutException e) { 91 if (!connAddressProvider.hasNext()) { throw e; } 92 } catch (IOException e) { 93 if (!connAddressProvider.hasNext()) { throw e; } 94 } 95 } while (connAddressProvider.hasNext() && (connAddressProvider.next() != null)); 96 97 } 98 99 106 void connect() throws TCTimeoutException, IOException { 107 108 TCConnection connection = this.connManager.createConnection(transport.getProtocolAdapter()); 109 transport.wireNewConnection(connection); 110 transport.fireTransportConnectAttemptEvent(); 111 TCSocketAddress address = createSocketAddress(); 112 connection.connect(address, timeout); 113 } 114 115 void disconnect() { 116 transport.close(); 117 } 118 119 private TCSocketAddress createSocketAddress() throws UnknownHostException { 120 TCSocketAddress socketAddress = new TCSocketAddress(InetAddress.getByName(connAddressProvider.getHostname()), 121 connAddressProvider.getPortNumber()); 122 return socketAddress; 123 } 124 125 public String toString() { 126 return "ClientConnectionEstablisher[" + connAddressProvider + ", timeout=" + timeout + "]"; 127 } 128 129 public void reconnect() throws MaxConnectionsExceededException { 130 131 try { 132 boolean connected = false; 133 connAddressProvider.setPolicy(ConnectionAddressProvider.ROUND_ROBIN); 134 for (int i = 0; ((maxReconnectTries < 0) || (i < maxReconnectTries)) && !connected; i++) { 135 for (int j = 0; j < connAddressProvider.getCount() && !connected; j++, connAddressProvider.next()) { 136 138 try { 139 if (i % 20 == 0) { 140 logger.warn("Reconnect attempt " + i + " of " + desc + " reconnect tries to " 141 + connAddressProvider.getConnectionInfo() + ", timeout=" + timeout); 142 } 143 connect(); 144 transport.reconnect(); 145 connected = true; 146 } catch (MaxConnectionsExceededException e) { 147 throw e; 148 } catch (TCTimeoutException e) { 149 handleConnectException(e, false); 150 } catch (IOException e) { 151 handleConnectException(e, false); 152 } catch (Exception e) { 153 handleConnectException(e, true); 154 } 155 } 156 } 157 158 transport.endIfDisconnected(); 159 160 } finally { 161 connecting.set(false); 162 } 163 } 164 165 private void handleConnectException(Exception e, boolean logFullException) { 166 if (logger.isDebugEnabled() || logFullException) { 167 logger.error("Connect Exception", e); 168 } else { 169 logger.warn(e.getMessage()); 170 } 171 try { 172 Thread.sleep(CONNECT_RETRY_INTERVAL); 173 } catch (InterruptedException e1) { 174 } 176 } 177 178 public void notifyTransportConnected(MessageTransport mt) { 179 } 181 182 public void notifyTransportDisconnected(MessageTransport mt) { 183 synchronized (connecting) { 184 if (connecting.get()) return; 185 186 if (connectionEstablisher == null) { 187 connecting.set(true); 188 connectionEstablisher = new Thread (this, "ConnectionEstablisher"); 190 connectionEstablisher.setDaemon(true); 191 connectionEstablisher.start(); 192 193 } 194 reconnectRequest.put(RECONNECT); 195 } 196 } 197 198 public void notifyTransportConnectAttempt(MessageTransport mt) { 199 } 201 202 public void notifyTransportClosed(MessageTransport mt) { 203 reconnectRequest.put(QUIT); 204 } 205 206 public void run() { 207 Object request = null; 208 while ((request = reconnectRequest.take()) != null) { 209 if (request == RECONNECT) { 210 try { 211 reconnect(); 212 } catch (MaxConnectionsExceededException e) { 213 logger.warn(e); 214 logger.warn("No longer trying to reconnect."); 215 return; 216 } catch (Throwable t) { 217 logger.warn("Reconnect failed !", t); 218 } 219 } else if (request == QUIT) { 220 connectionEstablisher = null; 221 break; 222 } 223 } 224 } 225 226 } 227 | Popular Tags |