1 46 package org.mr.core.net; 47 48 import java.io.IOException ; 49 import java.net.InetSocketAddress ; 50 import java.net.SocketAddress ; 51 import java.net.SocketException ; 52 import java.nio.ByteBuffer ; 54 import java.nio.channels.SelectableChannel ; 55 import java.nio.channels.SocketChannel ; 56 import java.util.LinkedList ; 57 58 import org.apache.commons.logging.Log; 59 import org.apache.commons.logging.LogFactory; 60 import org.mr.MantaAgent; 61 import org.mr.core.util.byteable.IncomingByteBufferPool; 62 63 72 73 public class TCPTransportImpl implements TransportImpl { 74 class OutItem { 75 OutItem(CNLMessage message, int id) { 76 this.message = message; this.id = id; 77 } 78 CNLMessage message; 79 int id; 80 } 81 82 protected SocketChannel channel; 83 private ByteBuffer lengthBuf; 84 private ByteBuffer messageBuf; 85 private int bytesRead; 86 private int readState; 87 private int channelState; 88 private CNLMessage message; 89 protected Log log; 90 private boolean logExt; 91 private NetworkListener listener; 92 93 private LinkedList outQueue; 94 private CNLMessage outCNL; 95 private ByteBuffer [] outBuffers; 96 private int outIndex; 97 private NetworkSelector selector; 98 99 100 private static final int READ_STATE_HEADER = 0; 101 private static final int READ_STATE_PAYLOAD = 1; 102 103 private static final int CHANNEL_STATE_DOWN = 0; 104 private static final int CHANNEL_STATE_CONNECTING = 1; 105 private static final int CHANNEL_STATE_CONNECTED = 2; 106 private static final int CHANNEL_STATE_UP = 3; 107 108 public TCPTransportImpl(SocketChannel channel) { 109 this.channel = channel; 110 commonInit(); 111 } 113 public TCPTransportImpl(SocketAddress local, SocketAddress remote) 114 throws IOException 115 { 116 this.channel = SocketChannel.open(); 117 if(NetworkManager.isTcpNoDelay()){ 118 this.channel.socket().setTcpNoDelay(true); 119 } 120 if (local != null) { 121 this.channel.socket().bind(local); 122 } 123 this.channel.configureBlocking(false); 124 try { 125 this.channel.connect(remote); 126 } catch (IOException e) { 127 LogFactory.getLog("TCPTransportImpl").error("Error connecting to " + remote.toString() + ": " + e.toString()); 128 this.channelState = CHANNEL_STATE_DOWN; 129 throw e; 130 } 131 commonInit(); 132 } 133 134 private void commonInit() { 135 this.lengthBuf = ByteBuffer.allocate(CNLMessage.CNL_HEADERLEN); 136 this.log = LogFactory.getLog("TCPTransportImpl"); 137 this.logExt = MantaAgent.getInstance().getSingletonRepository().getConfigManager().getBooleanProperty("log-ext.enable", false); 138 139 this.outQueue = new LinkedList (); 140 this.outCNL = null; 141 this.outBuffers = new ByteBuffer [3]; 142 this.outBuffers[0] = ByteBuffer.allocate(CNLMessage.CNL_HEADERLEN); 143 this.outIndex = 0; 144 this.selector = null; 145 146 if (this.channel.isConnected()) { 147 this.channelState = CHANNEL_STATE_CONNECTED; 148 } else { 149 this.channelState = CHANNEL_STATE_CONNECTING; 150 } 151 if(NetworkManager.isTcpNoDelay()){ 152 try { 153 this.channel.socket().setTcpNoDelay(true); 154 } catch (SocketException e) { 155 if(log.isErrorEnabled()){ 156 log.error("Cannot set TCP_NODELAY option on " + toString()+"."); 157 } 158 } 159 160 } 161 162 reset(); 163 } 164 165 168 public void shutdown() { 169 try { 170 if(log.isInfoEnabled()){ 171 log.info("SHUTTING DOWN IMPL " + toString()+"."); 172 } 173 this.channel.close(); 174 this.channelState = CHANNEL_STATE_DOWN; 175 } catch (IOException e) { 176 if(log.isWarnEnabled()) 177 log.warn("Could not close channel: " + e.toString()+"."); 178 } 179 synchronized (this.outQueue) { 180 while (!this.outQueue.isEmpty()) { 181 OutItem item = (OutItem) this.outQueue.removeFirst(); 182 item.message.unuse(); 183 } 184 if (this.outCNL != null) { 185 this.outCNL.unuse(); 186 } 187 } 188 if (this.listener != null) { 189 this.listener.implShutdown(); 190 } 191 } 192 193 196 public void read() { 197 int nBytes; 198 199 try { 200 if (this.readState == READ_STATE_HEADER) { 201 nBytes = this.channel.read(lengthBuf); 202 if (nBytes == -1) { 203 if(log.isWarnEnabled()) { 204 log.warn("Channel " + toString() + 205 " EOF. Shutting down."); 206 } 207 shutdown(); 208 } 209 this.bytesRead += nBytes; 210 if (this.bytesRead == CNLMessage.CNL_HEADERLEN) { 211 headerComplete(); 212 } 213 } else if (this.readState == READ_STATE_PAYLOAD) { 214 nBytes = this.channel.read(this.messageBuf); 215 if (nBytes == -1) { 216 if(log.isWarnEnabled()) { 217 log.warn("Channel " + toString() + 218 " EOF. Shutting down."); 219 } 220 shutdown(); 221 } 222 if (!this.messageBuf.hasRemaining()) { 223 messageComplete(); 224 } 225 } 226 } catch (IOException e) { 227 if(log.isWarnEnabled()) 228 log.warn("Error reading from channel (remote = " + 229 channel.socket().getRemoteSocketAddress().toString() + 230 "):" + e.getMessage()); 231 shutdown(); 232 } 233 this.listener.activityDetected(); 234 } 235 236 239 public void write(CNLMessage msg, int id, NetworkSelector selector) { 240 if(log.isTraceEnabled() && logExt){ 241 log.trace("Sending message(" + id + ") to " + toString()+"."); 242 } 243 msg.use(); 244 if (this.selector == null) { 245 this.selector = selector; 246 } 247 synchronized (this.outQueue) { 248 OutItem item = new OutItem(msg, id); 249 this.outQueue.addLast(item); 250 if (this.outCNL == null) { 251 prepareOutgoingMessage(); 252 this.selector.addImplForWrite(this); 253 } 254 } 255 } 256 257 private void prepareOutgoingMessage() { 258 OutItem item = (OutItem) this.outQueue.removeFirst(); 259 ByteBuffer [] payloadBuffers = item.message.valueAsBuffers(); 260 261 outgoingMessageHook(item.message); 265 this.outCNL = item.message; 266 this.outBuffers[0].clear(); 267 this.outBuffers[2] = null; 268 this.outCNL.headerToBuffer(this.outBuffers[0], item.id); 269 for (int i = 0; i < payloadBuffers.length; i++) { 270 this.outBuffers[i+1] = payloadBuffers[i]; 271 } 272 this.outIndex = 0; 273 } 274 275 279 protected void outgoingMessageHook(CNLMessage message) {} 280 281 public void selectWrite() { 282 try { 283 while (true) { 284 while (this.outIndex < this.outBuffers.length) { 285 ByteBuffer buf = this.outBuffers[this.outIndex]; 286 if (buf == null) { 287 break; 288 } 289 int nBytes = this.channel.write(buf); 290 this.listener.activityDetected(); 291 if (buf.remaining() > 0) { 292 return; 294 } 295 this.outIndex++; 296 } 297 this.outCNL.setSent(); 298 this.outCNL.unuse(); 299 synchronized (this.outQueue) { 300 if (this.outQueue.isEmpty()) { 301 this.selector.removeImplForWrite(this); 303 this.outCNL = null; 304 return; 305 } else { 306 prepareOutgoingMessage(); 307 } 308 } 309 } 310 } catch (IOException e) { 311 if(log.isErrorEnabled()) 312 log.error("Error writing to " + toString() + ": " + 313 e.toString()+"."); 314 shutdown(); 315 } 316 } 317 318 321 public boolean isInitialized() { 322 return this.channelState == CHANNEL_STATE_UP; 323 } 324 325 328 public void setInitialized() { 329 this.channelState = CHANNEL_STATE_UP; 330 } 331 332 335 public boolean isDown() { 336 return this.channelState == CHANNEL_STATE_DOWN; 337 } 338 339 342 public SelectableChannel getChannel() { 343 return this.channel; 344 } 345 346 349 public TransportType getType() { 350 return TransportType.TCP; 351 } 353 private void headerComplete() { 354 this.lengthBuf.flip(); 355 this.message.readHeader(this.lengthBuf); 356 this.messageBuf = 357 IncomingByteBufferPool.getInstance().getBuffer(this.message.getLength()); 358 this.messageBuf.limit(this.message.getLength()); 359 this.readState = READ_STATE_PAYLOAD; 360 } 361 362 private void messageComplete() { 363 this.messageBuf.flip(); 364 this.message.setBuffer(this.messageBuf); 365 366 if(log.isTraceEnabled() && logExt){ 367 log.trace("Received message(" + this.message.getID() + 368 ") from " + toString()+"."); 369 } 370 371 this.message.setSourceAddress(this.channel.socket().getRemoteSocketAddress()); 372 this.message.setDestAddress(this.channel.socket().getLocalSocketAddress()); 373 this.listener.messageReady(this.message); 374 375 reset(); 376 } 377 378 private void reset() { 379 this.bytesRead = 0; 380 this.readState = READ_STATE_HEADER; 381 this.message = new CNLMessage(true); 382 this.lengthBuf.clear(); 383 } 384 385 388 public boolean isConnected() { 389 return this.channel != null && this.channel.isConnected(); 390 } 391 392 395 public void setListener(NetworkListener listener) { 396 this.listener = listener; 397 } 398 399 402 public String toString() { 403 StringBuffer buf = new StringBuffer (); 404 try { 405 buf.append(this.channel.socket().getLocalSocketAddress(). 406 toString()); 407 buf.append(this.channel.socket().getRemoteSocketAddress(). 408 toString()); 409 } catch (Throwable t) { 410 buf.append("/unknown/unknown"); 411 } 412 buf.append("@TCP"); 413 414 return buf.toString(); 415 } 416 417 public void onConnect() {} 418 419 public InetSocketAddress getLocalSocketAddress() { 420 return (InetSocketAddress ) 421 this.channel.socket().getLocalSocketAddress(); 422 } 423 424 public InetSocketAddress getRemoteSocketAddress() { 425 return (InetSocketAddress ) 426 this.channel.socket().getRemoteSocketAddress(); 427 } 428 } | Popular Tags |