1 23 package org.objectweb.joram.mom.util; 24 25 import fr.dyade.aaa.agent.AgentId; 26 import fr.dyade.aaa.agent.Channel; 27 import fr.dyade.aaa.util.Daemon; 28 import org.objectweb.joram.shared.messages.Message; 29 30 import java.util.Enumeration ; 31 import java.util.Properties ; 32 import java.util.Vector ; 33 34 import javax.jms.*; 35 import javax.jms.IllegalStateException ; 36 37 38 42 public class BridgeUnifiedModule implements javax.jms.ExceptionListener , 43 javax.jms.MessageListener , 44 java.io.Serializable 45 { 46 47 protected AgentId agentId; 48 49 50 protected String jndiFactory = null; 51 52 protected String jndiUrl = null; 53 54 protected String cnxFactName; 55 56 protected String destName; 57 58 protected ConnectionFactory cnxFact = null; 59 60 protected Destination dest = null; 61 62 protected String userName = null; 63 64 protected String password = null; 65 66 protected String clientID = null; 67 68 protected String selector; 69 70 71 protected boolean usable = true; 72 73 protected String notUsableMessage; 74 75 76 protected transient Connection cnx; 77 78 protected transient Session producerSession; 79 80 protected transient Session consumerSession; 81 82 protected transient MessageProducer producer; 83 84 protected transient MessageConsumer consumer; 85 86 87 protected transient boolean listener; 88 89 protected transient Vector qout; 90 91 92 protected transient ConsumerDaemon consumerDaemon; 93 94 protected transient ReconnectionDaemon reconnectionDaemon; 95 96 97 98 public BridgeUnifiedModule() 99 {} 100 101 102 112 public void init(AgentId agentId, Properties prop) 113 { 114 this.agentId = agentId; 115 116 jndiFactory = prop.getProperty("jndiFactory"); 117 jndiUrl = prop.getProperty("jndiUrl"); 118 119 cnxFactName = prop.getProperty("connectionFactoryName"); 120 destName = prop.getProperty("destinationName"); 121 122 if (cnxFactName == null) 123 throw new IllegalArgumentException ("Missing ConnectionFactory " 124 + "JNDI name."); 125 else if (destName == null) 126 throw new IllegalArgumentException ("Missing Destination " 127 + "JNDI name."); 128 129 String userName = prop.getProperty("userName"); 130 String password = prop.getProperty("password"); 131 132 if (userName != null && password != null) { 133 this.userName = userName; 134 this.password = password; 135 } 136 137 clientID = prop.getProperty("clientId"); 138 selector = prop.getProperty("selector"); 139 } 140 141 149 public void connect() throws JMSException 150 { 151 if (! usable) 152 throw new IllegalStateException (notUsableMessage); 153 154 listener = false; 155 consumerDaemon = new ConsumerDaemon(); 157 reconnectionDaemon = new ReconnectionDaemon(); 158 159 if (cnxFact == null || dest == null) { 162 StartupDaemon startup = new StartupDaemon(); 163 startup.start(); 164 } 165 else { 167 try { 168 doConnect(); 169 } 170 catch (JMSException exc) { 171 reconnectionDaemon.reconnect(); 172 } 173 } 174 } 175 176 182 public void setMessageListener() throws IllegalStateException 183 { 184 if (! usable) 185 throw new IllegalStateException (notUsableMessage); 186 187 listener = true; 188 try { 189 setConsumer(); 190 consumer.setMessageListener(this); 191 cnx.start(); 192 } 193 catch (JMSException exc) {} 194 } 195 196 199 public void unsetMessageListener() 200 { 201 try { 202 cnx.stop(); 203 consumer.setMessageListener(null); 204 unsetConsumer(); 205 } 206 catch (JMSException exc) {} 207 listener = false; 208 } 209 210 221 public Message receiveNoWait() throws IllegalStateException 222 { 223 if (! usable) 224 throw new IllegalStateException (notUsableMessage); 225 226 Message momMessage = null; 227 try { 228 setConsumer(); 229 cnx.start(); 230 try { 231 momMessage = MessageConverterModule.convert(consumer.receiveNoWait()); 232 consumerSession.commit(); 233 } 234 catch (MessageFormatException exc) { 236 consumerSession.rollback(); 237 } 238 } 239 catch (JMSException commitExc) { 242 momMessage = null; 243 } 244 return momMessage; 245 } 246 247 254 public void receive() throws IllegalStateException 255 { 256 if (! usable) 257 throw new IllegalStateException (notUsableMessage); 258 259 consumerDaemon.receive(); 260 } 261 262 270 public void send(org.objectweb.joram.shared.messages.Message message) 271 throws JMSException 272 { 273 if (! usable) 274 throw new IllegalStateException (notUsableMessage); 275 276 try { 277 producer.send(MessageConverterModule.convert(producerSession, message)); 278 acknowledge(message); 279 } 280 catch (javax.jms.MessageFormatException exc) { 281 throw exc; 282 } 283 catch (javax.jms.JMSException exc) { 285 qout.add(message); 286 } 287 } 288 289 292 public void close() 293 { 294 try { 295 cnx.stop(); 296 } 297 catch (JMSException exc) {} 298 299 unsetMessageListener(); 300 301 try { 302 consumerDaemon.interrupt(); 303 } 304 catch (Exception exc) {} 305 try { 306 reconnectionDaemon.interrupt(); 307 } 308 catch (Exception exc) {} 309 310 try { 311 cnx.close(); 312 } 313 catch (JMSException exc) {} 314 } 315 316 322 public void onException(JMSException exc) 323 { 324 reconnectionDaemon.reconnect(); 325 } 326 327 332 public void onMessage(javax.jms.Message jmsMessage) 333 { 334 try { 335 try { 336 Message momMessage = MessageConverterModule.convert(jmsMessage); 337 consumerSession.commit(); 338 Channel.sendTo(agentId, new BridgeDeliveryNot(momMessage)); 339 } 340 catch (MessageFormatException conversionExc) { 342 consumerSession.rollback(); 343 } 344 } 345 catch (JMSException exc) {} 347 } 348 349 355 protected void doConnect() throws JMSException 356 { 357 if (userName != null && password != null) 358 cnx = cnxFact.createConnection(userName, password); 359 else 360 cnx = cnxFact.createConnection(); 361 cnx.setExceptionListener(this); 362 363 if (clientID != null) 364 cnx.setClientID(clientID); 365 366 producerSession = cnx.createSession(false, Session.AUTO_ACKNOWLEDGE); 367 producer = producerSession.createProducer(dest); 368 369 consumerSession = cnx.createSession(true, 0); 370 } 371 372 377 protected void setConsumer() throws JMSException 378 { 379 if (consumer != null) 380 return; 381 382 try { 383 if (dest instanceof Queue) 384 consumer = consumerSession.createConsumer(dest, selector); 385 else 386 consumer = consumerSession.createDurableSubscriber((Topic) dest, 387 agentId.toString(), 388 selector, 389 false); 390 } 391 catch (JMSException exc) { 392 throw exc; 393 } 394 catch (Exception exc) { 395 throw new JMSException("JMS resources do not allow to create consumer: " 396 + exc); 397 } 398 } 399 400 403 protected void unsetConsumer() 404 { 405 try { 406 if (dest instanceof Topic) 407 consumerSession.unsubscribe(agentId.toString()); 408 409 consumer.close(); 410 } 411 catch (Exception exc) {} 412 413 consumer = null; 414 } 415 416 419 protected void acknowledge(Message message) 420 { 421 Channel.sendTo(agentId, new BridgeAckNot(message.getIdentifier())); 422 } 423 424 425 429 protected class StartupDaemon extends Daemon 430 { 431 432 protected StartupDaemon() 433 { 434 super(agentId.toString() + ":StartupDaemon"); 435 setDaemon(false); 436 } 437 438 439 public void run() 440 { 441 javax.naming.Context jndiCtx = null; 442 try { 443 canStop = true; 444 445 if (cnxFact == null || dest == null) { 448 if (jndiFactory == null || jndiUrl == null) 449 jndiCtx = new javax.naming.InitialContext (); 450 else { 451 java.util.Hashtable env = new java.util.Hashtable (); 452 env.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, jndiFactory); 453 env.put(javax.naming.Context.PROVIDER_URL, jndiUrl); 454 jndiCtx = new javax.naming.InitialContext (env); 455 } 456 cnxFact = (ConnectionFactory) jndiCtx.lookup(cnxFactName); 457 dest = (Destination) jndiCtx.lookup(destName); 458 } 459 try { 460 doConnect(); 461 } 462 catch (AbstractMethodError exc) { 463 usable = false; 464 notUsableMessage = "Retrieved administered objects types not " 465 + "compatible with the 'unified' communication " 466 + " mode: " + exc; 467 } 468 catch (ClassCastException exc) { 469 usable = false; 470 notUsableMessage = "Retrieved administered objects types not " 471 + "compatible with the chosen communication mode: " 472 + exc; 473 } 474 catch (JMSSecurityException exc) { 475 usable = false; 476 notUsableMessage = "Provided user identification does not allow " 477 + "to connect to the foreign JMS server: " 478 + exc; 479 } 480 catch (JMSException exc) { 481 reconnectionDaemon.reconnect(); 482 } 483 catch (Throwable exc) { 484 usable = false; 485 notUsableMessage = "" + exc; 486 } 487 } 488 catch (javax.naming.NameNotFoundException exc) { 489 usable = false; 490 if (cnxFact == null) 491 notUsableMessage = "Could not retrieve ConnectionFactory [" 492 + cnxFactName 493 + "] from JNDI: " + exc; 494 else if (dest == null) 495 notUsableMessage = "Could not retrieve Destination [" 496 + destName 497 + "] from JNDI: " + exc; 498 } 499 catch (javax.naming.NamingException exc) { 500 usable = false; 501 notUsableMessage = "Could not access JNDI: " + exc; 502 } 503 catch (ClassCastException exc) { 504 usable = false; 505 notUsableMessage = "Error while retrieving administered objects " 506 + "through JNDI possibly because of missing " 507 + "foreign JMS client libraries in classpath: " 508 + exc; 509 } 510 catch (Exception exc) { 511 usable = false; 512 notUsableMessage = "Error while retrieving administered objects " 513 + "through JNDI: " 514 + exc; 515 } 516 finally { 517 try { 519 jndiCtx.close(); 520 } 521 catch (Exception exc) {} 522 523 finish(); 524 } 525 } 526 527 528 public void shutdown() 529 {} 530 531 532 public void close() 533 {} 534 } 535 536 540 protected class ReconnectionDaemon extends Daemon 541 { 542 543 private int attempts1 = 30; 544 545 private long interval1 = 1000L; 546 547 private int attempts2 = 55; 548 549 private long interval2 = 5000L; 550 551 private long interval3 = 60000L; 552 553 554 protected ReconnectionDaemon() 555 { 556 super(agentId.toString() + ":ReconnectionDaemon"); 557 setDaemon(false); 558 } 559 560 561 protected void reconnect() { 562 if (running) 563 return; 564 565 consumer = null; 566 start(); 567 } 568 569 570 public void run() 571 { 572 int attempts = 0; 573 long interval; 574 Message msg; 575 576 try { 577 while (running) { 578 canStop = true; 579 580 attempts++; 581 582 if (attempts <= 30) 583 interval = interval1; 584 else if (attempts <= 55) 585 interval = interval2; 586 else 587 interval = interval3; 588 589 try { 590 Thread.sleep(interval); 591 doConnect(); 592 593 if (listener) 595 setMessageListener(); 596 consumerDaemon.start(); 598 while (! qout.isEmpty()) 600 send((Message) qout.remove(0)); 601 } 602 catch (Exception exc) { 603 continue; 604 } 605 canStop = false; 606 break; 607 } 608 } 609 finally { 610 finish(); 611 } 612 } 613 614 615 public void shutdown() 616 {} 617 618 619 public void close() 620 {} 621 } 622 623 628 protected class ConsumerDaemon extends Daemon 629 { 630 631 private int requests = 0; 632 633 634 635 protected ConsumerDaemon() 636 { 637 super(agentId.toString() + ":ConsumerDaemon"); 638 setDaemon(false); 639 } 640 641 642 protected synchronized void receive() 643 { 644 requests++; 645 646 if (running) 647 return; 648 649 start(); 650 } 651 652 653 public void run() 654 { 655 try { 656 Message momMessage; 657 BridgeDeliveryNot notif; 658 659 setConsumer(); 660 cnx.start(); 661 662 while (requests > 0 && running) { 663 canStop = true; 664 665 try { 667 momMessage = MessageConverterModule.convert(consumer.receive()); 668 consumerSession.commit(); 669 } 670 catch (MessageFormatException messageExc) { 672 consumerSession.rollback(); 673 continue; 674 } 675 canStop = false; 677 notif = new BridgeDeliveryNot(momMessage); 678 Channel.sendTo(agentId, notif); 679 requests--; 680 } 681 } 682 catch (JMSException exc) {} 684 finally { 685 finish(); 686 } 687 } 688 689 690 public void shutdown() 691 {} 692 693 694 public void close() 695 {} 696 } 697 698 699 700 private void readObject(java.io.ObjectInputStream in) 701 throws java.io.IOException , ClassNotFoundException 702 { 703 in.defaultReadObject(); 704 qout = new Vector (); 705 } 706 } 707 | Popular Tags |