1 22 package org.jboss.mq.il.uil2; 23 24 import java.io.Serializable ; 25 import java.io.IOException ; 26 import java.net.InetAddress ; 27 import java.net.ConnectException ; 28 import java.net.Socket ; 29 import javax.jms.Destination ; 30 import javax.jms.JMSException ; 31 import javax.jms.Queue ; 32 import javax.jms.TemporaryQueue ; 33 import javax.jms.TemporaryTopic ; 34 import javax.jms.Topic ; 35 import javax.net.SocketFactory; 36 import javax.transaction.xa.Xid ; 37 38 import org.jboss.logging.Logger; 39 import org.jboss.mq.AcknowledgementRequest; 40 import org.jboss.mq.Connection; 41 import org.jboss.mq.ConnectionToken; 42 import org.jboss.mq.DurableSubscriptionID; 43 import org.jboss.mq.Recoverable; 44 import org.jboss.mq.SpyDestination; 45 import org.jboss.mq.SpyMessage; 46 import org.jboss.mq.TransactionRequest; 47 import org.jboss.mq.il.ServerIL; 48 import org.jboss.mq.il.uil2.msgs.MsgTypes; 49 import org.jboss.mq.il.uil2.msgs.ConnectionTokenMsg; 50 import org.jboss.mq.il.uil2.msgs.EnableConnectionMsg; 51 import org.jboss.mq.il.uil2.msgs.GetIDMsg; 52 import org.jboss.mq.il.uil2.msgs.RecoverMsg; 53 import org.jboss.mq.il.uil2.msgs.TemporaryDestMsg; 54 import org.jboss.mq.il.uil2.msgs.AcknowledgementRequestMsg; 55 import org.jboss.mq.il.uil2.msgs.AddMsg; 56 import org.jboss.mq.il.uil2.msgs.BrowseMsg; 57 import org.jboss.mq.il.uil2.msgs.CheckIDMsg; 58 import org.jboss.mq.il.uil2.msgs.CheckUserMsg; 59 import org.jboss.mq.il.uil2.msgs.CloseMsg; 60 import org.jboss.mq.il.uil2.msgs.CreateDestMsg; 61 import org.jboss.mq.il.uil2.msgs.DeleteTemporaryDestMsg; 62 import org.jboss.mq.il.uil2.msgs.DeleteSubscriptionMsg; 63 import org.jboss.mq.il.uil2.msgs.PingMsg; 64 import org.jboss.mq.il.uil2.msgs.ReceiveMsg; 65 import org.jboss.mq.il.uil2.msgs.SubscribeMsg; 66 import org.jboss.mq.il.uil2.msgs.TransactMsg; 67 import org.jboss.mq.il.uil2.msgs.UnsubscribeMsg; 68 69 76 public class UILServerIL 77 implements Cloneable , MsgTypes, Serializable , ServerIL, Recoverable 78 { 79 80 private static final long serialVersionUID = 853594001646066224L; 81 private static Logger log = Logger.getLogger(UILServerIL.class); 82 83 86 private final static String USE_SERVER_HOST = "org.jboss.mq.il.uil2.useServerHost"; 87 88 91 private final static String LOCAL_ADDR = "org.jboss.mq.il.uil2.localAddr"; 92 95 private final static String LOCAL_PORT = "org.jboss.mq.il.uil2.localPort"; 96 100 private final static String SERVER_ADDR = "org.jboss.mq.il.uil2.serverAddr"; 101 106 private final static String SERVER_PORT = "org.jboss.mq.il.uil2.serverPort"; 107 112 private final static String RETRY_COUNT = "org.jboss.mq.il.uil2.retryCount"; 113 116 private final static String RETRY_DELAY = "org.jboss.mq.il.uil2.retryDelay"; 117 118 120 private InetAddress addr; 121 123 private int port; 124 127 private String socketFactoryName; 128 129 132 private boolean enableTcpNoDelay = false; 133 134 137 private int soTimeout = 0; 138 139 142 private String connectAddress; 143 144 147 private int connectPort = 0; 148 149 152 private int bufferSize; 153 154 157 private int chunkSize; 158 159 161 private transient InetAddress localAddr; 162 164 private transient int localPort; 165 166 169 protected transient Socket socket; 170 173 protected transient SocketManager socketMgr; 174 175 public UILServerIL(InetAddress addr, int port, String socketFactoryName, 176 boolean enableTcpNoDelay, int bufferSize, int chunkSize, int soTimeout, String connectAddress, int connectPort) 177 throws Exception 178 { 179 this.addr = addr; 180 this.port = port; 181 this.socketFactoryName = socketFactoryName; 182 this.enableTcpNoDelay = enableTcpNoDelay; 183 this.bufferSize = bufferSize; 184 this.chunkSize = chunkSize; 185 this.soTimeout = soTimeout; 186 this.connectAddress = connectAddress; 187 this.connectPort = connectPort; 188 } 189 190 public void setConnectionToken(ConnectionToken dest) 191 throws Exception 192 { 193 ConnectionTokenMsg msg = new ConnectionTokenMsg(dest); 194 getSocketMgr().sendMessage(msg); 195 } 196 197 public void setEnabled(ConnectionToken dc, boolean enabled) 198 throws JMSException , Exception 199 { 200 EnableConnectionMsg msg = new EnableConnectionMsg(enabled); 201 getSocketMgr().sendMessage(msg); 202 } 203 204 public String getID() 205 throws Exception 206 { 207 GetIDMsg msg = new GetIDMsg(); 208 getSocketMgr().sendMessage(msg); 209 String id = msg.getID(); 210 return id; 211 } 212 213 public TemporaryQueue getTemporaryQueue(ConnectionToken dc) 214 throws JMSException , Exception 215 { 216 TemporaryDestMsg msg = new TemporaryDestMsg(true); 217 getSocketMgr().sendMessage(msg); 218 TemporaryQueue dest = msg.getQueue(); 219 return dest; 220 } 221 222 public TemporaryTopic getTemporaryTopic(ConnectionToken dc) 223 throws JMSException , Exception 224 { 225 TemporaryDestMsg msg = new TemporaryDestMsg(false); 226 getSocketMgr().sendMessage(msg); 227 TemporaryTopic dest = msg.getTopic(); 228 return dest; 229 } 230 231 public void acknowledge(ConnectionToken dc, AcknowledgementRequest item) 232 throws JMSException , Exception 233 { 234 AcknowledgementRequestMsg msg = new AcknowledgementRequestMsg(item); 235 if (item.isAck()) 236 getSocketMgr().sendMessage(msg); 237 else 238 getSocketMgr().sendOneWay(msg); 239 } 240 241 public void addMessage(ConnectionToken dc, SpyMessage val) 242 throws Exception 243 { 244 AddMsg msg = new AddMsg(val); 245 getSocketMgr().sendMessage(msg); 246 } 247 248 public SpyMessage[] browse(ConnectionToken dc, Destination dest, String selector) 249 throws JMSException , Exception 250 { 251 BrowseMsg msg = new BrowseMsg(dest, selector); 252 getSocketMgr().sendMessage(msg); 253 SpyMessage[] msgs = msg.getMessages(); 254 return msgs; 255 } 256 257 public void checkID(String id) 258 throws JMSException , Exception 259 { 260 CheckIDMsg msg = new CheckIDMsg(id); 261 getSocketMgr().sendMessage(msg); 262 } 263 264 public String checkUser(String username, String password) 265 throws JMSException , Exception 266 { 267 CheckUserMsg msg = new CheckUserMsg(username, password, false); 268 getSocketMgr().sendMessage(msg); 269 String clientID = msg.getID(); 270 return clientID; 271 } 272 273 public String authenticate(String username, String password) 274 throws JMSException , Exception 275 { 276 CheckUserMsg msg = new CheckUserMsg(username, password, true); 277 getSocketMgr().sendMessage(msg); 278 String sessionID = msg.getID(); 279 return sessionID; 280 } 281 282 public Object clone() 283 throws CloneNotSupportedException 284 { 285 return super.clone(); 286 } 287 288 public ServerIL cloneServerIL() 289 throws Exception 290 { 291 return (ServerIL)clone(); 292 } 293 294 public void connectionClosing(ConnectionToken dc) 295 throws JMSException , Exception 296 { 297 CloseMsg msg = new CloseMsg(); 298 try 299 { 300 getSocketMgr().sendMessage(msg); 301 } 302 catch (IOException ignored) 303 { 304 } 305 destroyConnection(); 306 } 307 308 public Queue createQueue(ConnectionToken dc, String destName) 309 throws JMSException , Exception 310 { 311 CreateDestMsg msg = new CreateDestMsg(destName, true); 312 getSocketMgr().sendMessage(msg); 313 Queue dest = msg.getQueue(); 314 return dest; 315 } 316 317 public Topic createTopic(ConnectionToken dc, String destName) 318 throws JMSException , Exception 319 { 320 CreateDestMsg msg = new CreateDestMsg(destName, false); 321 getSocketMgr().sendMessage(msg); 322 Topic dest = msg.getTopic(); 323 return dest; 324 } 325 326 public void deleteTemporaryDestination(ConnectionToken dc, SpyDestination dest) 327 throws JMSException , Exception 328 { 329 DeleteTemporaryDestMsg msg = new DeleteTemporaryDestMsg(dest); 330 getSocketMgr().sendMessage(msg); 331 } 332 333 public void destroySubscription(ConnectionToken dc,DurableSubscriptionID id) 334 throws JMSException , Exception 335 { 336 DeleteSubscriptionMsg msg = new DeleteSubscriptionMsg(id); 337 getSocketMgr().sendMessage(msg); 338 } 339 340 public void ping(ConnectionToken dc, long clientTime) 341 throws Exception 342 { 343 PingMsg msg = new PingMsg(clientTime, true); 344 msg.getMsgID(); 345 getSocketMgr().sendReply(msg); 346 } 347 348 public SpyMessage receive(ConnectionToken dc, int subscriberId, long wait) 349 throws Exception , Exception 350 { 351 ReceiveMsg msg = new ReceiveMsg(subscriberId, wait); 352 getSocketMgr().sendMessage(msg); 353 SpyMessage reply = msg.getMessage(); 354 return reply; 355 } 356 357 public void subscribe(ConnectionToken dc, org.jboss.mq.Subscription s) 358 throws JMSException , Exception 359 { 360 SubscribeMsg msg = new SubscribeMsg(s); 361 getSocketMgr().sendMessage(msg); 362 } 363 364 public void transact(ConnectionToken dc, TransactionRequest t) 365 throws JMSException , Exception 366 { 367 TransactMsg msg = new TransactMsg(t); 368 getSocketMgr().sendMessage(msg); 369 } 370 371 public Xid [] recover(ConnectionToken dc, int flags) throws Exception 372 { 373 RecoverMsg msg = new RecoverMsg(flags); 374 getSocketMgr().sendMessage(msg); 375 Xid [] reply = msg.getXids(); 376 return reply; 377 } 378 379 public void unsubscribe(ConnectionToken dc, int subscriptionID) 380 throws JMSException , Exception 381 { 382 UnsubscribeMsg msg = new UnsubscribeMsg(subscriptionID); 383 getSocketMgr().sendMessage(msg); 384 } 385 386 final SocketManager getSocketMgr() 387 throws Exception 388 { 389 if( socketMgr == null ) 390 createConnection(); 391 return socketMgr; 392 } 393 394 protected void checkConnection() 395 throws Exception 396 { 397 if (socketMgr == null) 398 { 399 createConnection(); 400 } 401 } 402 403 408 protected void createConnection() 409 throws Exception 410 { 411 boolean tracing = log.isTraceEnabled(); 412 413 416 SocketFactory socketFactory = null; 417 if( socketFactoryName != null ) 418 { 419 try 420 { 421 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 422 Class factoryClass = loader.loadClass(socketFactoryName); 423 socketFactory = (SocketFactory) factoryClass.newInstance(); 424 } 425 catch(Exception e) 426 { 427 log.debug("Failed to load socket factory: "+socketFactoryName, e); 428 } 429 } 430 if( socketFactory == null ) 432 { 433 socketFactory = SocketFactory.getDefault(); 434 } 435 436 String tmp = getProperty(LOCAL_ADDR); 438 if( tmp != null ) 439 this.localAddr = InetAddress.getByName(tmp); 440 tmp = getProperty(LOCAL_PORT); 441 if( tmp != null ) 442 this.localPort = Integer.parseInt(tmp); 443 444 InetAddress serverAddr = addr; 446 int serverPort = port; 447 tmp = getProperty(SERVER_ADDR); 448 if (tmp == null) 449 tmp = connectAddress; 450 if( tmp != null ) 451 serverAddr = InetAddress.getByName(tmp); 452 tmp = getProperty(SERVER_PORT); 453 if( tmp != null ) 454 serverPort = Integer.parseInt(tmp); 455 else if (connectPort != 0) 456 serverPort = connectPort; 457 458 String useHostNameProp = getProperty(USE_SERVER_HOST); 459 String serverHost = serverAddr.getHostAddress(); 460 if (Boolean.valueOf(useHostNameProp).booleanValue()) 461 serverHost = serverAddr.getHostName(); 462 463 int retries = 0; 464 int maxRetries = 10; 466 tmp = getProperty(RETRY_COUNT); 467 if( tmp != null ) 468 maxRetries = Integer.parseInt(tmp); 469 long retryDelay = 0; 470 tmp = getProperty(RETRY_DELAY); 471 if( tmp != null ) 472 { 473 retryDelay = Long.parseLong(tmp); 474 if( retryDelay < 0 ) 475 retryDelay = 0; 476 } 477 if( tracing ) 478 log.trace("Begin connect loop, maxRetries="+maxRetries+", delay="+retryDelay); 479 480 while (true) 481 { 482 try 483 { 484 if( tracing ) 485 { 486 log.trace("Connecting with addr="+serverHost+", port="+serverPort 487 + ", localAddr="+localAddr+", localPort="+localPort 488 + ", socketFactory="+socketFactory 489 + ", enableTcpNoDelay="+enableTcpNoDelay 490 + ", bufferSize="+bufferSize 491 + ", chunkSize="+chunkSize 492 ); 493 } 494 if( localAddr != null ) 495 socket = socketFactory.createSocket(serverHost, serverPort, localAddr, localPort); 496 else 497 socket = socketFactory.createSocket(serverHost, serverPort); 498 break; 499 } 500 catch (ConnectException e) 501 { 502 if (++retries > maxRetries) 503 throw e; 504 if( tracing ) 505 log.trace("Failed to connect, retries="+retries, e); 506 } 507 try 508 { 509 Thread.sleep(retryDelay); 510 } 511 catch(InterruptedException e) 512 { 513 break; 514 } 515 } 516 517 socket.setTcpNoDelay(enableTcpNoDelay); 518 if (soTimeout != 0) 519 socket.setSoTimeout(soTimeout); 520 socketMgr = new SocketManager(socket); 521 socketMgr.setBufferSize(bufferSize); 522 socketMgr.setChunkSize(chunkSize); 523 socketMgr.start(Connection.getThreadGroup()); 524 } 525 526 530 protected void destroyConnection() 531 { 532 try 533 { 534 if( socket != null ) 535 { 536 try 537 { 538 socketMgr.stop(); 539 } 540 finally 541 { 542 socket.close(); 543 } 544 } 545 } 546 catch(IOException ignore) 547 { 548 } 549 } 550 551 private String getProperty(String name) 552 { 553 String value = null; 554 try 555 { 556 value = System.getProperty(name); 557 } 558 catch (Throwable ignored) 559 { 560 log.trace("Cannot retrieve system property " + name); 561 } 562 return value; 563 } 564 } 565 | Popular Tags |