KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > coldcore > coloradoftp > connection > impl > GenericDataConnectionInitiator


1 package com.coldcore.coloradoftp.connection.impl;
2
3 import com.coldcore.coloradoftp.command.Reply;
4 import com.coldcore.coloradoftp.connection.ConnectionPool;
5 import com.coldcore.coloradoftp.connection.ControlConnection;
6 import com.coldcore.coloradoftp.connection.DataConnection;
7 import com.coldcore.coloradoftp.connection.DataConnectionInitiator;
8 import com.coldcore.coloradoftp.factory.ObjectFactory;
9 import com.coldcore.coloradoftp.factory.ObjectName;
10 import com.coldcore.coloradoftp.session.Session;
11 import com.coldcore.coloradoftp.session.SessionAttributeName;
12 import org.apache.log4j.Logger;
13
14 import java.net.InetSocketAddress JavaDoc;
15 import java.nio.channels.SocketChannel JavaDoc;
16
17 /**
18  * @see com.coldcore.coloradoftp.connection.DataConnectionInitiator
19  */

20 public class GenericDataConnectionInitiator implements DataConnectionInitiator, Runnable JavaDoc {
21
22   private static Logger log = Logger.getLogger(GenericDataConnectionInitiator.class);
23   protected String JavaDoc ip;
24   protected int port;
25   protected boolean active;
26   protected ControlConnection controlConnection;
27   protected ConnectionPool dataConnectionPool;
28   protected SocketChannel JavaDoc sc;
29   protected Reply errorReply;
30   protected Thread JavaDoc thr;
31   protected long sleep;
32   protected boolean aborted;
33
34
35   public GenericDataConnectionInitiator() {
36     sleep = 100L;
37   }
38
39
40   protected Reply getErrorReply() {
41     if (errorReply == null) {
42       errorReply = (Reply) ObjectFactory.getObject(ObjectName.REPLY);
43       errorReply.setCode("425");
44       errorReply.setText("Can't open data connection.");
45     }
46     return errorReply;
47   }
48
49
50   /** Get thread sleep time
51    * @return Time in mills
52    */

53   public long getSleep() {
54     return sleep;
55   }
56
57
58   /** Set thread sleep time
59    * @param sleep Time in mills
60    */

61   public void setSleep(long sleep) {
62     this.sleep = sleep;
63   }
64
65
66   /** Test if user got a "150" reply
67    * @return TRUE if user got the reply, FALSE if not yet
68    */

69   protected boolean isReply150() {
70     Session session = controlConnection.getSession();
71     Long JavaDoc bytesWrote = (Long JavaDoc) session.getAttribute(SessionAttributeName.BYTE_MARKER_150_REPLY);
72     if (bytesWrote == null || controlConnection.getBytesWrote() == bytesWrote || controlConnection.getOutgoingBufferSize() != 0) return false;
73     log.debug("User got a 150 reply");
74     return true;
75   }
76
77
78   public String JavaDoc getIp() {
79     return ip;
80   }
81
82
83   public void setIp(String JavaDoc ip) {
84     this.ip = ip;
85   }
86
87
88   public int getPort() {
89     return port;
90   }
91
92
93   public void setPort(int port) {
94     this.port = port;
95   }
96
97
98   public void run() {
99     while (active) {
100
101       DataConnection dataConnection = null;
102       try {
103
104         /* We cannot open the socket yet. We must wait until user receives the positive "150" reply.
105          * The reply might not be in the buffer of the control connection just yet.
106          */

107         if (!isReply150()) {
108           Thread.sleep(sleep);
109           continue;
110         }
111
112         //Get required objects
113
dataConnectionPool = (ConnectionPool) ObjectFactory.getObject(ObjectName.DATA_CONNECTION_POOL);
114
115         //Configure socket and connect
116
sc = SocketChannel.open();
117         sc.connect(new InetSocketAddress JavaDoc(ip, port)); //Thread blocks here...
118
if (!sc.finishConnect()) throw new RuntimeException JavaDoc("Failed finishConnect");
119         String JavaDoc ip = sc.socket().getInetAddress().getHostAddress();
120         log.debug("New data connection established (IP "+ip+")");
121
122         //Create new connection instance
123
dataConnection = (DataConnection) ObjectFactory.getObject(ObjectName.DATA_CONNECTION);
124         dataConnection.initialize(sc);
125
126         //If there is a data connection already then kill it
127
DataConnection existing = controlConnection.getDataConnection();
128         if (existing != null && !existing.isDestroyed()) {
129           log.warn("BUG: Replacing existing data connection with a new one!");
130           existing.destroyNoReply();
131         }
132
133         //Configure the data connection and wire it with the control connection and add to pool
134
controlConnection.setDataConnection(dataConnection);
135         dataConnection.setControlConnection(controlConnection);
136         configure(dataConnection);
137         dataConnectionPool.add(dataConnection);
138         log.debug("New data connection is ready");
139
140         active = false;
141
142       } catch (Throwable JavaDoc e) {
143
144         //If aborted then do not post an error message
145
if (!aborted) {
146           log.warn("Failed to establish a connection with "+ip+":"+port+" (ignoring)", e);
147           try {
148             dataConnection.destroyNoReply();
149           } catch (Throwable JavaDoc ex) {}
150           try {
151             sc.close();
152           } catch (Throwable JavaDoc ex) {
153             log.error("Cannot close the channel (ignoring)", e);
154           }
155
156           controlConnection.reply(getErrorReply());
157         }
158
159         active = false;
160       }
161
162     }
163     log.debug("Data connection initiator thread finished");
164   }
165
166
167   public boolean isActive() {
168     return active;
169   }
170
171
172   public synchronized void activate() {
173     if (active) {
174       log.warn("Data connection initiator was active when activate routine was called");
175       return;
176     }
177
178     active = true;
179     aborted = false;
180
181     //Start this class
182
thr = new Thread JavaDoc(this);
183     thr.start();
184   }
185
186
187   public synchronized void abort() {
188     aborted = true;
189     if (!active) return;
190
191     //Close the channel
192
try {
193       if (sc != null && sc.isOpen()) sc.close();
194     } catch (Throwable JavaDoc e) {
195       log.error("Cannot close channel (ignoring)", e);
196     }
197
198     controlConnection.reply(getErrorReply());
199
200     //Clear the attribute to prevent misuse by future instances
201
Session session = controlConnection.getSession();
202     session.removeAttribute(SessionAttributeName.BYTE_MARKER_150_REPLY);
203
204     active = false;
205   }
206
207
208   public ControlConnection getControlConnection() {
209     return controlConnection;
210   }
211
212
213   public void setControlConnection(ControlConnection controlConnection) {
214     this.controlConnection = controlConnection;
215   }
216
217
218   /** Configure connection before adding it to a pool
219    * @param connection Connection
220    */

221   public void configure(DataConnection connection) {
222   }
223 }
224
Popular Tags