1 24 package com.scalagent.kjoram; 25 26 import com.scalagent.kjoram.excepts.IllegalStateException; 27 import com.scalagent.kjoram.excepts.*; 28 import com.scalagent.kjoram.jms.*; 29 import com.scalagent.kjoram.*; 30 import com.scalagent.kjoram.util.StoppedQueueException; 31 32 import java.util.*; 33 34 public class Connection 35 { 36 37 private ConnectionItf connectionImpl; 38 39 40 private String proxyId; 41 42 private int key; 43 44 45 private ConnectionMetaData metaData = null; 46 47 private ExceptionListener excListener = null; 48 49 50 private int requestsC = 0; 51 52 private int sessionsC = 0; 53 54 private int messagesC = 0; 55 56 private int subsC = 0; 57 58 59 private com.scalagent.kjoram.util.Timer sessionsTimer = null; 60 61 62 FactoryParameters factoryParameters; 63 64 65 Driver driver; 66 67 68 boolean started = false; 69 70 boolean closing = false; 71 72 boolean closed = false; 73 74 Vector sessions; 75 76 Vector cconsumers; 77 81 Hashtable requestsTable; 82 85 Hashtable repliesTable; 86 87 String name = null; 88 89 98 public Connection(FactoryParameters factoryParameters, 99 ConnectionItf connectionImpl) throws JMSException 100 { 101 try { 102 this.factoryParameters = factoryParameters; 103 104 sessions = new Vector(); 105 requestsTable = new Hashtable(); 106 repliesTable = new Hashtable(); 107 108 this.connectionImpl = connectionImpl; 109 name = connectionImpl.getUserName(); 110 111 driver = connectionImpl.createDriver(this); 113 driver.start(); 114 115 CnxConnectRequest req = new CnxConnectRequest(); 117 CnxConnectReply rep = (CnxConnectReply) syncRequest(req); 118 proxyId = rep.getProxyId(); 119 key = rep.getCnxKey(); 120 121 if (factoryParameters.txPendingTimer != 0) 123 sessionsTimer = new com.scalagent.kjoram.util.Timer(); 124 125 if (JoramTracing.dbgClient) 126 JoramTracing.log(JoramTracing.DEBUG, this + ": opened."); 127 } 128 catch (JMSException jE) { 130 JoramTracing.log(JoramTracing.ERROR, jE); 131 throw jE; 132 } 133 } 134 135 public String getUserName() { 136 return name; 137 } 138 139 140 public String toString() 141 { 142 return "Cnx:" + proxyId + "-" + key; 143 } 144 145 150 public boolean equals(Object obj) 151 { 152 return (obj instanceof Connection) 153 && toString().equals(obj.toString()); 154 } 155 156 157 166 public ConnectionConsumer 167 createConnectionConsumer(Destination dest, String selector, 168 ServerSessionPool sessionPool, 169 int maxMessages) throws JMSException 170 { 171 if (closed) 172 throw new IllegalStateException ("Forbidden call on a closed" 173 + " connection."); 174 175 return new ConnectionConsumer(this, (Destination) dest, selector, 176 sessionPool, maxMessages); 177 } 178 179 188 public ConnectionConsumer 189 createDurableConnectionConsumer(Topic topic, String subName, 190 String selector, 191 ServerSessionPool sessPool, 192 int maxMessages) throws JMSException 193 { 194 if (closed) 195 throw new IllegalStateException ("Forbidden call on a closed" 196 + " connection."); 197 198 return new ConnectionConsumer(this, (Topic) topic, subName, selector, 199 sessPool, maxMessages); 200 } 201 202 208 public Session 209 createSession(boolean transacted, int acknowledgeMode) 210 throws JMSException 211 { 212 if (closed) 213 throw new IllegalStateException ("Forbidden call on a closed" 214 + " connection."); 215 216 return new Session(this, transacted, acknowledgeMode); 217 } 218 219 224 public void setExceptionListener(ExceptionListener listener) 225 throws JMSException 226 { 227 if (closed) 228 throw new IllegalStateException ("Forbidden call on a closed" 229 + " connection."); 230 this.excListener = listener; 231 } 232 233 238 public ExceptionListener getExceptionListener() throws JMSException 239 { 240 if (closed) 241 throw new IllegalStateException ("Forbidden call on a closed" 242 + " connection."); 243 return excListener; 244 } 245 246 251 synchronized void onException(JMSException jE) 252 { 253 if (JoramTracing.dbgClient) 254 JoramTracing.log(JoramTracing.WARN, this + ": " + jE); 255 256 if (excListener != null) 257 excListener.onException(jE); 258 } 259 260 265 public void setClientID(String clientID) throws JMSException 266 { 267 throw new IllegalStateException ("ClientID is already set by the" 268 + " provider."); 269 } 270 271 276 public String getClientID() throws JMSException 277 { 278 if (closed) 279 throw new IllegalStateException ("Forbidden call on a closed" 280 + " connection."); 281 return proxyId; 282 } 283 284 289 public ConnectionMetaData getMetaData() throws JMSException 290 { 291 if (closed) 292 throw new IllegalStateException ("Forbidden call on a closed" 293 + " connection."); 294 if (metaData == null) 295 metaData = new ConnectionMetaData(); 296 return metaData; 297 } 298 299 304 public void start() throws JMSException 305 { 306 if (closed) 308 throw new IllegalStateException ("Forbidden call on a closed" 309 + " connection."); 310 311 if (started) 313 return; 314 315 if (JoramTracing.dbgClient) 316 JoramTracing.log(JoramTracing.DEBUG, "--- " + this 317 + ": starting..."); 318 319 Session session; 321 for (int i = 0; i < sessions.size(); i++) { 322 session = (Session) sessions.elementAt(i); 323 session.repliesIn.start(); 324 session.start(); 325 } 326 asyncRequest(new CnxStartRequest()); 328 329 started = true; 330 331 if (JoramTracing.dbgClient) 332 JoramTracing.log(JoramTracing.DEBUG, this + ": started."); 333 } 334 335 341 public void stop() throws JMSException 342 { 343 IllegalStateException isE = null; 344 345 if (closed) 347 throw new IllegalStateException ("Forbidden call on a closed" 348 + " connection."); 349 350 if (! started) 352 return; 353 354 if (JoramTracing.dbgClient) 355 JoramTracing.log(JoramTracing.DEBUG, this + ": stopping..."); 356 357 try { 359 syncRequest(new CnxStopRequest()); 360 } 361 catch (IllegalStateException caughtISE) { 363 isE = caughtISE; 364 } 365 366 Session session; 370 for (int i = 0; i < sessions.size(); i++) { 371 session = (Session) sessions.elementAt(i); 372 try { 373 session.repliesIn.stop(); 374 } 375 catch (InterruptedException iE) {} 376 session.stop(); 377 } 378 379 started = false; 380 381 if (isE != null) { 382 JoramTracing.log(JoramTracing.ERROR, isE); 383 throw isE; 384 } 385 386 if (JoramTracing.dbgClient) 387 JoramTracing.log(JoramTracing.DEBUG, this + ": is stopped."); 388 } 389 390 391 397 public void close() throws JMSException 398 { 399 if (closed) 401 return; 402 403 closing = true; 404 405 if (JoramTracing.dbgClient) 406 JoramTracing.log(JoramTracing.DEBUG, "--- " + this 407 + ": closing..."); 408 409 if (sessionsTimer != null) 411 sessionsTimer.cancel(); 412 413 try { 415 stop(); 416 } 417 catch (JMSException jE) {} 419 420 Session session; 422 while (! sessions.isEmpty()) { 423 session = (Session) sessions.elementAt(0); 424 try { 425 session.close(); 426 } 427 catch (JMSException jE) {} 429 } 430 431 if (cconsumers != null) { 433 ConnectionConsumer cc; 434 while (! cconsumers.isEmpty()) { 435 cc = (ConnectionConsumer) cconsumers.elementAt(0); 436 cc.close(); 437 } 438 } 439 440 connectionImpl.close(); 442 443 if (! driver.stopping) 445 driver.stop(); 446 447 requestsTable.clear(); 448 requestsTable = null; 449 repliesTable.clear(); 450 repliesTable = null; 451 452 closed = true; 453 454 if (JoramTracing.dbgClient) 455 JoramTracing.log(JoramTracing.DEBUG, this + ": closed."); 456 } 457 458 459 synchronized int nextRequestId() 460 { 461 if (requestsC == Integer.MAX_VALUE) 462 requestsC = 0; 463 return requestsC++; 464 } 465 466 467 synchronized String nextSessionId() 468 { 469 if (sessionsC == Integer.MAX_VALUE) 470 sessionsC = 0; 471 sessionsC++; 472 return "c" + key + "s" + sessionsC; 473 } 474 475 476 synchronized String nextMessageId() 477 { 478 if (messagesC == Integer.MAX_VALUE) 479 messagesC = 0; 480 messagesC++; 481 return "ID:" + proxyId + "c" + key + "m" + messagesC; 482 } 483 484 485 synchronized String nextSubName() 486 { 487 if (subsC == Integer.MAX_VALUE) 488 subsC = 0; 489 subsC++; 490 return "c" + key + "sub" + subsC; 491 } 492 493 494 synchronized void schedule(com.scalagent.kjoram.util.TimerTask task) 495 { 496 if (sessionsTimer == null) 497 return; 498 499 try { 500 sessionsTimer.schedule(task, factoryParameters.txPendingTimer * 1000); 501 } 502 catch (Exception exc) {} 503 } 504 505 516 AbstractJmsReply syncRequest(AbstractJmsRequest request) throws JMSException 517 { 518 if (closed) 519 throw new IllegalStateException ("Forbidden call on a closed" 520 + " connection."); 521 522 if (request.getRequestId() == -1) 523 request.setRequestId(nextRequestId()); 524 525 int requestId = request.getRequestId(); 526 527 try { 528 if (JoramTracing.dbgClient) 529 JoramTracing.log(JoramTracing.DEBUG, this + ": sends request: " 530 + request.getClass().getName() 531 + " with id: " + requestId); 532 533 Lock lock = new Lock(); 534 requestsTable.put(request.getKey(), lock); 535 synchronized(lock) { 536 connectionImpl.send(request); 537 while (true) { 538 try { 539 lock.wait(); 540 break; 541 } 542 catch (InterruptedException iE) { 543 if (JoramTracing.dbgClient) 544 JoramTracing.log(JoramTracing.WARN,this 545 + ": caught InterruptedException"); 546 continue; 547 } 548 } 549 requestsTable.remove(request.getKey()); 550 } 551 } 552 catch (Exception e) { 554 JMSException jE = null; 555 if (e instanceof JMSException) 556 throw (JMSException) e; 557 else 558 jE = new JMSException("Exception while getting a reply."); 559 560 jE.setLinkedException(e); 561 562 if (requestsTable != null) 564 requestsTable.remove(request.getKey()); 565 566 JoramTracing.log(JoramTracing.ERROR, jE); 567 throw jE; 568 } 569 AbstractJmsReply reply = 571 (AbstractJmsReply) repliesTable.remove(request.getKey()); 572 573 if (JoramTracing.dbgClient) 574 JoramTracing.log(JoramTracing.DEBUG, this + ": got reply."); 575 576 if (reply == null) 579 throw new IllegalStateException ("Connection is broken."); 580 else if (reply instanceof MomExceptionReply) { 582 MomException mE = ((MomExceptionReply) reply).getException(); 583 584 if (mE instanceof AccessException) 585 throw new JMSSecurityException(mE.getMessage()); 586 else if (mE instanceof DestinationException) 587 throw new InvalidDestinationException(mE.getMessage()); 588 else 589 throw new JMSException(mE.getMessage()); 590 } 591 else 593 return reply; 594 } 595 596 601 void asyncRequest(AbstractJmsRequest request) throws IllegalStateException 602 { 603 if (closed) 604 throw new IllegalStateException ("Forbidden call on a closed" 605 + " connection."); 606 607 if (request.getRequestId() == -1) 608 request.setRequestId(nextRequestId()); 609 610 try { 611 if (JoramTracing.dbgClient) 612 JoramTracing.log(JoramTracing.DEBUG, this + ": sends request: " 613 + request.getClass().getName() 614 + " with id: " + request.getRequestId()); 615 connectionImpl.send(request); 616 } 617 catch (IllegalStateException exc) { 619 requestsTable.remove(request.getKey()); 621 622 JoramTracing.log(JoramTracing.ERROR, exc); 623 throw exc; 624 } 625 } 626 627 635 void distribute(AbstractJmsReply reply) 636 { 637 int correlationId = reply.getCorrelationId(); 639 640 if (JoramTracing.dbgClient) 641 JoramTracing.log(JoramTracing.DEBUG, this + ": got reply: " 642 + correlationId); 643 644 Object obj = null; 645 if (correlationId != -1) 646 obj = requestsTable.get(reply.getKey()); 647 648 if (obj instanceof Lock) { 651 repliesTable.put(reply.getKey(), reply); 652 653 synchronized(obj) { 654 obj.notify(); 655 } 656 } 657 else if (reply instanceof MomExceptionReply) { 659 requestsTable.remove(reply.getKey()); 661 662 MomException mE = ((MomExceptionReply) reply).getException(); 663 JMSException jE = null; 664 665 if (mE instanceof AccessException) 666 jE = new JMSSecurityException(mE.getMessage()); 667 else if (mE instanceof DestinationException) 668 jE = new InvalidDestinationException(mE.getMessage()); 669 else 670 jE = new JMSException(mE.getMessage()); 671 672 onException(jE); 673 } 674 else if (obj != null) { 676 try { 677 if (obj instanceof ConnectionConsumer) 679 ((ConnectionConsumer) obj).repliesIn.push(reply); 680 else if (obj instanceof MessageConsumer) 681 ((MessageConsumer) obj).sess.repliesIn.push(reply); 682 } 683 catch (StoppedQueueException sqE) { 684 denyDelivery((ConsumerMessages) reply); 685 } 686 } 687 else if (reply instanceof ConsumerMessages) 689 denyDelivery((ConsumerMessages) reply); 690 } 691 692 693 private void denyDelivery(ConsumerMessages delivery) 694 { 695 Vector msgs = delivery.getMessages(); 696 com.scalagent.kjoram.messages.Message msg; 697 Vector ids = new Vector(); 698 699 for (int i = 0; i < msgs.size(); i++) { 700 msg = (com.scalagent.kjoram.messages.Message) msgs.elementAt(i); 701 ids.addElement(msg.getIdentifier()); 702 } 703 704 if (ids.isEmpty()) 705 return; 706 707 try { 708 asyncRequest(new SessDenyRequest(delivery.comesFrom(), ids, 711 delivery.getQueueMode(), true)); 712 } 713 catch (JMSException jE) {} 715 } 716 } 717 | Popular Tags |