1 38 39 40 package org.jahia.services.cache; 41 42 import java.util.Hashtable ; 43 import javax.jms.ExceptionListener ; 44 import javax.jms.JMSException ; 45 import javax.jms.MapMessage ; 46 import javax.jms.Session ; 47 import javax.jms.Topic ; 48 import javax.jms.TopicConnection ; 49 import javax.jms.TopicConnectionFactory ; 50 import javax.jms.TopicSession ; 51 import javax.naming.Context ; 52 import javax.naming.InitialContext ; 53 import javax.naming.NamingException ; 54 55 import org.jahia.exceptions.JahiaInitializationException; 56 import org.jahia.settings.SettingsBean; 57 58 59 70 public class JMSHub implements ExceptionListener { 71 72 final protected static String PACKED_MSG_KEY = "packedmsgKey"; 73 final protected static String PACKED_FLAG = "isPacked"; 74 75 76 final private static org.apache.log4j.Logger logger = 77 org.apache.log4j.Logger.getLogger (JMSHub.class); 78 79 private CacheFactory cacheFactory; 80 private TopicConnection topicConnection; 81 private TopicSession topicSession; 82 private Topic topic; 83 private JMSHubPublisher publisher; 84 private JMSHubConsumer consumer; 85 private SettingsBean settings; 86 87 88 private boolean isConnected; 89 90 93 private boolean isReconnecting; 94 95 98 private boolean isInitialized = false; 99 100 102 private JMSHubLookup jmsServerLookup; 103 104 107 private Thread jmsServerLookupThread; 108 109 110 protected JMSHub () { 111 publisher = new JMSHubPublisher(); 112 logger.debug ("JMSHub singleton successfully instanciated!"); 113 } 114 115 119 public void onException (JMSException ex) { 120 logger.warn (ex); 121 122 jmsConnectionFailure (); 124 } 125 126 131 public void reconnect () 132 throws JahiaInitializationException { 133 if (!isInitialized) { 134 logger.debug ("JMSHub not initialized! Skip reconnection process"); 135 return; 136 } 137 138 if (isReconnecting) { 139 logger.debug ("There is already a reconnection pending!"); 140 return; 141 } 142 143 disconnect (); 144 connect (); 145 } 146 147 153 public synchronized void connect () 154 throws JahiaInitializationException { 155 if (!isInitialized) { 156 logger.debug ("JMS not initialized! Skip connection process."); 157 return; 158 } 159 160 if (isReconnecting) 162 return; 163 164 try { 165 internalConnect (); 166 167 } catch (JMSConnectionException e) { 168 logger.info ("Could not get a connection to the JMS Server on host [" + 169 settings.lookupString (SettingsBean.JMS_CONTEXT_PROVIDER_URL) + 170 "]. Try to connect later"); 171 172 startJMSServerLookupThread (); 175 } 176 } 177 178 179 185 protected void internalConnect () 186 throws JahiaInitializationException, JMSConnectionException { 187 logger.info ("Connecting to the JMS Server..."); 188 if (settings == null) { 189 throw new JahiaInitializationException ( 190 "Need the Jahia settings to connect to the JMS server."); 191 } 192 193 Context context = getContext (settings); 195 logger.debug("Context is " + context.toString()); 196 197 TopicConnectionFactory factory = getConnectionFactory (context); 199 topic = getTopic (context); 200 201 topicConnection = getTopicConnection (factory); 203 topicSession = getSession (); 204 205 try { 208 topicConnection.setExceptionListener (this); 209 210 logger.debug("Session is " + topicSession.toString() + ", client ID=" + topicConnection.getClientID()); 211 } catch (JMSException ex) { 212 logger.warn ("Could not set the JMSHub as a ExceptionListener. Exceptions will not be catched!", ex); 213 } 214 215 connectConsumer (); 217 publisher.connect (this); 218 219 isConnected = true; 220 isReconnecting = false; 221 logger.info ("Successfully connected to the JMS Server."); 222 } 223 224 226 public synchronized void disconnect () { 227 if (!isInitialized) { 228 logger.debug ("JMSHub not initialized! Skip disconnection process."); 229 return; 230 } 231 232 logger.info ("Disconnecting from the JMS Server..."); 233 234 if (isReconnecting) { 235 stopJMSServerLookupThread (); 236 } 237 238 try { 239 isConnected = false; 240 241 publisher.disconnect (); 243 disconnectConsumer (); 244 245 if (topicSession != null) 247 topicSession.close (); 248 249 if (topicConnection != null) { 251 topicConnection.close (); 252 } 253 254 logger.info ("Successfully disconnected from JMS Server"); 255 256 } catch (JMSException ex) { 257 logger.warn ("Could not close the connection with the JMS Server.", ex); 258 259 } finally { 260 topicSession = null; 263 topicConnection = null; 264 topic = null; 265 } 266 } 267 268 269 275 public boolean sendFlushMessage (Cache cache) { 276 return sendMessage (cache, JMSCacheMessage.FLUSH_EVENT, null, null); 277 } 278 279 280 287 public boolean sendRemoveMessage (Cache cache, Object entryKey) { 288 return sendMessage (cache, JMSCacheMessage.REMOVE_EVENT, entryKey, null); 289 } 290 291 292 300 public boolean sendPutMessage (Cache cache, Object entryKey, Object entryValue) { 301 return sendMessage (cache, JMSCacheMessage.PUT_EVENT, entryKey, entryValue); 302 } 303 304 305 312 public static String messageToString (MapMessage message) { 313 314 try { 315 String cacheName = message.getString (JMSCacheMessage.CACHE_NAME_KEY); 316 String eventName = message.getString (JMSCacheMessage.EVENT_NAME_KEY); 317 Object entryKey = message.getObject (JMSCacheMessage.ENTRY_KEY); 318 String clientID = message.getString (JMSCacheMessage.CLIENT_KEY); 319 320 StringBuffer buffer = new StringBuffer ("message (cache:"); 321 buffer.append (cacheName); 322 buffer.append (", event:"); 323 buffer.append (eventName); 324 buffer.append (", entryKey:"); 325 buffer.append ((entryKey == null ? "null" : entryKey.toString ())); 326 buffer.append (", clientID:"); 327 buffer.append (clientID); 328 buffer.append (")"); 329 return buffer.toString (); 330 331 } catch (JMSException e) { 332 return ""; 333 } 334 } 335 336 337 341 public TopicSession getTopicSession () { 342 return topicSession; 343 } 344 345 346 350 public Topic getTopic () { 351 return topic; 352 } 353 354 355 359 public TopicConnection getTopicConnection () { 360 return topicConnection; 361 } 362 363 364 368 protected void jmsConnectionFailure () { 369 disconnect (); 370 startJMSServerLookupThread (); 371 } 372 373 384 public void init (SettingsBean settings, CacheFactory cacheFactory) 385 throws JahiaInitializationException { 386 this.cacheFactory = cacheFactory; 387 if (settings == null) { 389 logger.warn ("Cannot use null settings to initialize the cache!!"); 390 throw new JahiaInitializationException ("null settings!"); 391 } 392 this.settings = settings; 393 394 isConnected = false; 397 isReconnecting = false; 398 isInitialized = true; 399 400 logger.debug ("JMS Hub successfully initialized!"); 402 } 403 404 405 411 private void connectConsumer () 412 throws JahiaInitializationException { 413 if (!isInitialized) { 414 logger.debug ("JMS not initialized! Skip Message Consumer connection."); 415 return; 416 } 417 418 consumer = new JMSHubConsumer (); 421 consumer.init (this); 422 } 423 424 425 427 private void disconnectConsumer () { 428 consumer = null; 430 } 431 432 433 443 private boolean sendMessage (Cache cache, int messageType, Object entryKey, Object entryValue) { 444 if (!isInitialized) { 445 logger.debug ("JMS not initialized! Skip sending message."); 446 return false; 447 } 448 449 if (cache == null) { 450 logger.debug ("cannot handle null cache"); 451 return false; 452 } 453 454 if ((messageType != JMSCacheMessage.FLUSH_EVENT) && (entryKey == null)) { 455 logger.debug ("cannot handle null entryKey"); 456 return false; 457 } 458 459 if (isConnected) { 460 if (publisher != null) { 461 462 JMSCacheMessage message = null; 463 switch (messageType) { 464 case JMSCacheMessage.PUT_EVENT: 466 message = JMSCacheMessage.createPutMessage (cache, entryKey, entryValue); 467 break; 468 469 case JMSCacheMessage.REMOVE_EVENT: 471 message = JMSCacheMessage.createRemoveMessage (cache, entryKey); 472 break; 473 474 case JMSCacheMessage.FLUSH_EVENT: 476 message = JMSCacheMessage.createFlushMessage (cache); 477 break; 478 } 479 publisher.publishMessage (message); 480 return true; 481 482 } else { 483 logger.error ("JMS Publisher thread is null even if synchronization is enabled. This is really bad news!!!"); 484 return false; 485 } 486 487 } else { 488 } 490 491 return true; 493 } 494 495 public void sendMessagesNow() { 496 publisher.sendNow(); 497 } 498 499 500 507 private TopicSession getSession () 508 throws JahiaInitializationException { 509 try { 510 TopicSession topicSession = topicConnection.createTopicSession ( 512 false, Session.AUTO_ACKNOWLEDGE); 513 514 if (topicSession == null) { 515 throw new JahiaInitializationException ( 516 "Could not get the TopicSession instance!"); 517 } 518 519 return topicSession; 520 521 } catch (JMSException ex) { 522 logger.fatal (ex); 523 throw new JahiaInitializationException ( 524 "JMS Exception while getting the Topic Session", ex); 525 } 526 } 527 528 529 537 private TopicConnection getTopicConnection (TopicConnectionFactory factory) 538 throws JahiaInitializationException { 539 try { 540 TopicConnection topicConnection = factory.createTopicConnection (); 542 if (topicConnection == null) { 543 throw new JahiaInitializationException ( 544 "Could not get the TopicConnection instance!"); 545 } 546 topicConnection.start (); 547 return topicConnection; 548 549 } catch (JMSException ex) { 550 logger.fatal (ex); 551 throw new JahiaInitializationException ( 552 "JMS Exception while initializing JMS Hub", ex); 553 } 554 } 555 556 557 565 private Topic getTopic (Context context) 566 throws JMSConnectionException 567 { 568 String topicName = settings.lookupString (SettingsBean.JMS_TOPIC_NAME); 569 try { 570 return (Topic )context.lookup (topicName); 572 573 } catch (NamingException ex) { 574 575 logger.warn ("Could not lookup the specified [" + topicName + 576 "] topic. Check if the has been previously configured in the JMS Server!", ex); 577 578 throw new JMSConnectionException ( 579 "Could not get the topic " + topicName + "] instance."); 580 } 581 } 582 583 584 593 private TopicConnectionFactory getConnectionFactory (Context context) 594 throws JMSConnectionException 595 { 596 String factoryName = 597 settings.lookupString (SettingsBean.JMS_TOPIC_CONNECTION_FACTORY_NAME); 598 try { 599 return (TopicConnectionFactory )context.lookup (factoryName); 601 602 } catch (NamingException ex) { 603 logger.warn ("Could not lookup the connection factor name [" + factoryName + 604 "]. Check if the connection factory has been correctly configured in the jahia.properties file.", ex); 605 606 throw new JMSConnectionException ( 607 "Could not get the factory [" + factoryName + "] instance."); 608 } 609 } 610 611 612 622 private Context getContext (SettingsBean settings) 623 throws JMSConnectionException 624 { 625 String initalContextName = settings.lookupString( 626 SettingsBean.JMS_INITIAL_CONTEXT); 627 String providerUrl = settings.lookupString (SettingsBean.JMS_CONTEXT_PROVIDER_URL); 628 629 try { 630 Hashtable properties = new Hashtable (); 631 632 properties.put (Context.INITIAL_CONTEXT_FACTORY,initalContextName); 633 634 properties.put (Context.PROVIDER_URL,providerUrl); 635 636 logger.debug ("Try to connect to initial context (factory: " + 637 initalContextName + ", provider: " + providerUrl + ")"); 638 639 Context context = new InitialContext (properties); 640 logger.debug ("Could successfully retrieve the initial context"); 641 return context; 642 643 } catch (NamingException ex) { 644 645 667 throw new JMSConnectionException ("Could not get the InitialContext [" + 668 initalContextName + "] on provider [" + providerUrl + "].", ex); 669 } 670 671 } 672 673 674 677 private void startJMSServerLookupThread () { 678 if (!isReconnecting) { 679 isReconnecting = true; 680 jmsServerLookup = new JMSHubLookup (this, 681 settings.lookupLong(SettingsBean.JMS_SERVER_LOOKUP_SLEEP_TIME)); 682 jmsServerLookupThread = new Thread (jmsServerLookup); 683 jmsServerLookupThread.setName ("JMS Reconnection"); 684 jmsServerLookupThread.start (); 685 } 686 } 687 688 689 692 private synchronized void stopJMSServerLookupThread () { 693 694 if ((jmsServerLookup != null) && (jmsServerLookupThread != null)) { 695 696 logger.info ("Stopping JMS Server lookup thread"); 697 jmsServerLookup.stop (); 698 synchronized (jmsServerLookupThread) { 699 jmsServerLookupThread.notify(); 700 } 701 702 try { 703 jmsServerLookupThread.join (); 704 } catch (InterruptedException ex) { 705 } 707 708 jmsServerLookupThread = null; 709 jmsServerLookup = null; 710 } 711 isReconnecting = false; 712 } 713 714 public CacheFactory getCacheFactory() { 715 return cacheFactory; 716 } 717 718 public SettingsBean getSettings () { 719 return settings; 720 } 721 722 } 723 | Popular Tags |