KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > core > net > SSLTransportImpl


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Uri Schneider.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34  
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 package org.mr.core.net;
47
48 import java.io.IOException JavaDoc;
49 import java.net.InetSocketAddress JavaDoc;
50 import java.net.SocketAddress JavaDoc;
51 import java.nio.ByteBuffer JavaDoc;
52 import java.nio.channels.Channels JavaDoc;
53 import java.nio.channels.ReadableByteChannel JavaDoc;
54 import java.nio.channels.SelectableChannel JavaDoc;
55 import java.nio.channels.WritableByteChannel JavaDoc;
56
57 import javax.net.ssl.SSLSocket;
58
59 import org.apache.commons.logging.Log;
60 import org.apache.commons.logging.LogFactory;
61 import org.mr.core.util.ActiveObject;
62 import org.mr.core.util.Stage;
63 import org.mr.core.util.StageHandler;
64 import org.mr.core.util.StageParams;
65 import org.mr.core.util.byteable.IncomingByteBufferPool;
66
67 /**
68  * SSLTransportImpl.java
69  *
70  * Created: Aug 16, 2004
71  *
72  * @author Uri Schneider
73  */

74 public class SSLTransportImpl implements TransportImpl, StageHandler, Runnable JavaDoc
75 {
76     private static final int IMPL_STATE_DOWN = 0;
77     private static final int IMPL_STATE_CONNECTING = 1;
78     private static final int IMPL_STATE_UP = 2;
79
80 // private SocketAddress local;
81
// private SocketAddress remote;
82
private SSLSocket socket;
83     private Log log;
84     private int implState;
85     private Stage stage;
86     private WritableByteChannel JavaDoc writeChannel;
87     private ReadableByteChannel JavaDoc readChannel;
88     private ByteBuffer JavaDoc outHeaderBuffer;
89     private NetworkListener listener;
90     private Transport owner;
91
92
93     public SSLTransportImpl(SSLSocket socket) throws IOException JavaDoc {
94         try {
95             this.socket = socket;
96             this.socket.setTcpNoDelay(true);
97         } catch (IOException JavaDoc e) {
98             this.implState = IMPL_STATE_DOWN;
99             throw e;
100         }
101         
102         commonInit();
103         postConnectInit();
104 // System.out.println(toString() + ": construct from socket");
105
}
106
107     public SSLTransportImpl(SocketAddress JavaDoc local, SocketAddress JavaDoc remote,
108                             Transport owner)
109         throws IOException JavaDoc
110     {
111         // we take the owner Transport as an argument to notify it
112
// when we finish connecting. normally this is the
113
// NetworkSelector's job, but this impl does not work with the
114
// selector.
115
this.owner = owner;
116
117         try {
118             this.socket =
119                 (SSLSocket) MantaSSLFactory.getInstance().createSocket();
120             if (this.socket == null) {
121                 this.log=LogFactory.getLog("SSLTransportImpl");
122                 if(log.isErrorEnabled()){
123                     log.error("SSL Factory did not return a socket.");
124                 }
125                 this.implState = IMPL_STATE_DOWN;
126                 return;
127             }
128             this.socket.setTcpNoDelay(true);
129             if (local != null) {
130                 this.socket.bind(local);
131             }
132 // this.socket.connect(remote);
133
} catch (IOException JavaDoc e) {
134             this.implState = IMPL_STATE_DOWN;
135             throw e;
136         }
137
138         commonInit();
139
140         final SocketAddress JavaDoc fremote = remote;
141         this.stage.enqueue(new ActiveObject() {
142                 public boolean call() {
143                     doConnect(fremote);
144                     return true;
145                 }
146             });
147
148 // System.out.println(toString() + ": construct from addr");
149
}
150
151     private void commonInit() {
152         // create a stage thread, which asynchronously handles
153
// connecting and writing
154
StageParams params = new StageParams();
155         params.setBlocking(false);
156         params.setPersistent(false);
157         params.setStageName("SSLWrite" + this.socket.getLocalPort());
158         params.setHandler(this);
159         params.setNumberOfStartThreads(1);
160         params.setMaxNumberOfThreads(10);
161         params.setStagePriority(0);
162         this.stage = new Stage(params);
163
164         this.log=LogFactory.getLog("SSLTransportImpl");
165         this.implState = IMPL_STATE_CONNECTING;
166     }
167
168     /**
169      * this should be called only when the socket is connected.
170      */

171     private void postConnectInit() {
172         try {
173             this.writeChannel =
174                 Channels.newChannel(this.socket.getOutputStream());
175             this.readChannel =
176                 Channels.newChannel(this.socket.getInputStream());
177             this.outHeaderBuffer =
178                 ByteBuffer.allocate(CNLMessage.CNL_HEADERLEN);
179             new Thread JavaDoc(this).start();
180         } catch (IOException JavaDoc e) {
181             if(log.isWarnEnabled()){
182                 this.log.warn(toString() +
183                                  ": IOException during post connect: " +
184                                  e.toString()+".");
185             }
186             shutdown();
187         }
188     }
189
190     /* (non-Javadoc)
191      * @see org.mr.core.net.TransportImpl#shutdown()
192      */

193     public synchronized void shutdown() {
194         // this may be called twice in a row, so check
195
if (!isDown()) {
196             try {
197                 if(log.isInfoEnabled()){
198                     log.info("SHUTTING DOWN IMPL " + toString()+".");
199                 }
200                 this.socket.close();
201                 this.socket = null;
202                 this.implState = IMPL_STATE_DOWN;
203             } catch (IOException JavaDoc e) {}
204             this.stage.enqueue(new ActiveObject() {
205                     public boolean call() {
206                         return false;
207                     }
208                 });
209             if (this.listener != null) {
210                 this.listener.implShutdown();
211             }
212         }
213     }
214
215     /* (non-Javadoc)
216      * @see org.mr.core.net.SelectorReadCallback#read()
217      */

218     public void read() {
219         // nothing here. SSL impls won't be handled by the NetworkSelector
220
}
221
222     /* (non-Javadoc)
223      * @see org.mr.core.net.TransportImpl#write(org.mr.core.net.CNLMessage, int, org.mr.core.net.NetworkSelector)
224      */

225     public void write(CNLMessage msg, int id, NetworkSelector selector) {
226 // System.out.println(toString() + ": outer write, id = " + id);
227
final CNLMessage fmsg = msg;
228         final int fid = id;
229
230         msg.use();
231         this.stage.enqueue(new ActiveObject() {
232                 public boolean call() {
233                     doWrite(fmsg, fid);
234                     return true;
235                 }
236             });
237     }
238
239     private void doWrite(CNLMessage message, int id) {
240 // System.out.println(toString() + ": inner write, id = " + id);
241
if (isConnected()) {
242             try {
243                 if(log.isDebugEnabled()){
244                     log.debug("Sending message(" + id + ") to " +
245                                       toString()+".");
246                 }
247
248                 // serialize and send CNL header
249
this.outHeaderBuffer.clear();
250                 message.headerToBuffer(outHeaderBuffer, id);
251                 this.writeChannel.write(outHeaderBuffer);
252                 this.listener.activityDetected();
253
254                 // serialized and send CNL payload
255
ByteBuffer JavaDoc[] payloadBuffers = message.valueAsBuffers();
256                 for (int i = 0; i < payloadBuffers.length; i++) {
257                     this.writeChannel.write(payloadBuffers[i]);
258                     this.listener.activityDetected();
259                 }
260                 message.setSent();
261                 message.unuse();
262             } catch (IOException JavaDoc e) {
263                 if(log.isWarnEnabled()){
264                     this.log.warn(toString() +
265                                      ": IOException during write: " +
266                                      e.toString()+".");
267                 }
268                 message.unuse();
269                 shutdown();
270             }
271         } else {
272             message.unuse();
273         }
274     }
275
276     private void doConnect(SocketAddress JavaDoc remote) {
277 // System.out.println(toString() + ": inner connect to " + remote);
278
try {
279             this.socket.connect(remote);
280             this.owner.finishedConnecting(this);
281             postConnectInit();
282         } catch (IOException JavaDoc e) {
283             if(log.isWarnEnabled()){
284                 this.log.warn(toString() +
285                                  ": IOException during connect: " +
286                                  e.toString()+".");
287             }
288             shutdown();
289         }
290     }
291
292     /* (non-Javadoc)
293      * @see org.mr.core.net.TransportImpl#isInitialized()
294      */

295     public boolean isInitialized() {
296         return this.implState == IMPL_STATE_UP;
297     }
298
299     /* (non-Javadoc)
300      * @see org.mr.core.net.TransportImpl#setInitialized()
301      */

302     public void setInitialized() {
303         this.implState = IMPL_STATE_UP;
304     }
305
306     /* (non-Javadoc)
307      * @see org.mr.core.net.TransportImpl#getChannel()
308      */

309     public SelectableChannel JavaDoc getChannel() {
310         return null;
311     }
312
313     /* (non-Javadoc)
314      * @see org.mr.core.net.TransportImpl#getType()
315      */

316     public TransportType getType() {
317         return TransportType.SSL;
318     }
319
320     /* (non-Javadoc)
321      * @see org.mr.core.net.TransportImpl#isConnected()
322      */

323     public boolean isConnected() {
324         return this.socket != null && this.socket.isConnected();
325     }
326
327     /* (non-Javadoc)
328      * @see org.mr.core.net.TransportImpl#isDown()
329      */

330     public boolean isDown() {
331         return this.implState == IMPL_STATE_DOWN;
332     }
333
334     /* (non-Javadoc)
335      * @see org.mr.core.net.TransportImpl#setListener(org.mr.core.net.NetworkListener)
336      */

337     public void setListener(NetworkListener listener) {
338         this.listener = listener;
339     }
340
341     /* (non-Javadoc)
342      * @see org.mr.core.net.TransportImpl#onConnect()
343      */

344     public void onConnect() {}
345
346     /* (non-Javadoc)
347      * @see org.mr.core.net.TransportImpl#getLocalSocketAddress()
348      */

349     public InetSocketAddress JavaDoc getLocalSocketAddress() {
350         return (InetSocketAddress JavaDoc) this.socket.getLocalSocketAddress();
351     }
352
353     /* (non-Javadoc)
354      * @see org.mr.core.net.TransportImpl#getRemoteSocketAddress()
355      */

356     public InetSocketAddress JavaDoc getRemoteSocketAddress() {
357         return (InetSocketAddress JavaDoc) this.socket.getRemoteSocketAddress();
358     }
359
360     /* (non-Javadoc)
361      * @see org.mr.core.net.SelectorReadCallback#selectWrite()
362      */

363     public void selectWrite() {
364         // never will be called
365
}
366
367     /* (non-Javadoc)
368      * @see org.mr.core.util.StageHandler#handle(java.lang.Object)
369      */

370     public boolean handle(Object JavaDoc o) {
371         ActiveObject ao = (ActiveObject) o;
372         return ao.call();
373     }
374
375     // Reader thread Runnable implementation
376
public void run() {
377         Thread.currentThread().setName("SSLRead" +
378                                        getLocalSocketAddress().getPort());
379
380 // System.out.println(toString() + ": read thread starting.");
381
int nBytes;
382         ByteBuffer JavaDoc lengthBuf = ByteBuffer.allocate(CNLMessage.CNL_HEADERLEN);
383         ByteBuffer JavaDoc messageBuf = null;
384         CNLMessage message;
385
386         try {
387             while (true) {
388                 lengthBuf.clear();
389                 message = new CNLMessage(true);
390                 messageBuf = null;
391                 int readlen;
392
393                 // read header
394
while (lengthBuf.remaining() > 0) {
395                     readlen = readChannel.read(lengthBuf);
396 // System.out.println(toString() + ": read " + readlen);
397
if (readlen == -1) {
398                         if(log.isWarnEnabled())
399                             this.log.warn("Channel " + toString() +
400                                           " EOF. Shutting down.");
401                         shutdown();
402                     }
403                 }
404                 lengthBuf.flip();
405                 message.readHeader(lengthBuf);
406
407 // System.out.println(toString() + ": read " +
408
// message.toString());
409
// allocate payload buffer and read payload
410
messageBuf =
411                     IncomingByteBufferPool.getInstance()
412                     .getBuffer(message.getLength());
413                 messageBuf.limit(message.getLength());
414                 while (messageBuf.remaining() > 0) {
415                     readlen = readChannel.read(messageBuf);
416 // System.out.println(toString() + ": read " + readlen);
417
if (readlen == -1) {
418                         if(log.isWarnEnabled())
419                             log.warn("Channel " + toString() +
420                                      " EOF. Shutting down.");
421                         shutdown();
422                     }
423                 }
424                 messageBuf.flip();
425                 message.setBuffer(messageBuf);
426                 if(log.isDebugEnabled()){
427                     log.debug("Received message(" + message.getID() +
428                                  ") from " + toString()+".");
429                 }
430
431                 // send message up
432
message.setSourceAddress(this.socket.getRemoteSocketAddress());
433                 message.setDestAddress(this.socket.getLocalSocketAddress());
434 // System.out.println(toString() + ": read " +
435
// message.toString());
436
this.listener.messageReady(message);
437             }
438         } catch (IOException JavaDoc e) {
439             if(log.isWarnEnabled()){
440                 this.log.warn(toString() +
441                                  ": IOException during read: " +
442                                  e.toString()+".");
443             }
444             if (messageBuf != null) {
445                 IncomingByteBufferPool.getInstance().release(messageBuf);
446             }
447             shutdown();
448         }
449     }
450
451     public String JavaDoc toString() {
452         StringBuffer JavaDoc buf = new StringBuffer JavaDoc();
453         try {
454             buf.append(this.socket.getLocalSocketAddress().toString());
455             buf.append("--");
456             buf.append(this.socket.getRemoteSocketAddress().toString());
457         } catch (Throwable JavaDoc t) {
458             buf.append("/unknown/unknown");
459         }
460         buf.append("@SSL");
461
462         return buf.toString();
463     }
464 }
465
Popular Tags