1 7 package org.jboss.mq.il.oil; 8 9 import java.io.BufferedInputStream ; 10 import java.io.BufferedOutputStream ; 11 12 import java.io.ObjectInputStream ; 13 import java.io.ObjectOutputStream ; 14 import java.net.InetAddress ; 15 16 import java.net.Socket ; 17 import javax.jms.Destination ; 18 19 import javax.jms.JMSException ; 20 import javax.jms.Queue ; 21 import javax.jms.TemporaryQueue ; 22 import javax.jms.TemporaryTopic ; 23 import javax.jms.Topic ; 24 import javax.net.SocketFactory; 25 26 import org.jboss.logging.Logger; 27 import org.jboss.mq.AcknowledgementRequest; 28 import org.jboss.mq.ConnectionToken; 29 import org.jboss.mq.DurableSubscriptionID; 30 import org.jboss.mq.SpyDestination; 31 import org.jboss.mq.SpyMessage; 32 import org.jboss.mq.TransactionRequest; 33 import org.jboss.mq.il.ServerIL; 34 35 43 public final class OILServerIL 44 implements java.io.Serializable , 45 java.lang.Cloneable , 46 org.jboss.mq.il.ServerIL 47 { 48 static final long serialVersionUID = 5576846920031604128L; 49 private static Logger log = Logger.getLogger(OILServerIL.class); 50 51 54 private final static String LOCAL_ADDR = "org.jboss.mq.il.oil.localAddr"; 55 58 private final static String LOCAL_PORT = "org.jboss.mq.il.oil.localPort"; 59 60 62 private InetAddress addr; 63 65 private int port; 66 69 private String socketFactoryName; 70 71 74 private boolean enableTcpNoDelay = false; 75 77 private transient InetAddress localAddr; 78 80 private transient int localPort; 81 82 85 private transient ObjectInputStream in; 86 87 90 private transient ObjectOutputStream out; 91 92 95 private transient Socket socket; 96 97 103 public OILServerIL(InetAddress addr, int port, 104 String socketFactoryName, boolean enableTcpNoDelay) 105 { 106 this.addr = addr; 107 this.port = port; 108 this.socketFactoryName = socketFactoryName; 109 this.enableTcpNoDelay = enableTcpNoDelay; 110 } 111 112 118 public synchronized void setConnectionToken(ConnectionToken dest) 119 throws Exception 120 { 121 checkConnection(); 122 out.writeByte(OILConstants.SET_SPY_DISTRIBUTED_CONNECTION); 123 out.writeObject(dest); 124 waitAnswer(); 125 } 126 127 135 public synchronized void setEnabled(ConnectionToken dc, boolean enabled) 136 throws JMSException , Exception 137 { 138 checkConnection(); 139 out.writeByte(OILConstants.SET_ENABLED); 140 out.writeBoolean(enabled); 141 waitAnswer(); 142 } 143 144 150 public synchronized String getID() 151 throws Exception 152 { 153 checkConnection(); 154 out.writeByte(OILConstants.GET_ID); 155 return (String )waitAnswer(); 156 } 157 158 166 public synchronized TemporaryQueue getTemporaryQueue(ConnectionToken dc) 167 throws JMSException , Exception 168 { 169 checkConnection(); 170 out.writeByte(OILConstants.GET_TEMPORARY_QUEUE); 171 return (TemporaryQueue )waitAnswer(); 172 } 173 174 182 public synchronized TemporaryTopic getTemporaryTopic(ConnectionToken dc) 183 throws JMSException , Exception 184 { 185 checkConnection(); 186 out.writeByte(OILConstants.GET_TEMPORARY_TOPIC); 187 return (TemporaryTopic )waitAnswer(); 188 } 189 190 198 public synchronized void acknowledge(ConnectionToken dc, AcknowledgementRequest item) 199 throws JMSException , Exception 200 { 201 checkConnection(); 202 out.writeByte(OILConstants.ACKNOWLEDGE); 203 item.writeExternal(out); 204 waitAnswer(); 205 } 206 207 214 public synchronized void addMessage(ConnectionToken dc, SpyMessage val) 215 throws Exception 216 { 217 checkConnection(); 218 out.writeByte(OILConstants.ADD_MESSAGE); 219 SpyMessage.writeMessage(val, out); 220 waitAnswer(); 221 } 222 223 233 public synchronized SpyMessage[] browse(ConnectionToken dc, Destination dest, String selector) 234 throws JMSException , Exception 235 { 236 checkConnection(); 237 out.writeByte(OILConstants.BROWSE); 238 out.writeObject(dest); 239 out.writeObject(selector); 240 return (SpyMessage[])waitAnswer(); 241 } 242 243 250 public synchronized void checkID(String ID) 251 throws JMSException , Exception 252 { 253 checkConnection(); 254 out.writeByte(OILConstants.CHECK_ID); 255 out.writeObject(ID); 256 waitAnswer(); 257 } 258 259 268 public synchronized String checkUser(String userName, String password) 269 throws JMSException , Exception 270 { 271 checkConnection(); 272 out.writeByte(OILConstants.CHECK_USER); 273 out.writeObject(userName); 274 out.writeObject(password); 275 return (String )waitAnswer(); 276 } 277 278 287 public synchronized String authenticate(String userName, String password) 288 throws JMSException , Exception 289 { 290 checkConnection(); 291 out.writeByte(OILConstants.AUTHENTICATE); 292 out.writeObject(userName); 293 out.writeObject(password); 294 return (String )waitAnswer(); 295 } 296 302 public Object clone() 303 throws CloneNotSupportedException 304 { 305 return super.clone(); 306 } 307 308 315 public ServerIL cloneServerIL() 316 throws Exception 317 { 318 return (ServerIL)clone(); 319 } 320 321 328 public synchronized void connectionClosing(ConnectionToken dc) 329 throws JMSException , Exception 330 { 331 try 332 { 333 checkConnection(); 334 out.writeByte(OILConstants.CONNECTION_CLOSING); 335 waitAnswer(); 336 } 337 finally 338 { 339 destroyConnection(); 340 } 341 } 342 343 352 public synchronized Queue createQueue(ConnectionToken dc, String dest) 353 throws JMSException , Exception 354 { 355 checkConnection(); 356 out.writeByte(OILConstants.CREATE_QUEUE); 357 out.writeObject(dest); 358 return (Queue )waitAnswer(); 359 } 360 361 370 public synchronized Topic createTopic(ConnectionToken dc, String dest) 371 throws JMSException , Exception 372 { 373 checkConnection(); 374 out.writeByte(OILConstants.CREATE_TOPIC); 375 out.writeObject(dest); 376 return (Topic )waitAnswer(); 377 } 378 379 387 public synchronized void deleteTemporaryDestination(ConnectionToken dc, SpyDestination dest) 388 throws JMSException , Exception 389 { 390 checkConnection(); 391 out.writeByte(OILConstants.DELETE_TEMPORARY_DESTINATION); 392 out.writeObject(dest); 393 waitAnswer(); 394 } 395 396 403 public synchronized void destroySubscription(ConnectionToken dc,DurableSubscriptionID id) 404 throws JMSException , Exception 405 { 406 checkConnection(); 407 out.writeByte(OILConstants.DESTROY_SUBSCRIPTION); 408 out.writeObject(id); 409 waitAnswer(); 410 } 411 412 419 public synchronized void ping(ConnectionToken dc, long clientTime) 420 throws Exception 421 { 422 checkConnection(); 423 out.writeByte(OILConstants.PING); 424 out.writeLong(clientTime); 425 waitAnswer(); 426 } 427 428 437 public synchronized SpyMessage receive(ConnectionToken dc, int subscriberId, long wait) 438 throws Exception , Exception 439 { 440 checkConnection(); 441 out.writeByte(OILConstants.RECEIVE); 442 out.writeInt(subscriberId); 443 out.writeLong(wait); 444 return (SpyMessage)waitAnswer(); 445 } 446 447 455 public synchronized void subscribe(ConnectionToken dc, org.jboss.mq.Subscription s) 456 throws JMSException , Exception 457 { 458 checkConnection(); 459 out.writeByte(OILConstants.SUBSCRIBE); 460 out.writeObject(s); 461 waitAnswer(); 462 } 463 464 472 public synchronized void transact(org.jboss.mq.ConnectionToken dc, TransactionRequest t) 473 throws JMSException , Exception 474 { 475 checkConnection(); 476 out.writeByte(OILConstants.TRANSACT); 477 t.writeExternal(out); 478 waitAnswer(); 479 } 480 481 489 public synchronized void unsubscribe(ConnectionToken dc, int subscriptionId) 490 throws JMSException , Exception 491 { 492 checkConnection(); 493 out.writeByte(OILConstants.UNSUBSCRIBE); 494 out.writeInt(subscriptionId); 495 waitAnswer(); 496 } 497 498 503 private void checkConnection() 504 throws Exception 505 { 506 if (socket == null) 507 { 508 createConnection(); 509 } 510 } 511 512 517 private void createConnection() 518 throws Exception 519 { 520 boolean tracing = log.isTraceEnabled(); 521 if( tracing ) 522 log.trace("Connecting to : "+addr+":"+port); 523 524 527 SocketFactory socketFactory = null; 528 if( socketFactoryName != null ) 529 { 530 try 531 { 532 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 533 Class factoryClass = loader.loadClass(socketFactoryName); 534 socketFactory = (SocketFactory) factoryClass.newInstance(); 535 } 536 catch(Exception e) 537 { 538 log.debug("Failed to load socket factory: "+socketFactoryName, e); 539 } 540 } 541 if( socketFactory == null ) 543 { 544 socketFactory = SocketFactory.getDefault(); 545 } 546 547 String tmp = System.getProperty(LOCAL_ADDR); 549 if( tmp != null ) 550 this.localAddr = InetAddress.getByName(tmp); 551 tmp = System.getProperty(LOCAL_PORT); 552 if( tmp != null ) 553 this.localPort = Integer.parseInt(tmp); 554 if( tracing ) 555 { 556 log.trace("Connecting with addr="+addr+", port="+port 557 + ", localAddr="+localAddr+", localPort="+localPort 558 + ", socketFactory="+socketFactory); 559 } 560 561 if( localAddr != null ) 562 socket = socketFactory.createSocket(addr, port, localAddr, localPort); 563 else 564 socket = socketFactory.createSocket(addr, port); 565 566 socket.setTcpNoDelay(enableTcpNoDelay); 567 in = new ObjectInputStream (new BufferedInputStream (socket.getInputStream())); 568 out = new ObjectOutputStream (new BufferedOutputStream (socket.getOutputStream())); 569 out.flush(); 570 } 571 572 577 private void destroyConnection() 578 throws Exception 579 { 580 try 581 { 582 out.close(); 583 in.close(); 584 } 585 finally 586 { 587 socket.close(); 588 } 589 } 590 591 597 private Object waitAnswer() 598 throws Exception 599 { 600 out.reset(); 601 out.flush(); 602 int val = in.readByte(); 603 if (val == OILConstants.SUCCESS) 604 { 605 return null; 606 } 607 if (val == OILConstants.SUCCESS_OBJECT) 608 { 609 return in.readObject(); 610 } 611 else 612 { 613 Exception e = (Exception )in.readObject(); 614 throw e; 615 } 616 } 617 } 618 | Popular Tags |