KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > coldcore > coloradoftp > connection > impl > GenericControlConnection


1 package com.coldcore.coloradoftp.connection.impl;
2
3 import com.coldcore.coloradoftp.command.Command;
4 import com.coldcore.coloradoftp.command.CommandFactory;
5 import com.coldcore.coloradoftp.command.CommandProcessor;
6 import com.coldcore.coloradoftp.command.Reply;
7 import com.coldcore.coloradoftp.connection.*;
8 import com.coldcore.coloradoftp.factory.ObjectFactory;
9 import com.coldcore.coloradoftp.factory.ObjectName;
10 import com.coldcore.coloradoftp.session.Session;
11 import com.coldcore.coloradoftp.session.SessionAttributeName;
12 import org.apache.log4j.Logger;
13
14 import java.io.ByteArrayOutputStream JavaDoc;
15 import java.nio.ByteBuffer JavaDoc;
16 import java.nio.channels.SocketChannel JavaDoc;
17 import java.nio.channels.Channel JavaDoc;
18
19 /**
20  * @see com.coldcore.coloradoftp.connection.Connection
21  *
22  * Control connections usualy do not eat much network traffic (that is a job of
23  * data connections), so there is no need to control network overhead for this type
24  * of connection. Control connection always have lower priority than data connections
25  * and their execution does not take place on every round of lyfe cycle thread.
26  */

27 public class GenericControlConnection extends GenericConnection implements ControlConnection {
28
29   private static Logger log = Logger.getLogger(GenericControlConnection.class);
30   protected ByteArrayOutputStream JavaDoc warray;
31   protected ByteArrayOutputStream JavaDoc rarray;
32   protected boolean rarrayComplete;
33   protected StringBuffer JavaDoc incomingBuffer;
34   protected StringBuffer JavaDoc outgoingBuffer;
35   protected boolean interruptState;
36   protected CommandProcessor commandProcessor;
37   protected CommandFactory commandFactory;
38   protected Session session;
39   protected DataConnection dataConnection;
40   protected DataConnectionInitiator dataConnectionInitiator;
41   protected boolean utf8;
42
43   public static final String JavaDoc CHARSET_UTF8 = "UTF-8";
44   public static final String JavaDoc CHARSET_ASCII = "US-ASCII";
45
46   public static final char UTF8_MAGIC_NUMBER = (char)65279;
47
48
49   public GenericControlConnection(int bufferSize) {
50     super();
51
52     utf8 = true;
53
54     incomingBuffer = new StringBuffer JavaDoc();
55     outgoingBuffer = new StringBuffer JavaDoc();
56
57     rbuffer = ByteBuffer.allocate(bufferSize);
58     rbuffer.flip();
59
60     wbuffer = ByteBuffer.allocate(bufferSize);
61     wbuffer.flip();
62
63     warray = new ByteArrayOutputStream JavaDoc();
64     rarray = new ByteArrayOutputStream JavaDoc();
65   }
66
67
68   public synchronized void initialize(SocketChannel JavaDoc channel) {
69     super.initialize(channel);
70
71     //ObjectFactory cannot be used in a constructor, so we create the rest of the objects here
72
commandProcessor = (CommandProcessor) ObjectFactory.getObject(ObjectName.COMMAND_PROCESSOR);
73     commandFactory = (CommandFactory) ObjectFactory.getObject(ObjectName.COMMAND_FACTORY);
74     session = (Session) ObjectFactory.getObject(ObjectName.SESSION);
75     dataConnectionInitiator = (DataConnectionInitiator) ObjectFactory.getObject(ObjectName.DATA_CONNECTION_INITIATOR);
76     dataConnectionInitiator.setControlConnection(this);
77
78     startReaderThread();
79     startWriterThread();
80   }
81
82
83   /** Try to flush the content of the read array to the incoming buffer.
84    * @return TRUE is flushed, FALSE otherwise
85    */

86   protected boolean flushReadArray() throws Exception JavaDoc {
87     if (!rarrayComplete) return false;
88
89     //Decode and feed collected bytes to the incoming buffer (also remove the UTF-8 magic number if present)
90
String JavaDoc s = new String JavaDoc(rarray.toByteArray(), utf8 ? CHARSET_UTF8 : CHARSET_ASCII);
91     char[] carr = s.toCharArray();
92     synchronized (incomingBuffer) {
93       for (char c : carr)
94         if (c != UTF8_MAGIC_NUMBER) incomingBuffer.append(c);
95     }
96
97     //Reset the read array
98
rarray.reset();
99     rarrayComplete = false;
100
101     return true;
102   }
103
104
105   /** Decode and write the next part of user's input to the incoming buffer.
106    * Used by the "read" routine to receive UFT-8 bytes.
107    * @param arr Byte array containing the next part of user's input
108    * @param stopIndex User's input stops at this index in the byte array
109    */

110   protected void pushIncomingBuffer(byte[] arr, int stopIndex) throws Exception JavaDoc {
111     /* We will feed user input to the read array one byte at a time till we hit any 1-byte character.
112      * Only then the bytes in the read array may be safely decoded to UTF-8 (where one char may be
113      * represented by 2-3 bytes) and written to the incoming buffer. For performance we will not flush
114      * the read array every time we encounter 1-byte character, but we will do so before any 2-3 bytes
115      * UTF-8 character and in the end if possible (rarrayComplete). If the read array cannot be decoded
116      * then the bytes in it will wait till the next "read" routine.
117      */

118     for (int z = 0; z < stopIndex; z++) {
119       byte b = arr[z];
120
121       //x00-x7F is a 1-byte character (UTF-8/ASCII), otherwise the character takes 2-3 bytes
122
if (b >= 0 && b <= 127) {
123         //Add the byte to the read array and proceed (read array may now be decoded)
124
rarray.write(b);
125         rarrayComplete = true;
126       } else {
127         //Try to flush the read array then add the byte to it and proceed
128
flushReadArray();
129         rarray.write(b);
130       }
131     }
132
133     //Try to flush the read array one more time in the end
134
flushReadArray();
135   }
136
137
138   protected void read() throws Exception JavaDoc {
139     /* We must not read anything if:
140      * 1. There is some data in outgoing buffer waiting to be send to the user
141      * 2. User did not receive a welcome message yet and it is not yet in the outgoing buffer
142      * 3. Connection is poisoned
143      */

144     if (getOutgoingBufferSize() > 0 || bytesWrote == 0 || poisoned) {
145       Thread.sleep(sleep);
146       return;
147     }
148
149     //Read data from socket and append it to the incoming buffer.
150
rbuffer.clear();
151     int i = sc.read(rbuffer); //Thread blocks here...
152

153     //Client disconnected?
154
if (i == -1) throw new BrokenPipeException();
155
156     bytesRead += i;
157     log.debug("Read from socket "+i+" bytes (total "+bytesRead+")");
158
159     //This will add user input to the incoming buffer decoded with proper charset
160
byte[] barr = rbuffer.array();
161     pushIncomingBuffer(barr, i);
162
163     //Execute commands waiting in the buffer
164
executeCommands();
165   }
166
167
168   /** Encode and return the next part of server's response from the the outgoing buffer.
169    * Used by the "write" routine to send UFT-8 bytes.
170    * @param maxBytes Byte array size limit
171    * @return Encoded byte array not longer than the limit or NULL if there is nothing to write
172    */

173   protected byte[] popOutgoingBuffer(int maxBytes) throws Exception JavaDoc {
174     byte[] barr;
175     if (warray.size() > 0) {
176
177       //Reminder from the last output is still pending
178
barr = warray.toByteArray();
179       warray.reset();
180
181     } else {
182
183       //Get a string from the outgoing buffer to write into socket
184
String JavaDoc str;
185       int end = maxBytes;
186       synchronized (outgoingBuffer) {
187         //Correct sub-string length (if current length is longer than available data size)
188
if (end > outgoingBuffer.length()) end = outgoingBuffer.length();
189         if (end == 0) return null; //Nothing to write
190
str = outgoingBuffer.substring(0, end);
191         //Remove this string from the outgoing buffer
192
outgoingBuffer.delete(0, end);
193       }
194
195       //Convert to byte array, the length of the byte array may be greater than the string length (UTF-8 encoding)
196
barr = str.getBytes(utf8 ? CHARSET_UTF8 : CHARSET_ASCII);
197
198     }
199
200     //Will the new array fit into the buffer?
201
if (barr.length > maxBytes) {
202       warray.write(barr, maxBytes, barr.length-maxBytes); //This will not fit into the socket buffer, save it for the next "write"
203
byte[] trg = new byte[maxBytes];
204       System.arraycopy(barr,0,trg,0,maxBytes);
205       return trg;
206     } else {
207       return barr;
208     }
209   }
210
211
212   protected void write() throws Exception JavaDoc {
213
214     //Read more data from the outgoing buffer into the buffer only if the buffer is empty
215
if (!wbuffer.hasRemaining()) {
216
217       //This will get server response from the outgoing buffer encoded with proper charset
218
int cap = wbuffer.capacity();
219       byte[] barr = popOutgoingBuffer(cap);
220
221       //Nothing to write?
222
if (barr == null) {
223         Thread.sleep(sleep);
224         return;
225       }
226
227       //Write out to the buffer
228
wbuffer.clear();
229       wbuffer.put(barr);
230       wbuffer.flip();
231     }
232
233     //Forward the data to the user
234
int i = sc.write(wbuffer); //Thread blocks here...
235

236     //Client disconnected?
237
if (i == -1) throw new BrokenPipeException();
238
239     bytesWrote += i;
240     log.debug("Wrote into socket "+i+" bytes (total "+bytesWrote+")");
241   }
242
243
244   public void service() throws Exception JavaDoc {
245     /* If connection has been poisoned then we can destroy it only when it writes all data out.
246      * We cannot kill it while it has an active data connection.
247      * We cannot kill if it did not write a welcome message yet.
248      */

249     if (poisoned) {
250       boolean kill = true;
251
252       if (dataConnection != null && !dataConnection.isDestroyed()) kill = false;
253
254       if (getOutgoingBufferSize() > 0) kill = false;
255
256       if (bytesWrote == 0) kill = false;
257
258       if (kill) throw new PoisonedException();
259     }
260   }
261
262
263   /** Execute commands waiting in the incoming buffer */
264   protected void executeCommands() throws Exception JavaDoc {
265     while (true) {
266       Command command = getNextCommand();
267       if (command == null) break;
268       commandProcessor.execute(command);
269     }
270   }
271
272
273   /** Reads next user command from the incoming buffer
274    * @return Command or NULL if it's not ready yet
275    */

276   protected Command getNextCommand() throws Exception JavaDoc {
277     //Extract the next command from buffer
278
String JavaDoc input;
279     synchronized (incomingBuffer) {
280       int i = incomingBuffer.indexOf("\r\n");
281       if (i == -1) return null; //Nothing to extraxt yet (the line is not finished)
282
input = incomingBuffer.substring(0, i);
283       incomingBuffer.delete(0, i+2); //Also delete \r\n in the end of the command
284
if (input.trim().length() == 0) return null; //This is an empty string, skip it
285
log.debug("Extracted user input: "+input);
286     }
287
288     Command command = commandFactory.create(input);
289     command.setConnection(this);
290
291     //If INTERRUPT state is set then ignore all but special FTP commands (same for the poisoned).
292
if (interruptState && !command.processInInterruptState()) {
293       log.debug("Execution of the command is not allowed while the connection is in INTERRUPT state (dropping command)");
294       return null;
295     }
296     if (poisoned && !command.processInInterruptState()) {
297       log.debug("Execution of the command is not allowed while the connection is poisoned (dropping command)");
298       return null;
299     }
300
301     return command;
302   }
303
304
305   public synchronized void reply(Reply reply) {
306     //Prepare reply and write it out
307
String JavaDoc prepared = reply.prepare();
308     synchronized (outgoingBuffer) {
309       outgoingBuffer.append(prepared);
310     }
311     log.debug("Prepared reply: "+prepared.trim());
312
313     /* Change "interrupt" state: if code starts with "1" then set it, otherwise unset.
314      * FTP spec: all codes that start with 1 demand client to wait for another reply.
315      */

316     if (reply.getCode().startsWith("1")) {
317       interruptState = true;
318       log.debug("Reply has triggered INTERRUPT state");
319     }
320     else
321     if (interruptState) {
322       //Check if reply's command can clear INTERRUPT state, otherwise leave the state as it is
323
Command command = reply.getCommand();
324       if (command == null || command.canClearInterruptState()) {
325         interruptState = false;
326         log.debug("Reply has cleared INTERRUPT state");
327       }
328     }
329   }
330
331
332   public Session getSession() {
333     return session;
334   }
335
336
337   public DataConnection getDataConnection() {
338     return dataConnection;
339   }
340
341
342   public void setDataConnection(DataConnection dataConnection) {
343     this.dataConnection = dataConnection;
344   }
345
346
347   public synchronized void destroy() {
348     //Abort data connection initiator if active
349
if (dataConnectionInitiator.isActive()) dataConnectionInitiator.abort();
350
351     //Destory data connection if exists
352
if (dataConnection != null) dataConnection.destroy();
353
354     //Test if there is data channel left in the session
355
closeSessionDataChannel();
356
357     super.destroy();
358   }
359
360
361   /** Close a data channel if exists in the session */
362   protected void closeSessionDataChannel() {
363     Channel JavaDoc odc = (Channel JavaDoc) session.getAttribute(SessionAttributeName.DATA_CONNECTION_CHANNEL);
364     if (odc != null) {
365       log.debug("Attempting to close data channel in session");
366       session.removeAttribute(SessionAttributeName.DATA_CONNECTION_CHANNEL);
367       try {
368         odc.close();
369       } catch (Throwable JavaDoc e) {
370         log.error("Error closing data channel (ignoring)", e);
371       }
372     }
373   }
374
375
376   public DataConnectionInitiator getDataConnectionInitiator() {
377     return dataConnectionInitiator;
378   }
379
380
381   public int getOutgoingBufferSize() {
382     synchronized (outgoingBuffer) {
383       return outgoingBuffer.length();
384     }
385   }
386
387
388   public int getIncomingBufferSize() {
389     synchronized (incomingBuffer) {
390       return incomingBuffer.length();
391     }
392   }
393
394
395   public boolean isUtf8() {
396     return utf8;
397   }
398
399
400   public void setUtf8(boolean utf8) {
401     this.utf8 = utf8;
402   }
403 }
404
Popular Tags