1 24 package org.objectweb.joram.mom.dest.bridge; 25 26 import java.util.Properties ; 27 import java.util.Vector ; 28 29 import javax.jms.Connection ; 30 import javax.jms.ConnectionFactory ; 31 import javax.jms.Destination ; 32 import javax.jms.IllegalStateException ; 33 import javax.jms.JMSException ; 34 import javax.jms.JMSSecurityException ; 35 import javax.jms.MessageConsumer ; 36 import javax.jms.MessageFormatException ; 37 import javax.jms.MessageProducer ; 38 import javax.jms.Queue ; 39 import javax.jms.Session ; 40 import javax.jms.Topic ; 41 42 import org.objectweb.joram.shared.messages.Message; 43 44 import fr.dyade.aaa.agent.AgentId; 45 import fr.dyade.aaa.agent.Channel; 46 import fr.dyade.aaa.util.Daemon; 47 48 52 public class BridgeModule implements javax.jms.ExceptionListener , 53 javax.jms.MessageListener , 54 java.io.Serializable 55 { 56 57 protected AgentId agentId; 58 59 60 protected String jndiFactory = null; 61 62 protected String jndiUrl = null; 63 64 protected String cnxFactName; 65 66 protected String destName; 67 68 protected ConnectionFactory cnxFact = null; 69 70 protected Destination dest = null; 71 72 protected String userName = null; 73 74 protected String password = null; 75 76 protected String clientID = null; 77 78 protected String selector; 79 80 81 protected boolean usable = true; 82 83 protected String notUsableMessage; 84 85 86 protected transient Connection cnx; 87 88 protected transient Session producerSession; 89 90 protected transient Session consumerSession; 91 92 protected transient MessageProducer producer; 93 94 protected transient MessageConsumer consumer; 95 96 97 protected transient boolean listener; 98 99 protected transient Vector qout; 100 101 102 protected transient ConsumerDaemon consumerDaemon; 103 104 protected transient ReconnectionDaemon reconnectionDaemon; 105 106 111 private boolean automaticRequest = false; 112 113 114 public BridgeModule() 115 {} 116 117 118 128 public void init(AgentId agentId, Properties prop) { 129 this.agentId = agentId; 130 131 jndiFactory = prop.getProperty("jndiFactory"); 132 jndiUrl = prop.getProperty("jndiUrl"); 133 134 cnxFactName = prop.getProperty("connectionFactoryName"); 135 if (cnxFactName == null) 136 throw new IllegalArgumentException ("Missing ConnectionFactory JNDI name."); 137 138 destName = prop.getProperty("destinationName"); 139 if (destName == null) 140 throw new IllegalArgumentException ("Missing Destination JNDI name."); 141 142 String userName = prop.getProperty("userName"); 143 String password = prop.getProperty("password"); 144 145 if (userName != null && password != null) { 146 this.userName = userName; 147 this.password = password; 148 } 149 150 clientID = prop.getProperty("clientId"); 151 selector = prop.getProperty("selector"); 152 automaticRequest = Boolean.valueOf( 153 prop.getProperty("automaticRequest","false")).booleanValue(); 154 } 155 156 164 public void connect() throws JMSException { 165 if (! usable) 166 throw new IllegalStateException (notUsableMessage); 167 168 listener = false; 169 consumerDaemon = new ConsumerDaemon(); 171 reconnectionDaemon = new ReconnectionDaemon(); 172 173 if (cnxFact == null || dest == null) { 176 StartupDaemon startup = new StartupDaemon(); 177 startup.start(); 178 } 179 else { 181 try { 182 doConnect(); 183 } 184 catch (JMSException exc) { 185 reconnectionDaemon.reconnect(); 186 } 187 } 188 } 189 190 196 public void setMessageListener() throws IllegalStateException { 197 if (! usable) 198 throw new IllegalStateException (notUsableMessage); 199 200 listener = true; 201 try { 202 setConsumer(); 203 consumer.setMessageListener(this); 204 cnx.start(); 205 } catch (JMSException exc) {} 206 } 207 208 211 public void unsetMessageListener() { 212 try { 213 cnx.stop(); 214 consumer.setMessageListener(null); 215 unsetConsumer(); 216 } catch (JMSException exc) {} 217 listener = false; 218 } 219 220 231 public Message receiveNoWait() throws IllegalStateException { 232 if (! usable) 233 throw new IllegalStateException (notUsableMessage); 234 235 Message momMessage = null; 236 try { 237 setConsumer(); 238 cnx.start(); 239 try { 240 org.objectweb.joram.client.jms.Message clientMessage = 241 org.objectweb.joram.client.jms.Message.convertJMSMessage(consumer.receiveNoWait()); 242 momMessage = clientMessage.getMomMsg(); 243 consumerSession.commit(); 244 } 245 catch (MessageFormatException exc) { 247 consumerSession.rollback(); 248 } 249 } 250 catch (JMSException commitExc) { 253 momMessage = null; 254 } 255 return momMessage; 256 } 257 258 265 public void receive() throws IllegalStateException { 266 if (! usable) 267 throw new IllegalStateException (notUsableMessage); 268 269 consumerDaemon.receive(); 270 } 271 272 280 public void send(org.objectweb.joram.shared.messages.Message message) 281 throws JMSException { 282 if (! usable) 283 throw new IllegalStateException (notUsableMessage); 284 285 try { 286 producer.send(org.objectweb.joram.client.jms.Message.wrapMomMessage(null, message)); 287 acknowledge(message); 288 } catch (javax.jms.JMSException exc) { 289 qout.add(message); 291 } 292 } 293 294 297 public void close() 298 { 299 try { 300 cnx.stop(); 301 } 302 catch (JMSException exc) {} 303 304 unsetMessageListener(); 305 306 try { 307 consumerDaemon.interrupt(); 308 } 309 catch (Exception exc) {} 310 try { 311 reconnectionDaemon.interrupt(); 312 } 313 catch (Exception exc) {} 314 315 try { 316 cnx.close(); 317 } 318 catch (JMSException exc) {} 319 } 320 321 327 public void onException(JMSException exc) 328 { 329 reconnectionDaemon.reconnect(); 330 } 331 332 337 public void onMessage(javax.jms.Message jmsMessage) 338 { 339 try { 340 try { 341 org.objectweb.joram.client.jms.Message clientMessage = 342 org.objectweb.joram.client.jms.Message.convertJMSMessage(jmsMessage); 343 Message momMessage = clientMessage.getMomMsg(); 344 consumerSession.commit(); 345 Channel.sendTo(agentId, new BridgeDeliveryNot(momMessage)); 346 } 347 catch (MessageFormatException conversionExc) { 349 consumerSession.rollback(); 350 } 351 } 352 catch (JMSException exc) {} 354 } 355 356 362 protected void doConnect() throws JMSException { 363 if (userName != null && password != null) 364 cnx = cnxFact.createConnection(userName, password); 365 else 366 cnx = cnxFact.createConnection(); 367 cnx.setExceptionListener(this); 368 369 if (clientID != null) 370 cnx.setClientID(clientID); 371 372 producerSession = cnx.createSession(false, Session.AUTO_ACKNOWLEDGE); 373 producer = producerSession.createProducer(dest); 374 375 consumerSession = cnx.createSession(true, 0); 376 } 377 378 383 protected void setConsumer() throws JMSException { 384 if (consumer != null) 385 return; 386 387 try { 388 if (dest instanceof Queue ) 389 consumer = consumerSession.createConsumer(dest, selector); 390 else 391 consumer = consumerSession.createDurableSubscriber((Topic ) dest, 392 agentId.toString(), 393 selector, 394 false); 395 } 396 catch (JMSException exc) { 397 throw exc; 398 } 399 catch (Exception exc) { 400 throw new JMSException ("JMS resources do not allow to create consumer: " 401 + exc); 402 } 403 } 404 405 408 protected void unsetConsumer() { 409 try { 410 if (dest instanceof Topic ) 411 consumerSession.unsubscribe(agentId.toString()); 412 413 consumer.close(); 414 } 415 catch (Exception exc) {} 416 417 consumer = null; 418 } 419 420 423 protected void acknowledge(Message message) 424 { 425 Channel.sendTo(agentId, new BridgeAckNot(message.id)); 426 } 427 428 429 433 protected class StartupDaemon extends Daemon { 434 435 protected StartupDaemon() { 436 super(agentId.toString() + ":StartupDaemon"); 437 setDaemon(false); 438 } 439 440 441 public void run() { 442 javax.naming.Context jndiCtx = null; 443 try { 444 canStop = true; 445 446 if (cnxFact == null || dest == null) { 449 if (jndiFactory == null || jndiUrl == null) 450 jndiCtx = new javax.naming.InitialContext (); 451 else { 452 java.util.Hashtable env = new java.util.Hashtable (); 453 env.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, jndiFactory); 454 env.put(javax.naming.Context.PROVIDER_URL, jndiUrl); 455 jndiCtx = new javax.naming.InitialContext (env); 456 } 457 cnxFact = (ConnectionFactory ) jndiCtx.lookup(cnxFactName); 458 dest = (Destination ) jndiCtx.lookup(destName); 459 460 if (dest instanceof Topic ) 461 automaticRequest = false; 462 } 463 try { 464 doConnect(); 465 } 466 catch (AbstractMethodError exc) { 467 usable = false; 468 notUsableMessage = "Retrieved administered objects types not " 469 + "compatible with the 'unified' communication " 470 + " mode: " + exc; 471 } 472 catch (ClassCastException exc) { 473 usable = false; 474 notUsableMessage = "Retrieved administered objects types not " 475 + "compatible with the chosen communication mode: " 476 + exc; 477 } 478 catch (JMSSecurityException exc) { 479 usable = false; 480 notUsableMessage = "Provided user identification does not allow " 481 + "to connect to the foreign JMS server: " 482 + exc; 483 } 484 catch (JMSException exc) { 485 reconnectionDaemon.reconnect(); 486 } 487 catch (Throwable exc) { 488 usable = false; 489 notUsableMessage = "" + exc; 490 } 491 } 492 catch (javax.naming.NameNotFoundException exc) { 493 usable = false; 494 if (cnxFact == null) 495 notUsableMessage = "Could not retrieve ConnectionFactory [" 496 + cnxFactName 497 + "] from JNDI: " + exc; 498 else if (dest == null) 499 notUsableMessage = "Could not retrieve Destination [" 500 + destName 501 + "] from JNDI: " + exc; 502 } 503 catch (javax.naming.NamingException exc) { 504 usable = false; 505 notUsableMessage = "Could not access JNDI: " + exc; 506 } 507 catch (ClassCastException exc) { 508 usable = false; 509 notUsableMessage = "Error while retrieving administered objects " 510 + "through JNDI possibly because of missing " 511 + "foreign JMS client libraries in classpath: " 512 + exc; 513 } 514 catch (Exception exc) { 515 usable = false; 516 notUsableMessage = "Error while retrieving administered objects " 517 + "through JNDI: " 518 + exc; 519 } 520 finally { 521 try { 523 jndiCtx.close(); 524 } 525 catch (Exception exc) {} 526 527 finish(); 528 } 529 } 530 531 532 public void shutdown() 533 {} 534 535 536 public void close() 537 {} 538 } 539 540 544 protected class ReconnectionDaemon extends Daemon 545 { 546 547 private int attempts1 = 30; 548 549 private long interval1 = 1000L; 550 551 private int attempts2 = 55; 552 553 private long interval2 = 5000L; 554 555 private long interval3 = 60000L; 556 557 558 protected ReconnectionDaemon() 559 { 560 super(agentId.toString() + ":ReconnectionDaemon"); 561 setDaemon(false); 562 } 563 564 565 protected void reconnect() { 566 if (running) 567 return; 568 569 consumer = null; 570 start(); 571 } 572 573 574 public void run() 575 { 576 int attempts = 0; 577 long interval; 578 Message msg; 579 580 try { 581 while (running) { 582 canStop = true; 583 584 attempts++; 585 586 if (attempts <= 30) 587 interval = interval1; 588 else if (attempts <= 55) 589 interval = interval2; 590 else 591 interval = interval3; 592 593 try { 594 Thread.sleep(interval); 595 doConnect(); 596 597 if (listener) 599 setMessageListener(); 600 consumerDaemon.start(); 602 while (! qout.isEmpty()) 604 send((Message) qout.remove(0)); 605 } 606 catch (Exception exc) { 607 continue; 608 } 609 canStop = false; 610 break; 611 } 612 } 613 finally { 614 finish(); 615 } 616 } 617 618 619 public void shutdown() 620 {} 621 622 623 public void close() 624 {} 625 } 626 627 632 protected class ConsumerDaemon extends Daemon { 633 634 private int requests = 0; 635 636 637 638 protected ConsumerDaemon() { 639 super(agentId.toString() + ":ConsumerDaemon"); 640 setDaemon(false); 641 } 642 643 644 protected synchronized void receive() { 645 requests++; 646 647 if (running) 648 return; 649 650 start(); 651 } 652 653 654 public void run() { 655 try { 656 Message momMessage; 657 BridgeDeliveryNot notif; 658 659 setConsumer(); 660 cnx.start(); 661 while ((requests > 0 || automaticRequest) && running) { 662 canStop = true; 663 try { 665 org.objectweb.joram.client.jms.Message clientMessage = 666 org.objectweb.joram.client.jms.Message.convertJMSMessage(consumer.receive()); 667 momMessage = clientMessage.getMomMsg(); 668 consumerSession.commit(); 669 } 670 catch (MessageFormatException messageExc) { 672 consumerSession.rollback(); 673 continue; 674 } 675 canStop = false; 677 notif = new BridgeDeliveryNot(momMessage); 678 Channel.sendTo(agentId, notif); 679 if (!automaticRequest) 680 requests--; 681 } 682 } 683 catch (JMSException exc) {} 685 finally { 686 finish(); 687 } 688 } 689 690 691 public void shutdown() 692 {} 693 694 695 public void close() 696 {} 697 } 698 699 700 701 private void readObject(java.io.ObjectInputStream in) 702 throws java.io.IOException , ClassNotFoundException 703 { 704 in.defaultReadObject(); 705 qout = new Vector (); 706 } 707 } 708 | Popular Tags |