1 package com.coldcore.coloradoftp.connection.impl; 2 3 import com.coldcore.coloradoftp.command.Reply; 4 import com.coldcore.coloradoftp.connection.ConnectionPool; 5 import com.coldcore.coloradoftp.connection.ControlConnection; 6 import com.coldcore.coloradoftp.connection.DataConnection; 7 import com.coldcore.coloradoftp.connection.DataConnectionInitiator; 8 import com.coldcore.coloradoftp.factory.ObjectFactory; 9 import com.coldcore.coloradoftp.factory.ObjectName; 10 import com.coldcore.coloradoftp.session.Session; 11 import com.coldcore.coloradoftp.session.SessionAttributeName; 12 import org.apache.log4j.Logger; 13 14 import java.net.InetSocketAddress ; 15 import java.nio.channels.SocketChannel ; 16 17 20 public class GenericDataConnectionInitiator implements DataConnectionInitiator, Runnable { 21 22 private static Logger log = Logger.getLogger(GenericDataConnectionInitiator.class); 23 protected String ip; 24 protected int port; 25 protected boolean active; 26 protected ControlConnection controlConnection; 27 protected ConnectionPool dataConnectionPool; 28 protected SocketChannel sc; 29 protected Reply errorReply; 30 protected Thread thr; 31 protected long sleep; 32 protected boolean aborted; 33 34 35 public GenericDataConnectionInitiator() { 36 sleep = 100L; 37 } 38 39 40 protected Reply getErrorReply() { 41 if (errorReply == null) { 42 errorReply = (Reply) ObjectFactory.getObject(ObjectName.REPLY); 43 errorReply.setCode("425"); 44 errorReply.setText("Can't open data connection."); 45 } 46 return errorReply; 47 } 48 49 50 53 public long getSleep() { 54 return sleep; 55 } 56 57 58 61 public void setSleep(long sleep) { 62 this.sleep = sleep; 63 } 64 65 66 69 protected boolean isReply150() { 70 Session session = controlConnection.getSession(); 71 Long bytesWrote = (Long ) session.getAttribute(SessionAttributeName.BYTE_MARKER_150_REPLY); 72 if (bytesWrote == null || controlConnection.getBytesWrote() == bytesWrote || controlConnection.getOutgoingBufferSize() != 0) return false; 73 log.debug("User got a 150 reply"); 74 return true; 75 } 76 77 78 public String getIp() { 79 return ip; 80 } 81 82 83 public void setIp(String ip) { 84 this.ip = ip; 85 } 86 87 88 public int getPort() { 89 return port; 90 } 91 92 93 public void setPort(int port) { 94 this.port = port; 95 } 96 97 98 public void run() { 99 while (active) { 100 101 DataConnection dataConnection = null; 102 try { 103 104 107 if (!isReply150()) { 108 Thread.sleep(sleep); 109 continue; 110 } 111 112 dataConnectionPool = (ConnectionPool) ObjectFactory.getObject(ObjectName.DATA_CONNECTION_POOL); 114 115 sc = SocketChannel.open(); 117 sc.connect(new InetSocketAddress (ip, port)); if (!sc.finishConnect()) throw new RuntimeException ("Failed finishConnect"); 119 String ip = sc.socket().getInetAddress().getHostAddress(); 120 log.debug("New data connection established (IP "+ip+")"); 121 122 dataConnection = (DataConnection) ObjectFactory.getObject(ObjectName.DATA_CONNECTION); 124 dataConnection.initialize(sc); 125 126 DataConnection existing = controlConnection.getDataConnection(); 128 if (existing != null && !existing.isDestroyed()) { 129 log.warn("BUG: Replacing existing data connection with a new one!"); 130 existing.destroyNoReply(); 131 } 132 133 controlConnection.setDataConnection(dataConnection); 135 dataConnection.setControlConnection(controlConnection); 136 configure(dataConnection); 137 dataConnectionPool.add(dataConnection); 138 log.debug("New data connection is ready"); 139 140 active = false; 141 142 } catch (Throwable e) { 143 144 if (!aborted) { 146 log.warn("Failed to establish a connection with "+ip+":"+port+" (ignoring)", e); 147 try { 148 dataConnection.destroyNoReply(); 149 } catch (Throwable ex) {} 150 try { 151 sc.close(); 152 } catch (Throwable ex) { 153 log.error("Cannot close the channel (ignoring)", e); 154 } 155 156 controlConnection.reply(getErrorReply()); 157 } 158 159 active = false; 160 } 161 162 } 163 log.debug("Data connection initiator thread finished"); 164 } 165 166 167 public boolean isActive() { 168 return active; 169 } 170 171 172 public synchronized void activate() { 173 if (active) { 174 log.warn("Data connection initiator was active when activate routine was called"); 175 return; 176 } 177 178 active = true; 179 aborted = false; 180 181 thr = new Thread (this); 183 thr.start(); 184 } 185 186 187 public synchronized void abort() { 188 aborted = true; 189 if (!active) return; 190 191 try { 193 if (sc != null && sc.isOpen()) sc.close(); 194 } catch (Throwable e) { 195 log.error("Cannot close channel (ignoring)", e); 196 } 197 198 controlConnection.reply(getErrorReply()); 199 200 Session session = controlConnection.getSession(); 202 session.removeAttribute(SessionAttributeName.BYTE_MARKER_150_REPLY); 203 204 active = false; 205 } 206 207 208 public ControlConnection getControlConnection() { 209 return controlConnection; 210 } 211 212 213 public void setControlConnection(ControlConnection controlConnection) { 214 this.controlConnection = controlConnection; 215 } 216 217 218 221 public void configure(DataConnection connection) { 222 } 223 } 224 | Popular Tags |