1 package com.coldcore.coloradoftp.connection.impl; 2 3 import com.coldcore.coloradoftp.command.Command; 4 import com.coldcore.coloradoftp.command.CommandFactory; 5 import com.coldcore.coloradoftp.command.CommandProcessor; 6 import com.coldcore.coloradoftp.command.Reply; 7 import com.coldcore.coloradoftp.connection.*; 8 import com.coldcore.coloradoftp.factory.ObjectFactory; 9 import com.coldcore.coloradoftp.factory.ObjectName; 10 import com.coldcore.coloradoftp.session.Session; 11 import com.coldcore.coloradoftp.session.SessionAttributeName; 12 import org.apache.log4j.Logger; 13 14 import java.io.ByteArrayOutputStream ; 15 import java.nio.ByteBuffer ; 16 import java.nio.channels.SocketChannel ; 17 import java.nio.channels.Channel ; 18 19 27 public class GenericControlConnection extends GenericConnection implements ControlConnection { 28 29 private static Logger log = Logger.getLogger(GenericControlConnection.class); 30 protected ByteArrayOutputStream warray; 31 protected ByteArrayOutputStream rarray; 32 protected boolean rarrayComplete; 33 protected StringBuffer incomingBuffer; 34 protected StringBuffer outgoingBuffer; 35 protected boolean interruptState; 36 protected CommandProcessor commandProcessor; 37 protected CommandFactory commandFactory; 38 protected Session session; 39 protected DataConnection dataConnection; 40 protected DataConnectionInitiator dataConnectionInitiator; 41 protected boolean utf8; 42 43 public static final String CHARSET_UTF8 = "UTF-8"; 44 public static final String CHARSET_ASCII = "US-ASCII"; 45 46 public static final char UTF8_MAGIC_NUMBER = (char)65279; 47 48 49 public GenericControlConnection(int bufferSize) { 50 super(); 51 52 utf8 = true; 53 54 incomingBuffer = new StringBuffer (); 55 outgoingBuffer = new StringBuffer (); 56 57 rbuffer = ByteBuffer.allocate(bufferSize); 58 rbuffer.flip(); 59 60 wbuffer = ByteBuffer.allocate(bufferSize); 61 wbuffer.flip(); 62 63 warray = new ByteArrayOutputStream (); 64 rarray = new ByteArrayOutputStream (); 65 } 66 67 68 public synchronized void initialize(SocketChannel channel) { 69 super.initialize(channel); 70 71 commandProcessor = (CommandProcessor) ObjectFactory.getObject(ObjectName.COMMAND_PROCESSOR); 73 commandFactory = (CommandFactory) ObjectFactory.getObject(ObjectName.COMMAND_FACTORY); 74 session = (Session) ObjectFactory.getObject(ObjectName.SESSION); 75 dataConnectionInitiator = (DataConnectionInitiator) ObjectFactory.getObject(ObjectName.DATA_CONNECTION_INITIATOR); 76 dataConnectionInitiator.setControlConnection(this); 77 78 startReaderThread(); 79 startWriterThread(); 80 } 81 82 83 86 protected boolean flushReadArray() throws Exception { 87 if (!rarrayComplete) return false; 88 89 String s = new String (rarray.toByteArray(), utf8 ? CHARSET_UTF8 : CHARSET_ASCII); 91 char[] carr = s.toCharArray(); 92 synchronized (incomingBuffer) { 93 for (char c : carr) 94 if (c != UTF8_MAGIC_NUMBER) incomingBuffer.append(c); 95 } 96 97 rarray.reset(); 99 rarrayComplete = false; 100 101 return true; 102 } 103 104 105 110 protected void pushIncomingBuffer(byte[] arr, int stopIndex) throws Exception { 111 118 for (int z = 0; z < stopIndex; z++) { 119 byte b = arr[z]; 120 121 if (b >= 0 && b <= 127) { 123 rarray.write(b); 125 rarrayComplete = true; 126 } else { 127 flushReadArray(); 129 rarray.write(b); 130 } 131 } 132 133 flushReadArray(); 135 } 136 137 138 protected void read() throws Exception { 139 144 if (getOutgoingBufferSize() > 0 || bytesWrote == 0 || poisoned) { 145 Thread.sleep(sleep); 146 return; 147 } 148 149 rbuffer.clear(); 151 int i = sc.read(rbuffer); 153 if (i == -1) throw new BrokenPipeException(); 155 156 bytesRead += i; 157 log.debug("Read from socket "+i+" bytes (total "+bytesRead+")"); 158 159 byte[] barr = rbuffer.array(); 161 pushIncomingBuffer(barr, i); 162 163 executeCommands(); 165 } 166 167 168 173 protected byte[] popOutgoingBuffer(int maxBytes) throws Exception { 174 byte[] barr; 175 if (warray.size() > 0) { 176 177 barr = warray.toByteArray(); 179 warray.reset(); 180 181 } else { 182 183 String str; 185 int end = maxBytes; 186 synchronized (outgoingBuffer) { 187 if (end > outgoingBuffer.length()) end = outgoingBuffer.length(); 189 if (end == 0) return null; str = outgoingBuffer.substring(0, end); 191 outgoingBuffer.delete(0, end); 193 } 194 195 barr = str.getBytes(utf8 ? CHARSET_UTF8 : CHARSET_ASCII); 197 198 } 199 200 if (barr.length > maxBytes) { 202 warray.write(barr, maxBytes, barr.length-maxBytes); byte[] trg = new byte[maxBytes]; 204 System.arraycopy(barr,0,trg,0,maxBytes); 205 return trg; 206 } else { 207 return barr; 208 } 209 } 210 211 212 protected void write() throws Exception { 213 214 if (!wbuffer.hasRemaining()) { 216 217 int cap = wbuffer.capacity(); 219 byte[] barr = popOutgoingBuffer(cap); 220 221 if (barr == null) { 223 Thread.sleep(sleep); 224 return; 225 } 226 227 wbuffer.clear(); 229 wbuffer.put(barr); 230 wbuffer.flip(); 231 } 232 233 int i = sc.write(wbuffer); 236 if (i == -1) throw new BrokenPipeException(); 238 239 bytesWrote += i; 240 log.debug("Wrote into socket "+i+" bytes (total "+bytesWrote+")"); 241 } 242 243 244 public void service() throws Exception { 245 249 if (poisoned) { 250 boolean kill = true; 251 252 if (dataConnection != null && !dataConnection.isDestroyed()) kill = false; 253 254 if (getOutgoingBufferSize() > 0) kill = false; 255 256 if (bytesWrote == 0) kill = false; 257 258 if (kill) throw new PoisonedException(); 259 } 260 } 261 262 263 264 protected void executeCommands() throws Exception { 265 while (true) { 266 Command command = getNextCommand(); 267 if (command == null) break; 268 commandProcessor.execute(command); 269 } 270 } 271 272 273 276 protected Command getNextCommand() throws Exception { 277 String input; 279 synchronized (incomingBuffer) { 280 int i = incomingBuffer.indexOf("\r\n"); 281 if (i == -1) return null; input = incomingBuffer.substring(0, i); 283 incomingBuffer.delete(0, i+2); if (input.trim().length() == 0) return null; log.debug("Extracted user input: "+input); 286 } 287 288 Command command = commandFactory.create(input); 289 command.setConnection(this); 290 291 if (interruptState && !command.processInInterruptState()) { 293 log.debug("Execution of the command is not allowed while the connection is in INTERRUPT state (dropping command)"); 294 return null; 295 } 296 if (poisoned && !command.processInInterruptState()) { 297 log.debug("Execution of the command is not allowed while the connection is poisoned (dropping command)"); 298 return null; 299 } 300 301 return command; 302 } 303 304 305 public synchronized void reply(Reply reply) { 306 String prepared = reply.prepare(); 308 synchronized (outgoingBuffer) { 309 outgoingBuffer.append(prepared); 310 } 311 log.debug("Prepared reply: "+prepared.trim()); 312 313 316 if (reply.getCode().startsWith("1")) { 317 interruptState = true; 318 log.debug("Reply has triggered INTERRUPT state"); 319 } 320 else 321 if (interruptState) { 322 Command command = reply.getCommand(); 324 if (command == null || command.canClearInterruptState()) { 325 interruptState = false; 326 log.debug("Reply has cleared INTERRUPT state"); 327 } 328 } 329 } 330 331 332 public Session getSession() { 333 return session; 334 } 335 336 337 public DataConnection getDataConnection() { 338 return dataConnection; 339 } 340 341 342 public void setDataConnection(DataConnection dataConnection) { 343 this.dataConnection = dataConnection; 344 } 345 346 347 public synchronized void destroy() { 348 if (dataConnectionInitiator.isActive()) dataConnectionInitiator.abort(); 350 351 if (dataConnection != null) dataConnection.destroy(); 353 354 closeSessionDataChannel(); 356 357 super.destroy(); 358 } 359 360 361 362 protected void closeSessionDataChannel() { 363 Channel odc = (Channel ) session.getAttribute(SessionAttributeName.DATA_CONNECTION_CHANNEL); 364 if (odc != null) { 365 log.debug("Attempting to close data channel in session"); 366 session.removeAttribute(SessionAttributeName.DATA_CONNECTION_CHANNEL); 367 try { 368 odc.close(); 369 } catch (Throwable e) { 370 log.error("Error closing data channel (ignoring)", e); 371 } 372 } 373 } 374 375 376 public DataConnectionInitiator getDataConnectionInitiator() { 377 return dataConnectionInitiator; 378 } 379 380 381 public int getOutgoingBufferSize() { 382 synchronized (outgoingBuffer) { 383 return outgoingBuffer.length(); 384 } 385 } 386 387 388 public int getIncomingBufferSize() { 389 synchronized (incomingBuffer) { 390 return incomingBuffer.length(); 391 } 392 } 393 394 395 public boolean isUtf8() { 396 return utf8; 397 } 398 399 400 public void setUtf8(boolean utf8) { 401 this.utf8 = utf8; 402 } 403 } 404 | Popular Tags |