package org.objectweb.joram.client.jms;

import java.util.Vector;

import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;

import org.objectweb.joram.client.jms.connection.RequestChannel;
import org.objectweb.joram.client.jms.connection.RequestMultiplexer;
import org.objectweb.joram.client.jms.connection.Requestor;
import org.objectweb.joram.shared.client.AbstractJmsReply;
import org.objectweb.joram.shared.client.AbstractJmsRequest;
import org.objectweb.joram.shared.client.CnxCloseRequest;
import org.objectweb.joram.shared.client.CnxConnectReply;
import org.objectweb.joram.shared.client.CnxConnectRequest;
import org.objectweb.joram.shared.client.CnxStartRequest;
import org.objectweb.joram.shared.client.CnxStopRequest;
import org.objectweb.joram.shared.client.ConsumerSubRequest;

import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.util.Debug;

/**
 * Implements the <code>javax.jms.Connection</code> interface.
 * <p>
 * A connection owns a {@link RequestMultiplexer} and a {@link Requestor}
 * through which every request is sent, keeps track of the sessions and
 * connection consumers created from it, and moves between the three states
 * declared in {@link Status}.
 */
public class Connection implements javax.jms.Connection {
  public static Logger logger = Debug.getLogger(Connection.class.getName());

  /**
   * Integer constants describing the state of a connection.
   */
  private static class Status {
    /** State set by the constructor and by {@link Connection#stop()}. */
    public static final int STOP = 0;

    /** State set by {@link Connection#start()}. */
    public static final int START = 1;

    /** State set once {@link Connection#doClose()} has completed. */
    public static final int CLOSE = 2;

    /** Printable names, indexed by status value. */
    private static final String[] names = {
      "STOP", "START", "CLOSE"};

    /**
     * Returns the printable name of a status value.
     *
     * @param status  One of {@link #STOP}, {@link #START}, {@link #CLOSE}.
     */
    public static String toString(int status) {
      return names[status];
    }
  }

  /** Multiplexer built over the request channel given to the constructor. */
  private RequestMultiplexer mtpx;

  /** Requestor built over {@link #mtpx}; used for request/reply exchanges. */
  private Requestor requestor;

  /** Connection meta data, created on the first {@link #getMetaData()} call. */
  private ConnectionMetaData metaData = null;

  /** Counter used by {@link #nextSessionId()}. */
  private int sessionsC = 0;

  /** Counter used by {@link #nextMessageId()}. */
  private int messagesC = 0;

  /** Counter used by {@link #nextSubName()}. */
  private int subsC = 0;

  /** Proxy identifier returned by the connect reply. */
  String proxyId;

  /** Connection key returned by the connect reply. */
  private int key;

  /** Factory parameters this connection was created with. */
  private FactoryParameters factoryParameters;

  /** Current state; one of the {@link Status} constants. */
  private int status;

  /** Sessions created from this connection and not yet closed. */
  private Vector sessions;

  /** Connection consumers created from this connection and not yet closed. */
  private Vector cconsumers;

  /**
   * Object whose monitor serializes the calls to {@link #close()}.
   * NOTE(review): presumably a separate monitor is used so that closing does
   * not hold the connection's own monitor while sessions are being closed;
   * confirm against Session.
   */
  private Closer closer;

  /** Cached result of {@link #toString()}; <code>null</code> until connected. */
  private String stringImage = null;

  /** Cached hash code, computed from {@link #stringImage}. */
  private int hashCode;

  /**
   * Creates a <code>Connection</code> instance and performs the connect
   * request/reply exchange that provides the proxy identifier and the
   * connection key.
   *
   * @param factoryParameters  The factory parameters.
   * @param requestChannel  The channel the multiplexer is built over.
   *
   * @exception JMSException  If the connect request fails.
   */
  public Connection(FactoryParameters factoryParameters,
                    RequestChannel requestChannel)
    throws JMSException {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(BasicLevel.DEBUG,
                 "Connection.<init>(" + factoryParameters +
                 ',' + requestChannel + ')');
    }
    this.factoryParameters = factoryParameters;
    mtpx = new RequestMultiplexer(this,
                                  requestChannel,
                                  factoryParameters.cnxPendingTimer);
    if (factoryParameters.multiThreadSync) {
      mtpx.setMultiThreadSync(factoryParameters.multiThreadSyncDelay,
                              factoryParameters.multiThreadSyncThreshold);
    }

    requestor = new Requestor(mtpx);
    sessions = new Vector();
    cconsumers = new Vector();

    closer = new Closer();

    setStatus(Status.STOP);

    // Request/reply exchange providing the connection identity.
    CnxConnectRequest req = new CnxConnectRequest();
    CnxConnectReply rep = (CnxConnectReply) requestor.request(req);
    proxyId = rep.getProxyId();
    key = rep.getCnxKey();

    stringImage = "Cnx:" + proxyId + ':' + key;
    hashCode = stringImage.hashCode();

    mtpx.setDemultiplexerDaemonName(toString());
  }

  /** Prefixes a trace message with the identity of this connection. */
  private final String newTrace(String trace) {
    return "Connection[" + proxyId + ':' + key + ']' + trace;
  }

  /** Logs (DEBUG) and records the new status. */
  private void setStatus(int status) {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(BasicLevel.DEBUG,
                 newTrace(".setStatus(" + Status.toString(status) + ')'));
    }
    this.status = status;
  }

  /** Returns <code>true</code> if the status is {@link Status#STOP}. */
  boolean isStopped() {
    return (status == Status.STOP);
  }

  /** Returns the string image ("Cnx:" + proxy identifier + ':' + key). */
  public String toString() {
    return stringImage;
  }

  /** Returns the hash code of the string image. */
  public int hashCode() {
    return hashCode;
  }

  /**
   * Two connections are equal when their hash codes and their string images
   * are equal.
   */
  public boolean equals(Object obj) {
    return (obj instanceof Connection)
      && (hashCode() == obj.hashCode())
      && toString().equals(obj.toString());
  }

  /** Returns the <code>txPendingTimer</code> factory parameter. */
  final long getTxPendingTimer() {
    return factoryParameters.txPendingTimer;
  }

  /** Returns the <code>asyncSend</code> factory parameter. */
  final boolean getAsyncSend() {
    return factoryParameters.asyncSend;
  }

  /** Returns the <code>queueMessageReadMax</code> factory parameter. */
  final int getQueueMessageReadMax() {
    return factoryParameters.queueMessageReadMax;
  }

  /** Returns the <code>topicAckBufferMax</code> factory parameter. */
  final int getTopicAckBufferMax() {
    return factoryParameters.topicAckBufferMax;
  }

  /** Returns the <code>topicActivationThreshold</code> factory parameter. */
  final int getTopicActivationThreshold() {
    return factoryParameters.topicActivationThreshold;
  }

  /** Returns the <code>topicPassivationThreshold</code> factory parameter. */
  final int getTopicPassivationThreshold() {
    return factoryParameters.topicPassivationThreshold;
  }

  /**
   * Checks that this connection may still be used.
   *
   * @exception IllegalStateException  If the status is
   *              {@link Status#CLOSE} or if the multiplexer is closed.
   */
  final protected synchronized void checkClosed() throws IllegalStateException {
    if (status == Status.CLOSE || mtpx.isClosed()) {
      throw new IllegalStateException("Forbidden call on a closed connection.");
    }
  }

  /**
   * API method: creates a non durable connection consumer.
   *
   * @exception IllegalStateException  If the connection is closed.
   * @exception InvalidSelectorException  If the selector syntax is wrong.
   * @exception InvalidDestinationException  If the destination is
   *              <code>null</code> or is not a Joram destination.
   * @exception JMSException  If the session pool is <code>null</code>, if
   *              <code>maxMessages</code> is not strictly positive, or if
   *              a request fails.
   */
  public synchronized javax.jms.ConnectionConsumer
      createConnectionConsumer(
        javax.jms.Destination dest,
        String selector,
        javax.jms.ServerSessionPool sessionPool,
        int maxMessages) throws JMSException {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(BasicLevel.DEBUG,
                 newTrace(".createConnectionConsumer(" + dest +
                          ',' +selector + ',' + sessionPool +
                          ',' + maxMessages + ')'));
    }
    checkClosed();
    // A null subscription name selects the non durable path.
    return createConnectionConsumer(dest, null, selector, sessionPool, maxMessages);
  }

  /**
   * API method: creates a durable connection consumer on a topic.
   *
   * @exception IllegalStateException  If the connection is closed.
   * @exception InvalidSelectorException  If the selector syntax is wrong.
   * @exception InvalidDestinationException  If the topic is
   *              <code>null</code> or is not a Joram destination.
   * @exception JMSException  If the subscription name or the session pool is
   *              <code>null</code>, if <code>maxMessages</code> is not
   *              strictly positive, or if a request fails.
   */
  public javax.jms.ConnectionConsumer
      createDurableConnectionConsumer(javax.jms.Topic topic,
                                      String subName,
                                      String selector,
                                      javax.jms.ServerSessionPool sessPool,
                                      int maxMessages) throws JMSException {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(BasicLevel.DEBUG,
                 newTrace(".createDurableConnectionConsumer(" +
                          topic + ',' + subName + ',' + selector + ',' +
                          sessPool + ',' + maxMessages + ')'));
    }
    checkClosed();
    if (subName == null) {
      throw new JMSException("Invalid subscription name: " + subName);
    }
    // The topic is passed uncast: the shared implementation validates the
    // destination type and reports a JMS exception rather than a
    // ClassCastException.
    return createConnectionConsumer(topic, subName, selector, sessPool, maxMessages);
  }

  /**
   * Shared implementation of the connection consumer creation.
   * <p>
   * For a queue no subscription request is sent and the target is the queue
   * name. Otherwise a <code>ConsumerSubRequest</code> is sent; the
   * subscription is durable when <code>subName</code> is not
   * <code>null</code>, else a name is generated by {@link #nextSubName()}.
   *
   * @param subName  Durable subscription name, or <code>null</code>.
   */
  private synchronized javax.jms.ConnectionConsumer
      createConnectionConsumer(
        javax.jms.Destination dest,
        String subName,
        String selector,
        javax.jms.ServerSessionPool sessionPool,
        int maxMessages) throws JMSException {
    checkClosed();

    try {
      org.objectweb.joram.shared.selectors.Selector.checks(selector);
    } catch (org.objectweb.joram.shared.excepts.SelectorException sE) {
      InvalidSelectorException jmsExc =
        new InvalidSelectorException("Invalid selector syntax: " + sE);
      // Keep the original failure reachable from the JMS exception.
      jmsExc.initCause(sE);
      throw jmsExc;
    }

    if (sessionPool == null) {
      throw new JMSException("Invalid ServerSessionPool parameter: "
                             + sessionPool);
    }
    if (maxMessages <= 0) {
      throw new JMSException("Invalid maxMessages parameter: " + maxMessages);
    }

    // Rejects null (instanceof is false for null) as well as destinations
    // of another provider, which would otherwise fail with a
    // NullPointerException or a ClassCastException below.
    if (!(dest instanceof Destination)) {
      throw new InvalidDestinationException("Invalid destination: " + dest);
    }
    Destination joramDest = (Destination) dest;

    boolean queueMode;
    String targetName;
    boolean durable;

    if (dest instanceof javax.jms.Queue) {
      queueMode = true;
      targetName = joramDest.getName();
      durable = false;
    } else {
      queueMode = false;
      if (subName == null) {
        targetName = nextSubName();
        durable = false;
      } else {
        targetName = subName;
        durable = true;
      }
      // NOTE(review): the fourth argument (false) is presumably the noLocal
      // flag of the subscription; confirm against ConsumerSubRequest.
      requestor.request(new ConsumerSubRequest(joramDest.getName(),
                                               targetName, selector, false, durable));
    }

    MultiSessionConsumer msc =
      new MultiSessionConsumer(
        queueMode,
        durable,
        selector,
        targetName,
        sessionPool,
        factoryParameters.queueMessageReadMax,
        factoryParameters.topicActivationThreshold,
        factoryParameters.topicPassivationThreshold,
        factoryParameters.topicAckBufferMax,
        mtpx,
        this,
        maxMessages);

    msc.start();

    cconsumers.addElement(msc);

    return msc;
  }

  /**
   * API method: creates a session and registers it with this connection.
   *
   * @exception IllegalStateException  If the connection is closed.
   * @exception JMSException  If the session creation fails.
   */
  public synchronized javax.jms.Session
      createSession(boolean transacted,
                    int acknowledgeMode)
    throws JMSException {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(BasicLevel.DEBUG,
                 newTrace(".createSession(" + transacted + ',' +
                          acknowledgeMode + ')'));
    }
    checkClosed();
    Session session = new Session(
      this,
      transacted,
      acknowledgeMode,
      mtpx);
    addSession(session);
    return session;
  }

  /**
   * Registers a session; the session is started immediately if this
   * connection is already started.
   */
  protected synchronized void addSession(Session session) {
    sessions.addElement(session);
    if (status == Status.START) {
      session.start();
    }
  }

  /**
   * API method: sets the exception listener on the multiplexer.
   *
   * @exception IllegalStateException  If the connection is closed.
   */
  public synchronized void
      setExceptionListener(javax.jms.ExceptionListener listener) throws JMSException {
    checkClosed();
    mtpx.setExceptionListener(listener);
  }

  /**
   * API method: returns the exception listener held by the multiplexer.
   *
   * @exception IllegalStateException  If the connection is closed.
   */
  public javax.jms.ExceptionListener getExceptionListener() throws JMSException {
    checkClosed();
    return mtpx.getExceptionListener();
  }

  /**
   * API method: always fails, the client identifier cannot be set by the
   * application.
   *
   * @exception IllegalStateException  Systematically.
   */
  public void setClientID(String clientID) throws JMSException {
    throw new IllegalStateException("ClientID is already set by the"
                                    + " provider.");
  }

  /**
   * API method: returns the proxy identifier as client identifier.
   *
   * @exception IllegalStateException  If the connection is closed.
   */
  public String getClientID() throws JMSException {
    checkClosed();
    return proxyId;
  }

  /**
   * API method: returns the meta data, creating them on the first call.
   *
   * @exception IllegalStateException  If the connection is closed.
   */
  public javax.jms.ConnectionMetaData getMetaData() throws JMSException {
    checkClosed();
    if (metaData == null) {
      metaData = new ConnectionMetaData();
    }
    return metaData;
  }

  /**
   * API method: starts the connection. Does nothing if it is already
   * started; otherwise starts every registered session, sends a
   * <code>CnxStartRequest</code> and sets the status to START.
   *
   * @exception IllegalStateException  If the connection is closed.
   */
  public synchronized void start() throws JMSException {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(
        BasicLevel.DEBUG,
        newTrace(".start()"));
    }
    checkClosed();

    // Ignoring the call if the connection is already started.
    if (status == Status.START) {
      return;
    }

    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(BasicLevel.DEBUG, "--- " + this
                 + ": starting...");
    }

    // Sessions are started before the start request is sent.
    for (int i = 0; i < sessions.size(); i++) {
      Session session = (Session) sessions.elementAt(i);
      session.start();
    }

    // Sent without waiting for a reply.
    mtpx.sendRequest(new CnxStartRequest());

    setStatus(Status.START);
  }

  /**
   * API method: stops the connection. Does nothing if it is already
   * stopped; otherwise stops every registered session, then performs a
   * <code>CnxStopRequest</code> exchange and sets the status to STOP.
   *
   * @exception IllegalStateException  If the connection is closed.
   * @exception JMSException  If the stop request fails.
   */
  public void stop() throws JMSException {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(
        BasicLevel.DEBUG,
        newTrace(".stop()"));
    }
    checkClosed();

    synchronized (this) {
      if (status == Status.STOP) {
        return;
      }
    }

    // The sessions are stopped outside of the connection's monitor.
    // NOTE(review): presumably so that message listeners can still call the
    // connection while they terminate; confirm against Session.stop().
    // As a consequence several threads may reach this point concurrently.
    for (int i = 0; i < sessions.size(); i++) {
      Session session = (Session) sessions.get(i);
      session.stop();
    }

    synchronized (this) {
      // Another thread may have completed the stop in the meantime.
      if (status == Status.STOP) {
        return;
      }

      // Waits for the reply before recording the new status.
      requestor.request(new CnxStopRequest());

      setStatus(Status.STOP);
    }
  }

  /**
   * API method: closes the connection. Calls are serialized on the
   * {@link #closer} monitor and are ignored once the connection is closed.
   */
  public void close() throws JMSException {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(
        BasicLevel.DEBUG,
        newTrace(".close()"));
    }

    closer.close();
  }

  /**
   * Provides the monitor used for serializing the closing of the connection.
   */
  class Closer {
    synchronized void close() {
      doClose();
    }
  }

  /**
   * Actually closes the connection: closes the sessions, then the connection
   * consumers, performs a <code>CnxCloseRequest</code> exchange, closes the
   * multiplexer and finally sets the status to CLOSE. Failures of the
   * individual steps are logged (DEBUG) and do not interrupt the closing.
   */
  void doClose() {
    synchronized (this) {
      if (status == Status.CLOSE) {
        return;
      }
    }

    // Works on a copy: closeSession() removes elements from the vector.
    Vector sessionsToClose = (Vector) sessions.clone();
    sessions.clear();

    for (int i = 0; i < sessionsToClose.size(); i++) {
      Session session =
        (Session) sessionsToClose.elementAt(i);
      try {
        session.close();
      } catch (JMSException exc) {
        // Best effort: keep closing the remaining resources.
        if (logger.isLoggable(BasicLevel.DEBUG)) {
          logger.log(
            BasicLevel.DEBUG, "", exc);
        }
      }
    }

    // Same copy-then-clear scheme for the connection consumers.
    Vector consumersToClose = (Vector) cconsumers.clone();
    cconsumers.clear();

    for (int i = 0; i < consumersToClose.size(); i++) {
      MultiSessionConsumer consumer =
        (MultiSessionConsumer) consumersToClose.elementAt(i);
      try {
        consumer.close();
      } catch (JMSException exc) {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
          logger.log(
            BasicLevel.DEBUG, "", exc);
        }
      }
    }

    try {
      CnxCloseRequest closeReq = new CnxCloseRequest();
      requestor.request(closeReq);
    } catch (JMSException exc) {
      if (logger.isLoggable(BasicLevel.DEBUG)) {
        logger.log(
          BasicLevel.DEBUG, "", exc);
      }
    }

    mtpx.close();

    synchronized (this) {
      setStatus(Status.CLOSE);
    }
  }

  /**
   * Closes every registered session and cleans up the multiplexer, without
   * sending a close request and without changing the status. Session closing
   * failures are logged (DEBUG) and ignored.
   */
  public void cleanup() {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(
        BasicLevel.DEBUG, newTrace(".cleanup()"));
    }

    // Works on a copy: closeSession() removes elements from the vector.
    Vector sessionsToClose = (Vector) sessions.clone();
    sessions.clear();

    for (int i = 0; i < sessionsToClose.size(); i++) {
      Session session =
        (Session) sessionsToClose.elementAt(i);
      try {
        session.close();
      } catch (JMSException exc) {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
          logger.log(
            BasicLevel.DEBUG, "", exc);
        }
      }
    }

    mtpx.cleanup();
  }

  /** Returns a new session identifier; the counter wraps at MAX_VALUE. */
  synchronized String nextSessionId() {
    if (sessionsC == Integer.MAX_VALUE) {
      sessionsC = 0;
    }
    sessionsC++;
    return "c" + key + "s" + sessionsC;
  }

  /**
   * Returns a new message identifier; the counter wraps at MAX_VALUE. The
   * first character of the proxy identifier is dropped.
   */
  synchronized String nextMessageId() {
    if (messagesC == Integer.MAX_VALUE) {
      messagesC = 0;
    }
    messagesC++;
    return "ID:" + proxyId.substring(1) + "c" + key + "m" + messagesC;
  }

  /** Returns a new subscription name; the counter wraps at MAX_VALUE. */
  synchronized String nextSubName() {
    if (subsC == Integer.MAX_VALUE) {
      subsC = 0;
    }
    subsC++;
    return "c" + key + "sub" + subsC;
  }

  /**
   * Removes a session from the list of registered sessions.
   */
  synchronized void closeSession(Session session) {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(
        BasicLevel.DEBUG,
        newTrace(".closeSession(" + session + ')'));
    }
    sessions.removeElement(session);
  }

  /**
   * Removes a connection consumer from the list of registered connection
   * consumers.
   */
  synchronized void closeConnectionConsumer(MultiSessionConsumer cc) {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(BasicLevel.DEBUG,
                 newTrace(".closeConnectionConsumer(" + cc + ')'));
    }
    cconsumers.removeElement(cc);
  }

  /**
   * Sends a request through the requestor and returns its reply, holding
   * the connection's monitor for the duration of the exchange.
   *
   * @exception JMSException  If the request fails.
   */
  synchronized AbstractJmsReply syncRequest(
    AbstractJmsRequest request) throws JMSException {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(BasicLevel.DEBUG,
                 newTrace(".syncRequest(" + request + ')'));
    }
    return requestor.request(request);
  }

  /**
   * Forwards the <code>checkConsumers</code> call to every registered
   * session.
   *
   * @param agentId  Identifier passed unchanged to each session.
   */
  synchronized void checkConsumers(String agentId) throws JMSException {
    for (int i = 0; i < sessions.size(); i++) {
      Session sess = (Session) sessions.elementAt(i);
      sess.checkConsumers(agentId);
    }
  }

  /** Returns the request multiplexer of this connection. */
  protected final RequestMultiplexer getRequestMultiplexer() {
    return mtpx;
  }
}