1 46 package org.mr.core.net; 47 48 import java.io.IOException ; 49 import java.net.InetSocketAddress ; 50 import java.net.SocketAddress ; 51 import java.nio.ByteBuffer ; 52 import java.nio.channels.Channels ; 53 import java.nio.channels.ReadableByteChannel ; 54 import java.nio.channels.SelectableChannel ; 55 import java.nio.channels.WritableByteChannel ; 56 57 import javax.net.ssl.SSLSocket; 58 59 import org.apache.commons.logging.Log; 60 import org.apache.commons.logging.LogFactory; 61 import org.mr.core.util.ActiveObject; 62 import org.mr.core.util.Stage; 63 import org.mr.core.util.StageHandler; 64 import org.mr.core.util.StageParams; 65 import org.mr.core.util.byteable.IncomingByteBufferPool; 66 67 74 public class SSLTransportImpl implements TransportImpl, StageHandler, Runnable 75 { 76 private static final int IMPL_STATE_DOWN = 0; 77 private static final int IMPL_STATE_CONNECTING = 1; 78 private static final int IMPL_STATE_UP = 2; 79 80 private SSLSocket socket; 83 private Log log; 84 private int implState; 85 private Stage stage; 86 private WritableByteChannel writeChannel; 87 private ReadableByteChannel readChannel; 88 private ByteBuffer outHeaderBuffer; 89 private NetworkListener listener; 90 private Transport owner; 91 92 93 public SSLTransportImpl(SSLSocket socket) throws IOException { 94 try { 95 this.socket = socket; 96 this.socket.setTcpNoDelay(true); 97 } catch (IOException e) { 98 this.implState = IMPL_STATE_DOWN; 99 throw e; 100 } 101 102 commonInit(); 103 postConnectInit(); 104 } 106 107 public SSLTransportImpl(SocketAddress local, SocketAddress remote, 108 Transport owner) 109 throws IOException 110 { 111 this.owner = owner; 116 117 try { 118 this.socket = 119 (SSLSocket) MantaSSLFactory.getInstance().createSocket(); 120 if (this.socket == null) { 121 this.log=LogFactory.getLog("SSLTransportImpl"); 122 if(log.isErrorEnabled()){ 123 log.error("SSL Factory did not return a socket."); 124 } 125 this.implState = IMPL_STATE_DOWN; 126 return; 127 } 128 this.socket.setTcpNoDelay(true); 129 if (local != null) { 130 this.socket.bind(local); 131 } 132 } catch (IOException e) { 134 this.implState = IMPL_STATE_DOWN; 135 throw e; 136 } 137 138 commonInit(); 139 140 final SocketAddress fremote = remote; 141 this.stage.enqueue(new ActiveObject() { 142 public boolean call() { 143 doConnect(fremote); 144 return true; 145 } 146 }); 147 148 } 150 151 private void commonInit() { 152 StageParams params = new StageParams(); 155 params.setBlocking(false); 156 params.setPersistent(false); 157 params.setStageName("SSLWrite" + this.socket.getLocalPort()); 158 params.setHandler(this); 159 params.setNumberOfStartThreads(1); 160 params.setMaxNumberOfThreads(10); 161 params.setStagePriority(0); 162 this.stage = new Stage(params); 163 164 this.log=LogFactory.getLog("SSLTransportImpl"); 165 this.implState = IMPL_STATE_CONNECTING; 166 } 167 168 171 private void postConnectInit() { 172 try { 173 this.writeChannel = 174 Channels.newChannel(this.socket.getOutputStream()); 175 this.readChannel = 176 Channels.newChannel(this.socket.getInputStream()); 177 this.outHeaderBuffer = 178 ByteBuffer.allocate(CNLMessage.CNL_HEADERLEN); 179 new Thread (this).start(); 180 } catch (IOException e) { 181 if(log.isWarnEnabled()){ 182 this.log.warn(toString() + 183 ": IOException during post connect: " + 184 e.toString()+"."); 185 } 186 shutdown(); 187 } 188 } 189 190 193 public synchronized void shutdown() { 194 if (!isDown()) { 196 try { 197 if(log.isInfoEnabled()){ 198 log.info("SHUTTING DOWN IMPL " + toString()+"."); 199 } 200 this.socket.close(); 201 this.socket = null; 202 this.implState = IMPL_STATE_DOWN; 203 } catch (IOException e) {} 204 this.stage.enqueue(new ActiveObject() { 205 public boolean call() { 206 return false; 207 } 208 }); 209 if (this.listener != null) { 210 this.listener.implShutdown(); 211 } 212 } 213 } 214 215 218 public void read() { 219 } 221 222 225 public void write(CNLMessage msg, int id, NetworkSelector selector) { 226 final CNLMessage fmsg = msg; 228 final int fid = id; 229 230 msg.use(); 231 this.stage.enqueue(new ActiveObject() { 232 public boolean call() { 233 doWrite(fmsg, fid); 234 return true; 235 } 236 }); 237 } 238 239 private void doWrite(CNLMessage message, int id) { 240 if (isConnected()) { 242 try { 243 if(log.isDebugEnabled()){ 244 log.debug("Sending message(" + id + ") to " + 245 toString()+"."); 246 } 247 248 this.outHeaderBuffer.clear(); 250 message.headerToBuffer(outHeaderBuffer, id); 251 this.writeChannel.write(outHeaderBuffer); 252 this.listener.activityDetected(); 253 254 ByteBuffer [] payloadBuffers = message.valueAsBuffers(); 256 for (int i = 0; i < payloadBuffers.length; i++) { 257 this.writeChannel.write(payloadBuffers[i]); 258 this.listener.activityDetected(); 259 } 260 message.setSent(); 261 message.unuse(); 262 } catch (IOException e) { 263 if(log.isWarnEnabled()){ 264 this.log.warn(toString() + 265 ": IOException during write: " + 266 e.toString()+"."); 267 } 268 message.unuse(); 269 shutdown(); 270 } 271 } else { 272 message.unuse(); 273 } 274 } 275 276 private void doConnect(SocketAddress remote) { 277 try { 279 this.socket.connect(remote); 280 this.owner.finishedConnecting(this); 281 postConnectInit(); 282 } catch (IOException e) { 283 if(log.isWarnEnabled()){ 284 this.log.warn(toString() + 285 ": IOException during connect: " + 286 e.toString()+"."); 287 } 288 shutdown(); 289 } 290 } 291 292 295 public boolean isInitialized() { 296 return this.implState == IMPL_STATE_UP; 297 } 298 299 302 public void setInitialized() { 303 this.implState = IMPL_STATE_UP; 304 } 305 306 309 public SelectableChannel getChannel() { 310 return null; 311 } 312 313 316 public TransportType getType() { 317 return TransportType.SSL; 318 } 319 320 323 public boolean isConnected() { 324 return this.socket != null && this.socket.isConnected(); 325 } 326 327 330 public boolean isDown() { 331 return this.implState == IMPL_STATE_DOWN; 332 } 333 334 337 public void setListener(NetworkListener listener) { 338 this.listener = listener; 339 } 340 341 344 public void onConnect() {} 345 346 349 public InetSocketAddress getLocalSocketAddress() { 350 return (InetSocketAddress ) this.socket.getLocalSocketAddress(); 351 } 352 353 356 public InetSocketAddress getRemoteSocketAddress() { 357 return (InetSocketAddress ) this.socket.getRemoteSocketAddress(); 358 } 359 360 363 public void selectWrite() { 364 } 366 367 370 public boolean handle(Object o) { 371 ActiveObject ao = (ActiveObject) o; 372 return ao.call(); 373 } 374 375 public void run() { 377 Thread.currentThread().setName("SSLRead" + 378 getLocalSocketAddress().getPort()); 379 380 int nBytes; 382 ByteBuffer lengthBuf = ByteBuffer.allocate(CNLMessage.CNL_HEADERLEN); 383 ByteBuffer messageBuf = null; 384 CNLMessage message; 385 386 try { 387 while (true) { 388 lengthBuf.clear(); 389 message = new CNLMessage(true); 390 messageBuf = null; 391 int readlen; 392 393 while (lengthBuf.remaining() > 0) { 395 readlen = readChannel.read(lengthBuf); 396 if (readlen == -1) { 398 if(log.isWarnEnabled()) 399 this.log.warn("Channel " + toString() + 400 " EOF. Shutting down."); 401 shutdown(); 402 } 403 } 404 lengthBuf.flip(); 405 message.readHeader(lengthBuf); 406 407 messageBuf = 411 IncomingByteBufferPool.getInstance() 412 .getBuffer(message.getLength()); 413 messageBuf.limit(message.getLength()); 414 while (messageBuf.remaining() > 0) { 415 readlen = readChannel.read(messageBuf); 416 if (readlen == -1) { 418 if(log.isWarnEnabled()) 419 log.warn("Channel " + toString() + 420 " EOF. Shutting down."); 421 shutdown(); 422 } 423 } 424 messageBuf.flip(); 425 message.setBuffer(messageBuf); 426 if(log.isDebugEnabled()){ 427 log.debug("Received message(" + message.getID() + 428 ") from " + toString()+"."); 429 } 430 431 message.setSourceAddress(this.socket.getRemoteSocketAddress()); 433 message.setDestAddress(this.socket.getLocalSocketAddress()); 434 this.listener.messageReady(message); 437 } 438 } catch (IOException e) { 439 if(log.isWarnEnabled()){ 440 this.log.warn(toString() + 441 ": IOException during read: " + 442 e.toString()+"."); 443 } 444 if (messageBuf != null) { 445 IncomingByteBufferPool.getInstance().release(messageBuf); 446 } 447 shutdown(); 448 } 449 } 450 451 public String toString() { 452 StringBuffer buf = new StringBuffer (); 453 try { 454 buf.append(this.socket.getLocalSocketAddress().toString()); 455 buf.append("--"); 456 buf.append(this.socket.getRemoteSocketAddress().toString()); 457 } catch (Throwable t) { 458 buf.append("/unknown/unknown"); 459 } 460 buf.append("@SSL"); 461 462 return buf.toString(); 463 } 464 } 465 | Popular Tags |