KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > coldcore > coloradoftp > connection > impl > GenericDataPortListener


1 package com.coldcore.coloradoftp.connection.impl;
2
3 import com.coldcore.coloradoftp.command.Reply;
4 import com.coldcore.coloradoftp.connection.ConnectionPool;
5 import com.coldcore.coloradoftp.connection.ControlConnection;
6 import com.coldcore.coloradoftp.connection.DataConnection;
7 import com.coldcore.coloradoftp.connection.DataPortListener;
8 import com.coldcore.coloradoftp.factory.ObjectFactory;
9 import com.coldcore.coloradoftp.factory.ObjectName;
10 import org.apache.log4j.Logger;
11
12 import java.io.IOException JavaDoc;
13 import java.net.InetSocketAddress JavaDoc;
14 import java.nio.channels.ServerSocketChannel JavaDoc;
15 import java.nio.channels.SocketChannel JavaDoc;
16 import java.util.HashMap JavaDoc;
17 import java.util.Map JavaDoc;
18
19 /**
20  * @see com.coldcore.coloradoftp.connection.DataPortListener
21  */

22 public class GenericDataPortListener implements DataPortListener, Runnable JavaDoc {
23
24   private static Logger log = Logger.getLogger(GenericDataPortListener.class);
25   protected int port;
26   protected boolean bound;
27   protected ServerSocketChannel JavaDoc ssc;
28   protected Map JavaDoc<String JavaDoc, ControlConnection> awaiting;
29   protected ConnectionPool dataConnectionPool;
30   protected Reply errorReply;
31   protected Thread JavaDoc thr;
32   protected long sleep;
33
34
35   public GenericDataPortListener() {
36     port = -1;
37     sleep = 100L;
38     awaiting = new HashMap JavaDoc<String JavaDoc,ControlConnection>();
39   }
40
41
42   protected Reply getErrorReply() {
43     if (errorReply == null) {
44       errorReply = (Reply) ObjectFactory.getObject(ObjectName.REPLY);
45       errorReply.setCode("425");
46       errorReply.setText("Can't open data connection.");
47     }
48     return errorReply;
49   }
50
51
52   /** Get thread sleep time
53    * @return Time in mills
54    */

55   public long getSleep() {
56     return sleep;
57   }
58
59
60   /** Set thread sleep time
61    * @param sleep Time in mills
62    */

63   public void setSleep(long sleep) {
64     this.sleep = sleep;
65   }
66
67
68   public void setPort(int port) {
69     this.port = port;
70   }
71
72
73   public int getPort() {
74     return port;
75   }
76
77
78   public synchronized void bind() throws IOException JavaDoc {
79     if (port < 1) throw new IllegalArgumentException JavaDoc("Set correct port first");
80
81     if (bound) {
82       log.warn("Listener on port "+port+" was bound when bind routine was submitted");
83       throw new IllegalStateException JavaDoc("Unbind the listener on port "+port+" first");
84     }
85
86     //Get required objects
87
dataConnectionPool = (ConnectionPool) ObjectFactory.getObject(ObjectName.DATA_CONNECTION_POOL);
88
89     //Bind to the port
90
ssc = ServerSocketChannel.open();
91     ssc.socket().bind(new InetSocketAddress JavaDoc(port));
92
93     //Start this class
94
thr = new Thread JavaDoc(this);
95     thr.start();
96
97     bound = true;
98     log.debug("Listener is bound to port "+port);
99   }
100
101
102   public synchronized void unbind() throws IOException JavaDoc {
103     if (!bound) {
104       log.warn("Listener on port "+port+" was not bound when unbind routine was submitted");
105       throw new IllegalStateException JavaDoc("Cannot unbind the listener on port "+port+", it is not bound");
106     }
107
108     //Remove all awaiting connections
109
synchronized (awaiting) {
110       for (ControlConnection connection : awaiting.values())
111         removeConnection(connection);
112     }
113
114     //Unbind from the port
115
if (ssc.isOpen()) ssc.close();
116
117     bound = false;
118     log.debug("Listener on port "+port+" is unbound");
119   }
120
121
122   public boolean isBound() {
123     return bound;
124   }
125
126
127   public boolean addConnection(ControlConnection connection) {
128     if (!bound) return false;
129
130     //Clean up the map from unnecessary control connections
131
cleanup();
132
133     //Add a new one
134
String JavaDoc ip = connection.getSocketChannel().socket().getInetAddress().getHostAddress();
135     synchronized (awaiting) {
136       ControlConnection con = awaiting.get(ip);
137       if (con != null && con != connection) return false;
138       awaiting.put(ip, connection);
139       return true;
140     }
141   }
142
143
144   public boolean removeConnection(ControlConnection connection) {
145     String JavaDoc ip = connection.getSocketChannel().socket().getInetAddress().getHostAddress();
146     synchronized (awaiting) {
147       ControlConnection c = awaiting.get(ip);
148       if (c == connection) {
149         awaiting.remove(ip);
150         c.reply(getErrorReply());
151         return true;
152       }
153       return false;
154     }
155   }
156
157
158   public void run() {
159     while (bound) {
160
161       ControlConnection controlConnection = null;
162       DataConnection dataConnection = null;
163       SocketChannel JavaDoc sc = null;
164       try {
165         sc = ssc.accept(); //Thread blocks here...
166
String JavaDoc ip = sc.socket().getInetAddress().getHostAddress();
167         log.debug("New incoming data connection (IP "+ip+")");
168
169         //Create new connection instance
170
dataConnection = (DataConnection) ObjectFactory.getObject(ObjectName.DATA_CONNECTION);
171         dataConnection.initialize(sc);
172
173         //Locate a control connection waiting for this data connection
174
controlConnection = popControlConnection(dataConnection);
175         if (controlConnection == null) {
176           log.warn("No control connection found for an incoming data connection");
177           dataConnection.destroyNoReply();
178         } else {
179
180           //If there is a data connection already then kill it
181
DataConnection existing = controlConnection.getDataConnection();
182           if (existing != null && !existing.isDestroyed()) {
183             log.warn("BUG: Replacing existing data connection with a new one!");
184             existing.destroyNoReply();
185           }
186
187           //Configure the data connection and wire it with the control connection and add to the pool
188
controlConnection.setDataConnection(dataConnection);
189           dataConnection.setControlConnection(controlConnection);
190           configure(dataConnection);
191           dataConnectionPool.add(dataConnection);
192           log.debug("New data connection is ready");
193         }
194
195         Thread.sleep(sleep);
196
197       } catch (Throwable JavaDoc e) {
198         log.warn("Failed to accept a connection (ignoring)", e);
199         try {
200           dataConnection.destroyNoReply();
201         } catch (Throwable JavaDoc ex) {}
202         try {
203           sc.close();
204         } catch (Throwable JavaDoc ex) {
205           log.error("Cannot close the channel (ignoring)", e);
206         }
207
208         //Send error reply
209
if (controlConnection != null) controlConnection.reply(getErrorReply());
210       }
211
212     }
213     log.debug("Data port listener thread finished");
214   }
215
216
217   /** Cleas up the map from connections which should not be in it */
218   protected void cleanup() {
219     synchronized (awaiting) {
220       for (String JavaDoc ip : awaiting.keySet()) {
221         ControlConnection connection = awaiting.get(ip);
222         if (connection.isDestroyed()) awaiting.remove(ip);
223       }
224     }
225   }
226
227
228   /** Locate a control connection which awaits for a data connection and remove it
229    * @param dataConnection Incoming data connection
230    * @return Control connection or NULL if a control connection cannot be located and the data connection should be dropped
231    */

232   protected ControlConnection popControlConnection(DataConnection dataConnection) {
233     String JavaDoc dip = dataConnection.getSocketChannel().socket().getInetAddress().getHostAddress();
234     synchronized (awaiting) {
235       for (String JavaDoc ip : awaiting.keySet()) {
236         if (ip.equals(dip)) {
237           ControlConnection controlConnection = awaiting.remove(ip);
238           return controlConnection.isDestroyed() ? null : controlConnection;
239         }
240       }
241     }
242     return null;
243   }
244
245
246   /** Configure connection before adding it to a pool
247    * @param connection Connection
248    */

249   public void configure(DataConnection connection) {
250   }
251 }
252
Popular Tags