1 package com.coldcore.coloradoftp.connection.impl; 2 3 import com.coldcore.coloradoftp.command.Reply; 4 import com.coldcore.coloradoftp.connection.ConnectionPool; 5 import com.coldcore.coloradoftp.connection.ControlConnection; 6 import com.coldcore.coloradoftp.connection.DataConnection; 7 import com.coldcore.coloradoftp.connection.DataPortListener; 8 import com.coldcore.coloradoftp.factory.ObjectFactory; 9 import com.coldcore.coloradoftp.factory.ObjectName; 10 import org.apache.log4j.Logger; 11 12 import java.io.IOException ; 13 import java.net.InetSocketAddress ; 14 import java.nio.channels.ServerSocketChannel ; 15 import java.nio.channels.SocketChannel ; 16 import java.util.HashMap ; 17 import java.util.Map ; 18 19 22 public class GenericDataPortListener implements DataPortListener, Runnable { 23 24 private static Logger log = Logger.getLogger(GenericDataPortListener.class); 25 protected int port; 26 protected boolean bound; 27 protected ServerSocketChannel ssc; 28 protected Map <String , ControlConnection> awaiting; 29 protected ConnectionPool dataConnectionPool; 30 protected Reply errorReply; 31 protected Thread thr; 32 protected long sleep; 33 34 35 public GenericDataPortListener() { 36 port = -1; 37 sleep = 100L; 38 awaiting = new HashMap <String ,ControlConnection>(); 39 } 40 41 42 protected Reply getErrorReply() { 43 if (errorReply == null) { 44 errorReply = (Reply) ObjectFactory.getObject(ObjectName.REPLY); 45 errorReply.setCode("425"); 46 errorReply.setText("Can't open data connection."); 47 } 48 return errorReply; 49 } 50 51 52 55 public long getSleep() { 56 return sleep; 57 } 58 59 60 63 public void setSleep(long sleep) { 64 this.sleep = sleep; 65 } 66 67 68 public void setPort(int port) { 69 this.port = port; 70 } 71 72 73 public int getPort() { 74 return port; 75 } 76 77 78 public synchronized void bind() throws IOException { 79 if (port < 1) throw new IllegalArgumentException ("Set correct port first"); 80 81 if (bound) { 82 log.warn("Listener on port "+port+" was bound when bind routine was submitted"); 83 throw new IllegalStateException ("Unbind the listener on port "+port+" first"); 84 } 85 86 dataConnectionPool = (ConnectionPool) ObjectFactory.getObject(ObjectName.DATA_CONNECTION_POOL); 88 89 ssc = ServerSocketChannel.open(); 91 ssc.socket().bind(new InetSocketAddress (port)); 92 93 thr = new Thread (this); 95 thr.start(); 96 97 bound = true; 98 log.debug("Listener is bound to port "+port); 99 } 100 101 102 public synchronized void unbind() throws IOException { 103 if (!bound) { 104 log.warn("Listener on port "+port+" was not bound when unbind routine was submitted"); 105 throw new IllegalStateException ("Cannot unbind the listener on port "+port+", it is not bound"); 106 } 107 108 synchronized (awaiting) { 110 for (ControlConnection connection : awaiting.values()) 111 removeConnection(connection); 112 } 113 114 if (ssc.isOpen()) ssc.close(); 116 117 bound = false; 118 log.debug("Listener on port "+port+" is unbound"); 119 } 120 121 122 public boolean isBound() { 123 return bound; 124 } 125 126 127 public boolean addConnection(ControlConnection connection) { 128 if (!bound) return false; 129 130 cleanup(); 132 133 String ip = connection.getSocketChannel().socket().getInetAddress().getHostAddress(); 135 synchronized (awaiting) { 136 ControlConnection con = awaiting.get(ip); 137 if (con != null && con != connection) return false; 138 awaiting.put(ip, connection); 139 return true; 140 } 141 } 142 143 144 public boolean removeConnection(ControlConnection connection) { 145 String ip = connection.getSocketChannel().socket().getInetAddress().getHostAddress(); 146 synchronized (awaiting) { 147 ControlConnection c = awaiting.get(ip); 148 if (c == connection) { 149 awaiting.remove(ip); 150 c.reply(getErrorReply()); 151 return true; 152 } 153 return false; 154 } 155 } 156 157 158 public void run() { 159 while (bound) { 160 161 ControlConnection controlConnection = null; 162 DataConnection dataConnection = null; 163 SocketChannel sc = null; 164 try { 165 sc = ssc.accept(); String ip = sc.socket().getInetAddress().getHostAddress(); 167 log.debug("New incoming data connection (IP "+ip+")"); 168 169 dataConnection = (DataConnection) ObjectFactory.getObject(ObjectName.DATA_CONNECTION); 171 dataConnection.initialize(sc); 172 173 controlConnection = popControlConnection(dataConnection); 175 if (controlConnection == null) { 176 log.warn("No control connection found for an incoming data connection"); 177 dataConnection.destroyNoReply(); 178 } else { 179 180 DataConnection existing = controlConnection.getDataConnection(); 182 if (existing != null && !existing.isDestroyed()) { 183 log.warn("BUG: Replacing existing data connection with a new one!"); 184 existing.destroyNoReply(); 185 } 186 187 controlConnection.setDataConnection(dataConnection); 189 dataConnection.setControlConnection(controlConnection); 190 configure(dataConnection); 191 dataConnectionPool.add(dataConnection); 192 log.debug("New data connection is ready"); 193 } 194 195 Thread.sleep(sleep); 196 197 } catch (Throwable e) { 198 log.warn("Failed to accept a connection (ignoring)", e); 199 try { 200 dataConnection.destroyNoReply(); 201 } catch (Throwable ex) {} 202 try { 203 sc.close(); 204 } catch (Throwable ex) { 205 log.error("Cannot close the channel (ignoring)", e); 206 } 207 208 if (controlConnection != null) controlConnection.reply(getErrorReply()); 210 } 211 212 } 213 log.debug("Data port listener thread finished"); 214 } 215 216 217 218 protected void cleanup() { 219 synchronized (awaiting) { 220 for (String ip : awaiting.keySet()) { 221 ControlConnection connection = awaiting.get(ip); 222 if (connection.isDestroyed()) awaiting.remove(ip); 223 } 224 } 225 } 226 227 228 232 protected ControlConnection popControlConnection(DataConnection dataConnection) { 233 String dip = dataConnection.getSocketChannel().socket().getInetAddress().getHostAddress(); 234 synchronized (awaiting) { 235 for (String ip : awaiting.keySet()) { 236 if (ip.equals(dip)) { 237 ControlConnection controlConnection = awaiting.remove(ip); 238 return controlConnection.isDestroyed() ? null : controlConnection; 239 } 240 } 241 } 242 return null; 243 } 244 245 246 249 public void configure(DataConnection connection) { 250 } 251 } 252 | Popular Tags |