1 7 package org.jboss.mq.il.oil; 8 9 import java.io.BufferedInputStream ; 10 import java.io.BufferedOutputStream ; 11 import java.io.EOFException ; 12 import java.io.IOException ; 13 import java.io.ObjectInputStream ; 14 import java.io.ObjectOutputStream ; 15 import java.lang.reflect.Method ; 16 import java.net.InetAddress ; 17 import java.net.ServerSocket ; 18 import java.net.Socket ; 19 import java.net.SocketException ; 20 import java.net.UnknownHostException ; 21 import java.rmi.RemoteException ; 22 import java.util.Properties ; 23 24 import javax.jms.Destination ; 25 import javax.jms.JMSException ; 26 import javax.naming.InitialContext ; 27 import javax.net.ServerSocketFactory; 28 29 import org.jboss.logging.Logger; 30 import org.jboss.mq.AcknowledgementRequest; 31 import org.jboss.mq.ConnectionToken; 32 import org.jboss.mq.DurableSubscriptionID; 33 import org.jboss.mq.SpyDestination; 34 import org.jboss.mq.SpyMessage; 35 import org.jboss.mq.Subscription; 36 import org.jboss.mq.TransactionRequest; 37 import org.jboss.mq.il.Invoker; 38 import org.jboss.mq.il.ServerIL; 39 import org.jboss.security.SecurityDomain; 40 import org.jboss.system.server.ServerConfigUtil; 41 42 50 public final class OILServerILService 51 extends org.jboss.mq.il.ServerILJMXService 52 implements java.lang.Runnable , OILServerILServiceMBean 53 { 54 57 final static private Logger log = Logger.getLogger(OILServerILService.class); 58 59 64 private final static int SO_TIMEOUT = 5000; 65 66 69 private Invoker server; 71 72 75 private boolean enableTcpNoDelay = false; 76 77 80 private int readTimeout = 0; 81 82 84 private String securityDomain; 85 86 88 private String clientSocketFactoryName; 89 91 private ServerSocketFactory serverSocketFactory; 92 96 private ServerSocket serverSocket; 97 98 101 private OILServerIL serverIL; 102 103 108 private volatile boolean running; 109 110 113 private int serverBindPort = 0; 114 115 119 private InetAddress bindAddress = null; 120 121 124 private int threadNumber = 0; 125 126 129 private Properties connectionProperties; 130 131 140 private final class Client implements Runnable 141 { 142 145 private Socket sock; 146 147 151 private ObjectOutputStream out; 152 153 157 private ObjectInputStream in; 158 159 170 Client(Socket s) throws IOException 171 { 172 this.sock = s; 173 this.out = new ObjectOutputStream (new BufferedOutputStream (this.sock.getOutputStream())); 174 this.out.flush(); 175 this.in = new ObjectInputStream (new BufferedInputStream (this.sock.getInputStream())); 176 sock.setTcpNoDelay(enableTcpNoDelay); 177 if (log.isTraceEnabled()) 178 log.trace("Setting TcpNoDelay Option to:" + enableTcpNoDelay); 179 } 180 181 186 public void run() 187 { 188 int code = 0; 189 boolean closed = false; 190 ConnectionToken connectionToken = null; 191 192 while (!closed && running) 193 { 194 try 195 { 196 code = in.readByte(); 200 } 201 catch (EOFException e) 202 { 203 break; 206 } 207 catch (IOException e) 208 { 209 if (closed || !running) 210 { 211 break; 215 } 216 log.warn("Connection failure (1).", e); 217 break; 218 } 219 220 try 225 { 226 Object result = null; 227 228 switch (code) 229 { 230 case OILConstants.SET_SPY_DISTRIBUTED_CONNECTION : 231 connectionToken = (ConnectionToken) in.readObject(); 233 break; 234 235 case OILConstants.ACKNOWLEDGE : 236 AcknowledgementRequest ack = new AcknowledgementRequest(); 237 ack.readExternal(in); 238 server.acknowledge(connectionToken, ack); 239 break; 240 241 case OILConstants.ADD_MESSAGE : 242 server.addMessage(connectionToken, SpyMessage.readMessage(in)); 243 break; 244 245 case OILConstants.BROWSE : 246 result = server.browse(connectionToken, (Destination ) in.readObject(), (String ) in.readObject()); 247 break; 248 249 case OILConstants.CHECK_ID : 250 String ID = (String ) in.readObject(); 251 server.checkID(ID); 252 if (connectionToken != null) 253 connectionToken.setClientID(ID); 254 break; 255 256 case OILConstants.CONNECTION_CLOSING : 257 server.connectionClosing(connectionToken); 258 closed = true; 259 break; 260 261 case OILConstants.CREATE_QUEUE : 262 result = server.createQueue(connectionToken, (String ) in.readObject()); 263 break; 264 265 case OILConstants.CREATE_TOPIC : 266 result = server.createTopic(connectionToken, (String ) in.readObject()); 267 break; 268 269 case OILConstants.DELETE_TEMPORARY_DESTINATION : 270 server.deleteTemporaryDestination(connectionToken, (SpyDestination) in.readObject()); 271 break; 272 273 case OILConstants.GET_ID : 274 result = server.getID(); 275 if (connectionToken != null) 276 connectionToken.setClientID((String ) result); 277 break; 278 279 case OILConstants.GET_TEMPORARY_QUEUE : 280 result = server.getTemporaryQueue(connectionToken); 281 break; 282 283 case OILConstants.GET_TEMPORARY_TOPIC : 284 result = server.getTemporaryTopic(connectionToken); 285 break; 286 287 case OILConstants.RECEIVE : 288 result = server.receive(connectionToken, in.readInt(), in.readLong()); 289 break; 290 291 case OILConstants.SET_ENABLED : 292 server.setEnabled(connectionToken, in.readBoolean()); 293 break; 294 295 case OILConstants.SUBSCRIBE : 296 server.subscribe(connectionToken, (Subscription) in.readObject()); 297 break; 298 299 case OILConstants.TRANSACT : 300 TransactionRequest trans = new TransactionRequest(); 301 trans.readExternal(in); 302 server.transact(connectionToken, trans); 303 break; 304 305 case OILConstants.UNSUBSCRIBE : 306 server.unsubscribe(connectionToken, in.readInt()); 307 break; 308 309 case OILConstants.DESTROY_SUBSCRIPTION : 310 server.destroySubscription(connectionToken, (DurableSubscriptionID) in.readObject()); 311 break; 312 313 case OILConstants.CHECK_USER : 314 result = server.checkUser((String ) in.readObject(), (String ) in.readObject()); 315 break; 316 317 case OILConstants.PING : 318 server.ping(connectionToken, in.readLong()); 319 break; 320 321 case OILConstants.AUTHENTICATE : 322 result = server.authenticate((String ) in.readObject(), (String ) in.readObject()); 323 break; 324 325 default : 326 throw new RemoteException ("Bad method code !"); 327 } 328 329 try 334 { 335 if (result == null) 336 { 337 out.writeByte(OILConstants.SUCCESS); 338 } 339 else 340 { 341 out.writeByte(OILConstants.SUCCESS_OBJECT); 342 out.writeObject(result); 343 out.reset(); 344 } 345 out.flush(); 346 } 347 catch (IOException e) 348 { 349 if (closed) 350 break; 351 352 log.warn("Connection failure (2).", e); 353 break; 354 } 355 356 } 357 catch (Exception e) 358 { 359 if (closed) 363 break; 364 365 log.warn("Client request resulted in a server exception: ", e); 366 367 try 368 { 369 out.writeByte(OILConstants.EXCEPTION); 370 out.writeObject(e); 371 out.reset(); 372 out.flush(); 373 } 374 catch (IOException e2) 375 { 376 if (closed) 377 break; 378 379 log.warn("Connection failure (3).", e); 380 break; 381 } 382 } } 385 try 386 { 387 if (!closed) 388 { 389 try 390 { 391 server.connectionClosing(connectionToken); 392 } 393 catch (JMSException e) 394 { 395 } 397 } 398 in.close(); 399 out.close(); 400 } 401 catch (IOException e) 402 { 403 log.warn("Connection failure during connection close.", e); 404 } 405 finally 406 { 407 try 408 { 409 sock.close(); 410 } 411 catch (IOException e) 412 { 413 log.warn("Connection failure during connection close.", e); 414 } 415 } 416 } } 418 419 426 public java.util.Properties getClientConnectionProperties() 427 { 428 return connectionProperties; 429 } 430 431 436 public String getName() 437 { 438 return "JBossMQ-OILServerIL"; 439 } 440 441 448 public ServerIL getServerIL() 449 { 450 return serverIL; 451 } 452 453 456 public void run() 457 { 458 try 459 { 460 while (running) 461 { 462 Socket socket = null; 463 try 464 { 465 socket = serverSocket.accept(); 466 } 467 catch (java.io.InterruptedIOException e) 468 { 469 continue; 470 } 471 472 if (!running) 477 { 478 if (socket != null) 479 { 480 try 481 { 482 socket.close(); 483 } 484 catch (Exception e) 485 { 486 } 488 } 489 return; 490 } 491 492 try 493 { 494 socket.setSoTimeout(readTimeout); 495 new Thread (new Client(socket), "OIL Worker-" + threadNumber++).start(); 496 } 497 catch (IOException ie) 498 { 499 if (log.isDebugEnabled()) 500 { 501 log.debug("IOException processing client connection", ie); 502 log.debug("Dropping client connection, server will not terminate"); 503 } 504 } 505 } 506 } 507 catch (SocketException e) 508 { 509 if (running) 514 log.warn("SocketException occured (Connection reset by peer?). Cannot initialize the OILServerILService."); 515 } 516 catch (IOException e) 517 { 518 if (running) 519 log.warn("IOException occured. Cannot initialize the OILServerILService.", e); 520 } 521 catch (Throwable t) 522 { 523 log.warn("Unexpected error occured. Cannot initialize the OILServerILService.", t); 524 } 525 try 526 { 527 serverSocket.close(); 528 } 529 catch (Exception e) 530 { 531 log.debug("error closing server socket", e); 532 } 533 return; 534 } 535 536 541 public void startService() throws Exception 542 { 543 super.startService(); 544 545 running = true; 546 this.server = lookupJMSServer(); 547 548 if (serverSocketFactory == null) 550 serverSocketFactory = ServerSocketFactory.getDefault(); 551 552 555 if (securityDomain != null) 556 { 557 try 558 { 559 InitialContext ctx = new InitialContext (); 560 Class ssfClass = serverSocketFactory.getClass(); 561 SecurityDomain domain = (SecurityDomain) ctx.lookup(securityDomain); 562 Class [] parameterTypes = { SecurityDomain.class }; 563 Method m = ssfClass.getMethod("setSecurityDomain", parameterTypes); 564 Object [] args = { domain }; 565 m.invoke(serverSocketFactory, args); 566 } 567 catch (NoSuchMethodException e) 568 { 569 log.error("Socket factory does not support setSecurityDomain(SecurityDomain)"); 570 } 571 catch (Exception e) 572 { 573 log.error("Failed to setSecurityDomain=" + securityDomain + " on socket factory"); 574 } 575 } 576 577 serverSocket = serverSocketFactory.createServerSocket(serverBindPort, 50, bindAddress); 579 serverSocket.setSoTimeout(SO_TIMEOUT); 580 581 InetAddress socketAddress = serverSocket.getInetAddress(); 582 if (log.isInfoEnabled()) 583 log.info("JBossMQ OIL service available at : " + socketAddress + ":" + serverSocket.getLocalPort()); 584 585 new Thread (server.getThreadGroup(), this, "OIL Worker Server").start(); 586 587 591 socketAddress = ServerConfigUtil.fixRemoteAddress(socketAddress); 592 593 serverIL = new OILServerIL(socketAddress, serverSocket.getLocalPort(), 594 clientSocketFactoryName, enableTcpNoDelay); 595 596 connectionProperties = super.getClientConnectionProperties(); 598 connectionProperties.setProperty( 599 OILServerILFactory.CLIENT_IL_SERVICE_KEY, 600 "org.jboss.mq.il.oil.OILClientILService"); 601 connectionProperties.setProperty(OILServerILFactory.OIL_PORT_KEY, "" + serverSocket.getLocalPort()); 602 connectionProperties.setProperty(OILServerILFactory.OIL_ADDRESS_KEY, "" + socketAddress.getHostAddress()); 603 connectionProperties.setProperty(OILServerILFactory.OIL_TCPNODELAY_KEY, enableTcpNoDelay ? "yes" : "no"); 604 605 bindJNDIReferences(); 606 607 } 608 609 612 public void stopService() 613 { 614 try 615 { 616 unbindJNDIReferences(); 617 } 618 catch (Exception e) 619 { 620 log.error("Exception unbinding from JNDI", e); 621 } 622 try 623 { 624 running = false; 625 if (serverSocket != null) 626 serverSocket.close(); 627 } 628 catch (Exception e) 629 { 630 log.debug("Exception stopping server thread", e); 631 } 632 } 633 634 641 public int getServerBindPort() 642 { 643 return serverBindPort; 644 } 645 646 653 public void setServerBindPort(int serverBindPort) 654 { 655 this.serverBindPort = serverBindPort; 656 } 657 658 666 public String getBindAddress() 667 { 668 String addr = "0.0.0.0"; 669 if (bindAddress != null) 670 addr = bindAddress.getHostName(); 671 return addr; 672 } 673 683 public void setBindAddress(String host) throws UnknownHostException 684 { 685 if (host == null || host.length() == 0) 687 bindAddress = null; 688 else 689 bindAddress = InetAddress.getByName(host); 690 } 691 692 698 public boolean getEnableTcpNoDelay() 699 { 700 return enableTcpNoDelay; 701 } 702 703 709 public void setEnableTcpNoDelay(boolean enableTcpNoDelay) 710 { 711 this.enableTcpNoDelay = enableTcpNoDelay; 712 } 713 714 720 public int getReadTimeout() 721 { 722 return readTimeout; 723 } 724 725 731 public void setReadTimeout(int timeout) 732 { 733 this.readTimeout = timeout; 734 } 735 736 740 public String getClientSocketFactory() 741 { 742 return clientSocketFactoryName; 743 } 744 748 public void setClientSocketFactory(String name) 749 { 750 this.clientSocketFactoryName = name; 751 } 752 753 757 public void setServerSocketFactory(String name) throws Exception 758 { 759 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 760 Class ssfClass = loader.loadClass(name); 761 serverSocketFactory = (ServerSocketFactory) ssfClass.newInstance(); 762 } 763 767 public String getServerSocketFactory() 768 { 769 String name = null; 770 if (serverSocketFactory != null) 771 name = serverSocketFactory.getClass().getName(); 772 return name; 773 } 774 775 778 public void setSecurityDomain(String domainName) 779 { 780 this.securityDomain = domainName; 781 } 782 785 public String getSecurityDomain() 786 { 787 return this.securityDomain; 788 } 789 } 790 | Popular Tags |