1 25 26 package org.objectweb.easybeans.component.smartclient.server; 27 28 import static org.objectweb.easybeans.component.smartclient.api.ProtocolConstants.PROTOCOL_VERSION; 29 30 import java.io.IOException ; 31 import java.io.InputStream ; 32 import java.net.URL ; 33 import java.nio.ByteBuffer ; 34 import java.nio.channels.ClosedChannelException ; 35 import java.nio.channels.SelectionKey ; 36 import java.nio.channels.Selector ; 37 import java.nio.channels.ServerSocketChannel ; 38 import java.nio.channels.SocketChannel ; 39 import java.util.Iterator ; 40 import java.util.Set ; 41 import java.util.logging.Level ; 42 import java.util.logging.Logger ; 43 44 import org.objectweb.easybeans.component.api.EZBComponent; 45 import org.objectweb.easybeans.component.api.EZBComponentException; 46 import org.objectweb.easybeans.component.itf.RegistryComponent; 47 import org.objectweb.easybeans.component.smartclient.api.ProtocolConstants; 48 import org.objectweb.easybeans.component.smartclient.message.AbsMessage; 49 import org.objectweb.easybeans.component.smartclient.message.ChannelAttachment; 50 import org.objectweb.easybeans.component.smartclient.message.ClassAnswer; 51 import org.objectweb.easybeans.component.smartclient.message.ClassNotFound; 52 import org.objectweb.easybeans.component.smartclient.message.ClassRequest; 53 import org.objectweb.easybeans.component.smartclient.message.ProviderURLAnswer; 54 import org.objectweb.easybeans.component.smartclient.message.ResourceAnswer; 55 import org.objectweb.easybeans.component.smartclient.message.ResourceNotFound; 56 import org.objectweb.easybeans.component.smartclient.message.ResourceRequest; 57 58 64 public class SmartClientEndPointComponent implements EZBComponent, Runnable { 65 66 69 private static Logger logger = Logger.getLogger(SmartClientEndPointComponent.class.getName()); 70 71 72 75 private static final int MAX_LENGTH_INCOMING_MSG = 500; 76 77 80 private static final int DEFAULT_PORT_NUMBER = 2503; 81 82 85 private static final int BUF_APPEND = 1000; 86 87 90 private int portNumber = DEFAULT_PORT_NUMBER; 91 92 95 private Selector selector = null; 96 97 100 private SelectionKey serverkey = null; 101 102 105 private ServerSocketChannel server = null; 106 107 110 private boolean waitingSelector = true; 111 112 115 private RegistryComponent registryComponent = null; 116 117 121 public void init() throws EZBComponentException { 122 123 try { 125 selector = Selector.open(); 126 } catch (IOException e) { 127 throw new EZBComponentException("Cannot open a new selector.", e); 128 } 129 try { 131 server = ServerSocketChannel.open(); 132 } catch (IOException e) { 133 throw new EZBComponentException("Cannot open a new server socket channel.", e); 134 } 135 136 try { 138 server.configureBlocking(false); 139 } catch (IOException e) { 140 throw new EZBComponentException("Cannot configure the server socket with non-blocking mode.", e); 141 } 142 } 143 144 148 public void start() throws EZBComponentException { 149 150 try { 152 server.socket().bind(new java.net.InetSocketAddress (portNumber)); 153 } catch (IOException e) { 154 throw new EZBComponentException("Cannot listen on the port number '" + portNumber 155 + "', maybe the port is already used.", e); 156 } 157 158 try { 160 serverkey = server.register(selector, SelectionKey.OP_ACCEPT); 161 } catch (ClosedChannelException e) { 162 throw new EZBComponentException("Cannot register the current selector as an accepting selector waiting clients.", e); 163 } 164 165 waitingSelector = true; 167 168 new Thread (this).start(); 170 171 logger.info("SmartClient Endpoint listening on port '" + portNumber + "'."); 172 173 } 174 175 180 public void stop() throws EZBComponentException { 181 waitingSelector = false; 183 selector.wakeup(); 184 } 185 186 189 public void handleSelectors() { 190 191 while (waitingSelector) { 193 194 int updatedKeys = 0; 196 try { 197 updatedKeys = selector.select(); 198 } catch (IOException e) { 199 logger.log(Level.SEVERE, "Selector has been closed, stopping listener", e); 200 waitingSelector = false; 201 } 202 203 if (updatedKeys == 0) { 205 continue; 206 } 207 208 Set <SelectionKey > selectedKeys = selector.selectedKeys(); 210 211 for (Iterator <SelectionKey > itSelectedKeys = selectedKeys.iterator(); itSelectedKeys.hasNext();) { 212 SelectionKey selectionKey = itSelectedKeys.next(); 213 itSelectedKeys.remove(); 215 if (selectionKey == serverkey) { 217 if (selectionKey.isAcceptable()) { 219 try { 220 handleAccept(); 221 } catch (Exception e) { 222 logger.log(Level.SEVERE, "Unable to accept a new connection.", e); 224 } 225 } 226 } else if (selectionKey.isReadable()) { 227 try { 229 handleRead(selectionKey); 230 } catch (Exception e) { 231 logger.log(Level.SEVERE, "Unable to read data from the client.", e); 233 } 234 } else if (selectionKey.isWritable()) { 235 try { 237 handleWrite(selectionKey); 238 } catch (Exception e) { 239 logger.log(Level.SEVERE, "Unable to write data to the client.", e); 241 } 242 } 243 } 244 } 245 } 246 247 251 private void handleAccept() throws IOException { 252 SocketChannel client = server.accept(); 254 255 client.configureBlocking(false); 257 258 client.register(selector, SelectionKey.OP_READ, new ChannelAttachment()); 260 } 261 262 267 private void handleRead(final SelectionKey selectionKey) throws IOException { 268 SocketChannel client = (SocketChannel ) selectionKey.channel(); 270 271 ChannelAttachment channAttachment = (ChannelAttachment) selectionKey.attachment(); 273 ByteBuffer channBuffer = channAttachment.getByteBuffer(); 274 275 int bytesread = client.read(channBuffer); 277 if (bytesread == -1) { 278 selectionKey.cancel(); 280 client.close(); 281 } 282 283 285 if (channBuffer.position() >= AbsMessage.HEADER_SIZE) { 287 288 byte version = channBuffer.get(0); 291 if (version != PROTOCOL_VERSION) { 292 selectionKey.cancel(); 293 client.close(); 294 throw new IllegalStateException ("Invalid protocol version : waiting '" + PROTOCOL_VERSION + "', got '" + version 295 + "'."); 296 } 297 298 byte opCode = channBuffer.get(1); 300 301 int length = channBuffer.getInt(2); 303 if (length < 0) { 304 selectionKey.cancel(); 305 client.close(); 306 throw new IllegalStateException ("Invalid length for client '" + length + "'."); 307 } 308 309 if (length > MAX_LENGTH_INCOMING_MSG) { 310 selectionKey.cancel(); 311 client.close(); 312 throw new IllegalStateException ("Length too big, max length = '" + MAX_LENGTH_INCOMING_MSG + "', current = '" 313 + length + "'."); 314 } 315 316 if (channBuffer.position() >= AbsMessage.HEADER_SIZE + length) { 318 channBuffer.limit(AbsMessage.HEADER_SIZE + length); 321 322 ByteBuffer dataBuffer = channBuffer.duplicate(); 324 325 dataBuffer.position(AbsMessage.HEADER_SIZE); 327 328 try { 330 switch (opCode) { 331 case ProtocolConstants.CLASS_REQUEST: 332 handleReadClassRequest(selectionKey, dataBuffer); 333 break; 334 case ProtocolConstants.RESOURCE_REQUEST: 335 handleReadResourceRequest(selectionKey, dataBuffer); 336 break; 337 case ProtocolConstants.PROVIDER_URL_REQUEST: 338 handleReadProviderURLRequest(selectionKey, dataBuffer); 339 break; 340 default: 341 } 343 } catch (Exception e) { 344 selectionKey.cancel(); 346 client.close(); 347 throw new IllegalStateException ("Cannot handle request with opCode '" + opCode + "'.", e); 348 } 349 } 350 } 351 352 } 353 354 360 private void handleReadClassRequest(final SelectionKey selectionKey, final ByteBuffer dataBuffer) throws IOException { 361 ClassRequest classRequest = new ClassRequest(dataBuffer); 363 String className = classRequest.getClassName(); 364 365 selectionKey.interestOps(SelectionKey.OP_WRITE); 367 String encodedClassName = className.replaceAll("\\.", "/").concat(".class"); 368 369 InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream(encodedClassName); 371 372 if (inputStream == null) { 373 ClassNotFound classNotFound = new ClassNotFound(className); 374 selectionKey.attach(classNotFound.getMessage()); 375 logger.log(Level.FINE, "Class '" + className + "' not found"); 376 return; 377 } 378 byte[] bytecode = null; 379 try { 380 bytecode = readClass(inputStream); 382 383 } finally { 384 inputStream.close(); 385 } 386 387 ClassAnswer classAnswer = new ClassAnswer(className, bytecode); 389 390 selectionKey.attach(classAnswer.getMessage()); 392 393 } 394 395 401 private void handleReadResourceRequest(final SelectionKey selectionKey, final ByteBuffer dataBuffer) throws IOException { 402 403 ResourceRequest resourceRequest = new ResourceRequest(dataBuffer); 405 String resourceName = resourceRequest.getResourceName(); 406 407 selectionKey.interestOps(SelectionKey.OP_WRITE); 409 410 URL url = Thread.currentThread().getContextClassLoader().getResource(resourceName); 412 if (url == null) { 413 ResourceNotFound resourceNotFound = new ResourceNotFound(resourceName); 414 selectionKey.attach(resourceNotFound.getMessage()); 415 logger.log(Level.FINE, "Resource '" + resourceName + "' not found"); 416 return; 417 } 418 InputStream inputStream = url.openStream(); 419 420 byte[] bytes = null; 421 try { 422 bytes = readClass(inputStream); 424 425 } finally { 426 inputStream.close(); 427 } 428 429 ResourceAnswer resourceAnswer = new ResourceAnswer(resourceName, bytes); 431 432 selectionKey.attach(resourceAnswer.getMessage()); 434 435 } 436 437 443 private void handleReadProviderURLRequest(final SelectionKey selectionKey, final ByteBuffer dataBuffer) throws IOException { 444 445 selectionKey.interestOps(SelectionKey.OP_WRITE); 447 448 String providerURL = registryComponent.getProviderURL(); 449 450 logger.log(Level.INFO, "Provider URL asked by client : '" + providerURL + "'."); 451 452 ProviderURLAnswer providerURLAnswer = new ProviderURLAnswer(providerURL); 454 455 selectionKey.attach(providerURLAnswer.getMessage()); 457 458 } 459 460 465 private void handleWrite(final SelectionKey selectionKey) throws IOException { 466 SocketChannel channel = (SocketChannel ) selectionKey.channel(); 467 468 ByteBuffer buffer = (ByteBuffer ) selectionKey.attachment(); 470 if (buffer.hasRemaining()) { 471 channel.write(buffer); 472 } else { 473 channel.close(); 475 } 476 } 477 478 484 private static byte[] readClass(final InputStream is) throws IOException { 485 if (is == null) { 486 throw new IOException ("Given input stream is null"); 487 } 488 byte[] b = new byte[is.available()]; 489 int len = 0; 490 while (true) { 491 int n = is.read(b, len, b.length - len); 492 if (n == -1) { 493 if (len < b.length) { 494 byte[] c = new byte[len]; 495 System.arraycopy(b, 0, c, 0, len); 496 b = c; 497 } 498 return b; 499 } 500 len += n; 501 if (len == b.length) { 502 byte[] c = new byte[b.length + BUF_APPEND]; 503 System.arraycopy(b, 0, c, 0, len); 504 b = c; 505 } 506 } 507 } 508 509 512 public void run() { 513 handleSelectors(); 514 } 515 516 520 public void setPortNumber(final int portNumber) { 521 this.portNumber = portNumber; 522 } 523 524 528 public void setRegistryComponent(final RegistryComponent registryComponent) { 529 this.registryComponent = registryComponent; 530 } 531 532 } 533 | Popular Tags |