1 7 package org.jboss.mq.il.oil2; 8 9 import java.io.BufferedInputStream ; 10 import java.io.BufferedOutputStream ; 11 import java.io.IOException ; 12 import java.io.ObjectInputStream ; 13 import java.io.ObjectOutputStream ; 14 import java.lang.reflect.Method ; 15 import java.net.InetAddress ; 16 import java.net.ServerSocket ; 17 import java.net.Socket ; 18 import java.net.SocketException ; 19 import java.net.UnknownHostException ; 20 import java.rmi.RemoteException ; 21 import java.util.Properties ; 22 23 import javax.jms.Destination ; 24 import javax.jms.JMSException ; 25 import javax.naming.InitialContext ; 26 import javax.net.ServerSocketFactory; 27 28 import org.jboss.logging.Logger; 29 import org.jboss.mq.AcknowledgementRequest; 30 import org.jboss.mq.ConnectionToken; 31 import org.jboss.mq.DurableSubscriptionID; 32 import org.jboss.mq.SpyDestination; 33 import org.jboss.mq.SpyMessage; 34 import org.jboss.mq.Subscription; 35 import org.jboss.mq.TransactionRequest; 36 import org.jboss.mq.il.Invoker; 37 import org.jboss.mq.il.ServerIL; 38 import org.jboss.security.SecurityDomain; 39 40 47 public final class OIL2ServerILService 48 extends org.jboss.mq.il.ServerILJMXService 49 implements java.lang.Runnable , OIL2ServerILServiceMBean 50 { 51 54 final static private Logger log = Logger.getLogger(OIL2ServerILService.class); 55 56 61 private final static int SO_TIMEOUT = 5000; 62 63 66 private Invoker server; 68 69 72 private boolean enableTcpNoDelay = false; 73 74 76 private String securityDomain; 77 78 80 private String clientSocketFactoryName; 81 83 private ServerSocketFactory serverSocketFactory; 84 88 private ServerSocket serverSocket; 89 90 93 private OIL2ServerIL serverIL; 94 95 100 private volatile boolean running; 101 102 105 private int serverBindPort = 0; 106 107 111 private InetAddress bindAddress = null; 112 113 116 private Properties connectionProperties; 117 118 public class RequestListner implements OIL2RequestListner 119 { 120 121 Socket socket; 122 ObjectInputStream in; 123 ObjectOutputStream out; 124 OIL2SocketHandler socketHandler; 125 ConnectionToken connectionToken; 126 boolean closing = false; 127 128 RequestListner(Socket socket) throws IOException 129 { 130 socket.setSoTimeout(0); 131 socket.setTcpNoDelay(enableTcpNoDelay); 132 out = new ObjectOutputStream (new BufferedOutputStream (socket.getOutputStream())); 133 out.flush(); 134 in = new ObjectInputStream (new BufferedInputStream (socket.getInputStream())); 135 } 136 137 public void handleRequest(OIL2Request request) 138 { 139 142 if (closing) 143 { 144 log.trace("A connection that is closing received another request. Droping request."); 145 return; 146 } 147 148 Object result = null; 149 Exception resultException = null; 150 151 try 156 { 157 switch (request.operation) 158 { 159 case OIL2Constants.SERVER_SET_SPY_DISTRIBUTED_CONNECTION : 160 connectionToken = (ConnectionToken) request.arguments[0]; 161 ((OIL2ClientIL) connectionToken.clientIL).setRequestListner(this); 164 break; 165 166 case OIL2Constants.SERVER_ACKNOWLEDGE : 167 server.acknowledge(connectionToken, (AcknowledgementRequest) request.arguments[0]); 168 break; 169 170 case OIL2Constants.SERVER_ADD_MESSAGE : 171 server.addMessage(connectionToken, (SpyMessage) request.arguments[0]); 172 break; 173 174 case OIL2Constants.SERVER_BROWSE : 175 result = 176 server.browse(connectionToken, (Destination ) request.arguments[0], (String ) request.arguments[1]); 177 break; 178 179 case OIL2Constants.SERVER_CHECK_ID : 180 server.checkID((String ) request.arguments[0]); 181 if (connectionToken != null) 182 connectionToken.setClientID((String ) request.arguments[0]); 183 break; 184 185 case OIL2Constants.SERVER_CONNECTION_CLOSING : 186 beginClose(); 187 break; 188 189 case OIL2Constants.SERVER_CREATE_QUEUE : 190 result = server.createQueue(connectionToken, (String ) request.arguments[0]); 191 break; 192 193 case OIL2Constants.SERVER_CREATE_TOPIC : 194 result = server.createTopic(connectionToken, (String ) request.arguments[0]); 195 break; 196 197 case OIL2Constants.SERVER_DELETE_TEMPORARY_DESTINATION : 198 server.deleteTemporaryDestination(connectionToken, (SpyDestination) request.arguments[0]); 199 break; 200 201 case OIL2Constants.SERVER_GET_ID : 202 result = server.getID(); 203 if (connectionToken != null) 204 connectionToken.setClientID((String ) result); 205 break; 206 207 case OIL2Constants.SERVER_GET_TEMPORARY_QUEUE : 208 result = server.getTemporaryQueue(connectionToken); 209 break; 210 211 case OIL2Constants.SERVER_GET_TEMPORARY_TOPIC : 212 result = server.getTemporaryTopic(connectionToken); 213 break; 214 215 case OIL2Constants.SERVER_RECEIVE : 216 result = 217 server.receive( 218 connectionToken, 219 ((Integer ) request.arguments[0]).intValue(), 220 ((Long ) request.arguments[1]).longValue()); 221 break; 222 223 case OIL2Constants.SERVER_SET_ENABLED : 224 server.setEnabled(connectionToken, ((Boolean ) request.arguments[0]).booleanValue()); 225 break; 226 227 case OIL2Constants.SERVER_SUBSCRIBE : 228 server.subscribe(connectionToken, (Subscription) request.arguments[0]); 229 break; 230 231 case OIL2Constants.SERVER_TRANSACT : 232 server.transact(connectionToken, (TransactionRequest) request.arguments[0]); 233 break; 234 235 case OIL2Constants.SERVER_UNSUBSCRIBE : 236 server.unsubscribe(connectionToken, ((Integer ) request.arguments[0]).intValue()); 237 break; 238 239 case OIL2Constants.SERVER_DESTROY_SUBSCRIPTION : 240 server.destroySubscription(connectionToken, (DurableSubscriptionID) request.arguments[0]); 241 break; 242 243 case OIL2Constants.SERVER_CHECK_USER : 244 result = server.checkUser((String ) request.arguments[0], (String ) request.arguments[1]); 245 break; 246 247 case OIL2Constants.SERVER_PING : 248 server.ping(connectionToken, ((Long ) request.arguments[0]).longValue()); 249 break; 250 251 case OIL2Constants.SERVER_AUTHENTICATE : 252 result = server.authenticate((String ) request.arguments[0], (String ) request.arguments[1]); 253 break; 254 255 default : 256 throw new RemoteException ("Bad method code !"); 257 } } 259 catch (Exception e) 260 { 261 resultException = e; 262 } 264 try 265 { 266 OIL2Response response = new OIL2Response(request); 267 response.result = result; 268 response.exception = resultException; 269 socketHandler.sendResponse(response); 270 } 271 catch (IOException e) 272 { 273 handleConnectionException(e); 274 } 275 } 276 277 public void handleConnectionException(Exception e) 278 { 279 if (!closing) 280 log.info("Client Disconnected: " + e); 281 beginClose(); 282 } 283 284 void beginClose() 285 { 286 closing = true; 287 try 288 { 289 if (connectionToken != null) 290 server.connectionClosing(connectionToken); 291 } 292 catch (JMSException ignore) 293 { 294 } 295 finally 296 { 297 close(); 298 } 299 } 300 301 void close() 302 { 303 try 304 { 305 if (socket != null) 306 { 307 socketHandler.stop(); 308 in.close(); 309 out.close(); 310 socket.close(); 311 socket = null; 312 } 313 } 314 catch (IOException e) 315 { 316 log.debug("Exception occured while closing opened resources: ", e); 317 } 318 } 319 320 public OIL2SocketHandler getSocketHandler() 321 { 322 return socketHandler; 323 } 324 325 } 326 327 334 public java.util.Properties getClientConnectionProperties() 335 { 336 return connectionProperties; 337 } 338 339 344 public String getName() 345 { 346 return "JBossMQ-OILServerIL"; 347 } 348 349 356 public ServerIL getServerIL() 357 { 358 return serverIL; 359 } 360 361 364 public void run() 365 { 366 try 367 { 368 while (running) 369 { 370 Socket socket = null; 371 try 372 { 373 socket = serverSocket.accept(); 374 if (log.isTraceEnabled()) 375 log.trace("Accepted connection: " + socket); 376 } 377 catch (java.io.InterruptedIOException e) 378 { 379 continue; 381 } 382 383 if (!running) 388 { 389 if (socket != null) 390 { 391 try 392 { 393 socket.close(); 394 } 395 catch (Exception ignore) 396 { 397 } 398 } 399 return; 400 } 401 402 try 403 { 404 405 if (log.isTraceEnabled()) 406 log.trace("Initializing RequestListner for socket: " + socket); 407 RequestListner requestListner = new RequestListner(socket); 408 OIL2SocketHandler socketHandler = 409 new OIL2SocketHandler(requestListner.in, requestListner.out, Thread.currentThread().getThreadGroup()); 410 requestListner.socketHandler = socketHandler; 411 socketHandler.setRequestListner(requestListner); 412 socketHandler.start(); 413 414 } 415 catch (IOException ie) 416 { 417 log.debug("Client connection could not be accepted: ", ie); 418 } 419 } 420 } 421 catch (SocketException e) 422 { 423 if (running) 428 log.warn("SocketException occured (Connection reset by peer?). Cannot initialize the OIL2ServerILService."); 429 } 430 catch (IOException e) 431 { 432 if (running) 433 log.warn("IOException occured. Cannot initialize the OIL2ServerILService."); 434 } 435 catch (Throwable t) 436 { 437 log.warn("Unexpected error occured. Cannot initialize the OIL2ServerILService.", t); 438 } 439 try 440 { 441 serverSocket.close(); 442 } 443 catch (Exception e) 444 { 445 log.debug("error closing server socket", e); 446 } 447 return; 448 } 449 450 455 public void startService() throws Exception 456 { 457 super.startService(); 458 459 running = true; 460 this.server = lookupJMSServer(); 461 462 if (serverSocketFactory == null) 464 serverSocketFactory = ServerSocketFactory.getDefault(); 465 466 469 if (securityDomain != null) 470 { 471 try 472 { 473 InitialContext ctx = new InitialContext (); 474 Class ssfClass = serverSocketFactory.getClass(); 475 SecurityDomain domain = (SecurityDomain) ctx.lookup(securityDomain); 476 Class [] parameterTypes = { SecurityDomain.class }; 477 Method m = ssfClass.getMethod("setSecurityDomain", parameterTypes); 478 Object [] args = { domain }; 479 m.invoke(serverSocketFactory, args); 480 } 481 catch (NoSuchMethodException e) 482 { 483 log.error("Socket factory does not support setSecurityDomain(SecurityDomain)"); 484 } 485 catch (Exception e) 486 { 487 log.error("Failed to setSecurityDomain=" + securityDomain + " on socket factory"); 488 } 489 } 490 491 serverSocket = serverSocketFactory.createServerSocket(serverBindPort, 50, bindAddress); 493 serverSocket.setSoTimeout(SO_TIMEOUT); 494 495 InetAddress socketAddress = serverSocket.getInetAddress(); 496 log.info("JBossMQ OIL2 service available at : " + socketAddress + ":" + serverSocket.getLocalPort()); 497 498 new Thread (server.getThreadGroup(), this, "OIL2 Worker Server").start(); 499 503 if (socketAddress.toString().equals("0.0.0.0/0.0.0.0")) 504 socketAddress = InetAddress.getLocalHost(); 505 506 serverIL = 507 new OIL2ServerIL( 508 socketAddress.getHostAddress(), 509 serverSocket.getLocalPort(), 510 clientSocketFactoryName, 511 enableTcpNoDelay); 512 513 connectionProperties = super.getClientConnectionProperties(); 515 connectionProperties.setProperty( 516 OIL2ServerILFactory.CLIENT_IL_SERVICE_KEY, 517 "org.jboss.mq.il.oil2.OIL2ClientILService"); 518 connectionProperties.setProperty(OIL2ServerILFactory.OIL2_PORT_KEY, "" + serverSocket.getLocalPort()); 519 connectionProperties.setProperty(OIL2ServerILFactory.OIL2_ADDRESS_KEY, "" + socketAddress.getHostAddress()); 520 connectionProperties.setProperty(OIL2ServerILFactory.OIL2_TCPNODELAY_KEY, enableTcpNoDelay ? "yes" : "no"); 521 522 bindJNDIReferences(); 523 524 } 525 526 529 public void stopService() 530 { 531 try 532 { 533 unbindJNDIReferences(); 534 } 535 catch (Exception e) 536 { 537 log.error("Exception unbinding from JNDI", e); 538 } 539 try 540 { 541 running = false; 542 if (serverSocket != null) 543 serverSocket.close(); 544 } 545 catch (Exception e) 546 { 547 log.debug("Exception stopping server thread", e); 548 } 549 } 550 551 557 public int getServerBindPort() 558 { 559 return serverBindPort; 560 } 561 562 568 public void setServerBindPort(int serverBindPort) 569 { 570 this.serverBindPort = serverBindPort; 571 } 572 573 580 public String getBindAddress() 581 { 582 String addr = "0.0.0.0"; 583 if (bindAddress != null) 584 addr = bindAddress.getHostName(); 585 return addr; 586 } 587 596 public void setBindAddress(String host) throws UnknownHostException 597 { 598 if (host == null || host.length() == 0) 600 bindAddress = null; 601 else 602 bindAddress = InetAddress.getByName(host); 603 } 604 605 610 public boolean getEnableTcpNoDelay() 611 { 612 return enableTcpNoDelay; 613 } 614 615 620 public void setEnableTcpNoDelay(boolean enableTcpNoDelay) 621 { 622 this.enableTcpNoDelay = enableTcpNoDelay; 623 } 624 625 629 public String getClientSocketFactory() 630 { 631 return clientSocketFactoryName; 632 } 633 637 public void setClientSocketFactory(String name) 638 { 639 this.clientSocketFactoryName = name; 640 } 641 642 646 public void setServerSocketFactory(String name) throws Exception 647 { 648 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 649 Class ssfClass = loader.loadClass(name); 650 serverSocketFactory = (ServerSocketFactory) ssfClass.newInstance(); 651 } 652 656 public String getServerSocketFactory() 657 { 658 String name = null; 659 if (serverSocketFactory != null) 660 name = serverSocketFactory.getClass().getName(); 661 return name; 662 } 663 664 667 public void setSecurityDomain(String domainName) 668 { 669 this.securityDomain = domainName; 670 } 671 674 public String getSecurityDomain() 675 { 676 return this.securityDomain; 677 } 678 } 679 | Popular Tags |