1 7 package org.jboss.mq.il.oil2; 8 9 import java.io.BufferedInputStream ; 10 import java.io.BufferedOutputStream ; 11 import java.io.IOException ; 12 import java.io.ObjectInputStream ; 13 import java.io.ObjectOutputStream ; 14 import java.net.InetAddress ; 15 import java.net.Socket ; 16 17 import javax.jms.Destination ; 18 import javax.jms.JMSException ; 19 import javax.jms.Queue ; 20 import javax.jms.TemporaryQueue ; 21 import javax.jms.TemporaryTopic ; 22 import javax.jms.Topic ; 23 import javax.net.SocketFactory; 24 25 import org.jboss.logging.Logger; 26 import org.jboss.mq.AcknowledgementRequest; 27 import org.jboss.mq.Connection; 28 import org.jboss.mq.ConnectionToken; 29 import org.jboss.mq.DurableSubscriptionID; 30 import org.jboss.mq.SpyDestination; 31 import org.jboss.mq.SpyMessage; 32 import org.jboss.mq.TransactionRequest; 33 import org.jboss.mq.il.ServerIL; 34 35 42 public final class OIL2ServerIL 43 implements java.io.Serializable , java.lang.Cloneable , org.jboss.mq.il.ServerIL, OIL2Constants 44 { 45 static final long serialVersionUID = 1841984837999477932L; 46 47 private final static Logger log = Logger.getLogger(OIL2ServerIL.class); 48 51 private final static String LOCAL_ADDR = "org.jboss.mq.il.oil2.localAddr"; 52 55 private final static String LOCAL_PORT = "org.jboss.mq.il.oil2.localPort"; 56 57 59 private String addr; 60 62 private int port; 63 66 private String socketFactoryName; 67 68 71 private boolean enableTcpNoDelay = false; 72 74 private transient InetAddress localAddr; 75 77 private transient int localPort; 78 81 private transient ObjectInputStream in; 82 83 86 private transient ObjectOutputStream out; 87 88 91 private transient Socket socket; 92 93 OIL2SocketHandler socketHandler; 94 95 class RequestListner implements OIL2RequestListner 96 { 97 public void handleConnectionException(Exception e) 98 { 99 } 100 101 public void handleRequest(OIL2Request request) 102 { 103 } 104 105 } 106 107 115 public OIL2ServerIL(String addr, int port, 116 String socketFactoryName, boolean enableTcpNoDelay) 117 { 118 this.addr = addr; 119 this.port = port; 120 this.socketFactoryName = socketFactoryName; 121 this.enableTcpNoDelay = enableTcpNoDelay; 122 } 123 124 synchronized public void connect() throws IOException 125 { 126 if (socket == null) 127 { 128 boolean tracing = log.isTraceEnabled(); 129 if( tracing ) 130 log.trace("Connecting to : "+addr+":"+port); 131 132 135 SocketFactory socketFactory = null; 136 if( socketFactoryName != null ) 137 { 138 try 139 { 140 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 141 Class factoryClass = loader.loadClass(socketFactoryName); 142 socketFactory = (SocketFactory) factoryClass.newInstance(); 143 } 144 catch(Exception e) 145 { 146 log.debug("Failed to load socket factory: "+socketFactoryName, e); 147 } 148 } 149 if( socketFactory == null ) 151 { 152 socketFactory = SocketFactory.getDefault(); 153 } 154 155 String tmp = System.getProperty(LOCAL_ADDR); 157 if( tmp != null ) 158 this.localAddr = InetAddress.getByName(tmp); 159 tmp = System.getProperty(LOCAL_PORT); 160 if( tmp != null ) 161 this.localPort = Integer.parseInt(tmp); 162 if( tracing ) 163 { 164 log.trace("Connecting with addr="+addr+", port="+port 165 + ", localAddr="+localAddr+", localPort="+localPort 166 + ", socketFactory="+socketFactory); 167 } 168 169 if( localAddr != null ) 170 socket = socketFactory.createSocket(addr, port, localAddr, localPort); 171 else 172 socket = socketFactory.createSocket(addr, port); 173 174 if( tracing ) 175 log.trace("Connection established."); 176 177 socket.setTcpNoDelay(enableTcpNoDelay); 178 out = new ObjectOutputStream (new BufferedOutputStream (socket.getOutputStream())); 179 out.flush(); 180 in = new ObjectInputStream (new BufferedInputStream (socket.getInputStream())); 181 182 if( tracing ) 183 log.trace("Streams initialized."); 184 185 socketHandler = new OIL2SocketHandler(in, out, Connection.getThreadGroup()); 186 socketHandler.setRequestListner(new RequestListner()); 187 socketHandler.start(); 188 } 189 } 190 191 197 public void setConnectionToken(ConnectionToken dest) throws Exception 198 { 199 connect(); 200 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_SET_SPY_DISTRIBUTED_CONNECTION, new Object [] { dest }); 201 OIL2Response response = socketHandler.synchRequest(request); 202 response.evalThrowsJMSException(); 203 } 204 205 213 public void setEnabled(ConnectionToken dc, boolean enabled) throws JMSException , Exception 214 { 215 connect(); 216 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_SET_ENABLED, new Object [] { new Boolean (enabled)}); 217 OIL2Response response = socketHandler.synchRequest(request); 218 response.evalThrowsJMSException(); 219 } 220 221 227 public String getID() throws Exception 228 { 229 connect(); 230 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_GET_ID, null); 231 OIL2Response response = socketHandler.synchRequest(request); 232 return (String ) response.evalThrowsJMSException(); 233 } 234 235 243 public TemporaryQueue getTemporaryQueue(ConnectionToken dc) throws JMSException , Exception 244 { 245 connect(); 246 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_GET_TEMPORARY_QUEUE, null); 247 OIL2Response response = socketHandler.synchRequest(request); 248 return (TemporaryQueue ) response.evalThrowsJMSException(); 249 } 250 251 259 public TemporaryTopic getTemporaryTopic(ConnectionToken dc) throws JMSException , Exception 260 { 261 connect(); 262 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_GET_TEMPORARY_TOPIC, null); 263 OIL2Response response = socketHandler.synchRequest(request); 264 return (TemporaryTopic ) response.evalThrowsJMSException(); 265 } 266 267 275 public void acknowledge(ConnectionToken dc, AcknowledgementRequest item) throws JMSException , Exception 276 { 277 connect(); 278 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_ACKNOWLEDGE, new Object [] { item }); 279 OIL2Response response = socketHandler.synchRequest(request); 280 response.evalThrowsJMSException(); 281 282 } 283 284 291 public void addMessage(ConnectionToken dc, SpyMessage val) throws Exception 292 { 293 294 connect(); 295 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_ADD_MESSAGE, new Object [] { val }); 296 OIL2Response response = socketHandler.synchRequest(request); 297 response.evalThrowsJMSException(); 298 } 299 300 310 public SpyMessage[] browse(ConnectionToken dc, Destination dest, String selector) 311 throws JMSException , Exception 312 { 313 connect(); 314 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_BROWSE, new Object [] { dest, selector }); 315 OIL2Response response = socketHandler.synchRequest(request); 316 return (SpyMessage[]) response.evalThrowsJMSException(); 317 } 318 319 326 public void checkID(String ID) throws JMSException , Exception 327 { 328 connect(); 329 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_CHECK_ID, new Object [] { ID }); 330 OIL2Response response = socketHandler.synchRequest(request); 331 response.evalThrowsJMSException(); 332 } 333 334 343 public String checkUser(String userName, String password) throws JMSException , Exception 344 { 345 connect(); 346 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_CHECK_USER, new Object [] { userName, password }); 347 OIL2Response response = socketHandler.synchRequest(request); 348 return (String ) response.evalThrowsJMSException(); 349 } 350 351 360 public String authenticate(String userName, String password) throws JMSException , Exception 361 { 362 connect(); 363 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_AUTHENTICATE, new Object [] { userName, password }); 364 OIL2Response response = socketHandler.synchRequest(request); 365 return (String ) response.evalThrowsJMSException(); 366 367 } 368 374 public Object clone() throws CloneNotSupportedException 375 { 376 return super.clone(); 377 } 378 379 386 public ServerIL cloneServerIL() throws Exception 387 { 388 return (ServerIL) clone(); 389 } 390 391 398 public void connectionClosing(ConnectionToken dc) throws JMSException , Exception 399 { 400 try 401 { 402 connect(); 403 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_CONNECTION_CLOSING, null); 404 OIL2Response response = socketHandler.synchRequest(request); 405 response.evalThrowsJMSException(); 406 } 407 finally 408 { 409 close(); 410 } 411 } 412 413 422 public Queue createQueue(ConnectionToken dc, String dest) throws JMSException , Exception 423 { 424 connect(); 425 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_CREATE_QUEUE, new Object [] { dest }); 426 OIL2Response response = socketHandler.synchRequest(request); 427 return (Queue ) response.evalThrowsJMSException(); 428 429 } 430 431 440 public Topic createTopic(ConnectionToken dc, String dest) throws JMSException , Exception 441 { 442 connect(); 443 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_CREATE_TOPIC, new Object [] { dest }); 444 OIL2Response response = socketHandler.synchRequest(request); 445 return (Topic ) response.evalThrowsJMSException(); 446 447 } 448 449 457 public void deleteTemporaryDestination(ConnectionToken dc, SpyDestination dest) 458 throws JMSException , Exception 459 { 460 connect(); 461 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_DELETE_TEMPORARY_DESTINATION, new Object [] { dest }); 462 OIL2Response response = socketHandler.synchRequest(request); 463 response.evalThrowsJMSException(); 464 465 } 466 467 474 public void destroySubscription(ConnectionToken dc, DurableSubscriptionID id) 475 throws JMSException , Exception 476 { 477 connect(); 478 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_DESTROY_SUBSCRIPTION, new Object [] { id }); 479 OIL2Response response = socketHandler.synchRequest(request); 480 response.evalThrowsJMSException(); 481 482 } 483 484 491 public void ping(ConnectionToken dc, long clientTime) throws Exception 492 { 493 connect(); 494 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_PING, new Object [] { new Long (clientTime)}); 495 OIL2Response response = socketHandler.synchRequest(request); 496 response.evalThrowsJMSException(); 497 } 498 499 508 public SpyMessage receive(ConnectionToken dc, int subscriberId, long wait) throws Exception , Exception 509 { 510 connect(); 511 OIL2Request request = 512 new OIL2Request(OIL2Constants.SERVER_RECEIVE, new Object [] { new Integer (subscriberId), new Long (wait)}); 513 OIL2Response response = socketHandler.synchRequest(request); 514 return (SpyMessage) response.evalThrowsJMSException(); 515 516 } 517 518 526 public void subscribe(ConnectionToken dc, org.jboss.mq.Subscription s) throws JMSException , Exception 527 { 528 connect(); 529 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_SUBSCRIBE, new Object [] { s }); 530 OIL2Response response = socketHandler.synchRequest(request); 531 response.evalThrowsJMSException(); 532 533 } 534 535 543 public void transact(org.jboss.mq.ConnectionToken dc, TransactionRequest t) 544 throws JMSException , Exception 545 { 546 connect(); 547 OIL2Request request = new OIL2Request(OIL2Constants.SERVER_TRANSACT, new Object [] { t }); 548 OIL2Response response = socketHandler.synchRequest(request); 549 response.evalThrowsJMSException(); 550 } 551 552 560 public void unsubscribe(ConnectionToken dc, int subscriptionId) throws JMSException , Exception 561 { 562 connect(); 563 OIL2Request request = 564 new OIL2Request(OIL2Constants.SERVER_UNSUBSCRIBE, new Object [] { new Integer (subscriptionId)}); 565 OIL2Response response = socketHandler.synchRequest(request); 566 response.evalThrowsJMSException(); 567 568 } 569 570 575 synchronized public void close() 576 { 577 try 578 { 579 if (socket != null) 580 { 581 socketHandler.stop(); 582 in.close(); 583 out.close(); 584 socket.close(); 585 socket = null; 586 } 587 } 588 catch (IOException e) 589 { 590 log.debug("Exception occured while closing opened resources: ", e); 591 } 592 } 593 } 594 | Popular Tags |