1 38 39 40 package org.jahia.services.cache; 41 42 import org.jahia.exceptions.JahiaInitializationException; 43 import org.jahia.services.cache.simplecache.SimpleCache; 44 45 import javax.jms.*; 46 import java.io.ByteArrayInputStream ; 47 import java.io.ObjectInputStream ; 48 import java.io.IOException ; 49 import java.util.Hashtable ; 50 51 52 63 class JMSHubConsumer implements MessageListener { 64 65 66 final private static org.apache.log4j.Logger logger = 67 org.apache.log4j.Logger.getLogger (JMSHubConsumer.class); 68 69 70 private Hashtable caches; 71 72 private String hubClientID; 73 private CacheFactory cacheFactory; 74 75 76 private TopicSubscriber topicSubscriber; 77 78 79 public JMSHubConsumer () { 80 caches = new Hashtable (); 82 83 logger.debug ("JMSHubConsumer successfully instanciated!"); 85 } 86 87 88 96 protected void init (JMSHub hub) 97 throws JahiaInitializationException 98 { 99 logger.info("Initializing the JMS Message Consumer..."); 100 if (hub == null) { 101 throw new JahiaInitializationException( 102 "Cannot initialize the JMS Message Consumer with a null JMSHub instance!"); 103 } 104 105 cacheFactory = hub.getCacheFactory(); 106 107 try { 109 hubClientID = hub.getTopicConnection().getClientID(); 110 111 topicSubscriber = hub.getTopicSession().createSubscriber ( 113 hub.getTopic(), null, false); 114 if (topicSubscriber != null) { 115 topicSubscriber.setMessageListener (this); 116 logger.debug ("JMS Hub successfully set as a listenenr on topic ["+ 117 hub.getTopic().toString() +"]"); 118 119 } else { 120 throw new JahiaInitializationException( 121 "Could not instanciate a TopicSubscriber instance."); 122 } 123 124 } catch (JMSException ex) { 125 logger.warn ("JMSException while initializing the JMS Consumer Worker", ex); 126 } 127 128 logger.info("JMS Message Consumer successfully initialized! Ready to receive message."); 129 } 130 131 136 public void onMessage (Message message) { 137 if (message == null) 139 return; 140 141 if (!(message instanceof MapMessage)) { 143 logger.debug("Non MapMessage mesages are not supported yet."); 144 return; 145 } 146 147 MapMessage msg = (MapMessage)message; 149 150 try { 152 String clientID = msg.getString (JMSCacheMessage.CLIENT_KEY); 155 if (clientID != null) { 156 if (clientID.equals (hubClientID)) { 157 logger.debug("ignoring message, cyclic message!"); 158 return; 159 } 160 } 161 162 boolean isPacked = msg.getBoolean (JMSHub.PACKED_FLAG); 163 164 if (isPacked) 165 processPackedMessage (msg); 166 else 167 processSingleMessage (msg); 168 169 } catch (JMSException ex) { 170 logger.warn ("Could not extract the pack flag from the message", ex); 171 } 172 } 173 174 private void processPackedMessage (MapMessage message) { 175 176 try { 177 logger.debug("Unpacking messages... "); 178 long processingStartTime = System.currentTimeMillis(); 179 byte[] bytes = message.getBytes (JMSHub.PACKED_MSG_KEY); 181 ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream (bytes); 182 ObjectInputStream objectInputStream = new ObjectInputStream (byteArrayInputStream); 183 184 MapMessage[] messages = (MapMessage[])objectInputStream.readObject(); 185 186 logger.debug("Found "+ messages.length +" messages!"); 187 188 if (messages.length > 0) { 189 for (int i=0; i<messages.length; i++) { 190 processSingleMessage (messages[i]); 191 } 192 } 193 194 long processingElapsedTime = System.currentTimeMillis() - processingStartTime; 195 196 if (logger.isInfoEnabled()) { 197 logger.info("Processed " + messages.length + 198 " incoming JMS messages in " + 199 processingElapsedTime + " ms"); 200 } 201 202 } catch (JMSException e) { 203 logger.warn ("got an JMSException, skipping message parsing", e); 204 return; 205 206 } catch (IOException ioe) { 207 logger.warn ("got an IOException, skipping message parsing", ioe); 208 return; 209 210 } catch (ClassNotFoundException cnfe) { 211 logger.warn ("got an ClassNotFoundException, skipping message parsing", cnfe); 212 return; 213 } 214 } 215 216 private void processSingleMessage (MapMessage message) { 217 218 String cacheName; 220 int eventType; 221 Object entryKey; 222 Object entryValue = null; 223 224 if (message == null) { 225 return; 226 } 227 try { 228 cacheName = message.getString (JMSCacheMessage.CACHE_NAME_KEY); 229 eventType = message.getInt (JMSCacheMessage.EVENT_NAME_KEY); 230 231 byte[] serializedEntryKey = message.getBytes (JMSCacheMessage.ENTRY_KEY); 232 ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream (serializedEntryKey); 233 ObjectInputStream objectInputStream = new ObjectInputStream (byteArrayInputStream); 234 entryKey = objectInputStream.readObject(); 235 236 byte[] serializedEntryValue = message.getBytes (JMSCacheMessage.ENTRY_VALUE); 237 if (serializedEntryValue != null) { 238 ByteArrayInputStream byteArrayInputStreamValue = new 239 ByteArrayInputStream (serializedEntryValue); 240 ObjectInputStream objectInputStreamValue = new 241 ObjectInputStream (byteArrayInputStreamValue); 242 entryValue = objectInputStreamValue.readObject(); 243 } 244 245 logger.debug("Received "+ JMSHub.messageToString (message)); 246 247 248 } catch (JMSException e) { 249 logger.warn ("got an JMSException, skipping message parsing", e); 250 return; 251 252 } catch (IOException ioe) { 253 logger.warn ("got an IOException, skipping message parsing", ioe); 254 return; 255 256 } catch (ClassNotFoundException cnfe) { 257 logger.warn ("got an ClassNotFoundException, skipping message parsing", cnfe); 258 return; 259 } 260 261 262 if ((eventType != JMSCacheMessage.FLUSH_EVENT) && (entryKey == null)) { 264 logger.debug("null entryKey, skip process"); 265 return; 266 } 267 268 SimpleCache cache = (SimpleCache)caches.get (cacheName); 270 271 if (cache == null) { 273 logger.debug("Cache ["+ cacheName +"] is not registered in the JMS Hub. Try to register it"); 274 275 cache = (SimpleCache)cacheFactory.getCache (cacheName); 277 if (cache != null) { 278 caches.put (cache.getName(), cache); 279 280 } else { 281 logger.debug("Could not found the cache in the Cache Factory. Skip message."); 282 return; 283 } 284 } 285 286 switch (eventType) { 288 case JMSCacheMessage.REMOVE_EVENT: 289 cache.onRemove (entryKey); 290 break; 291 292 case JMSCacheMessage.PUT_EVENT: 293 cache.onPut (entryKey, entryValue); 294 break; 295 296 case JMSCacheMessage.FLUSH_EVENT: 297 cache.onFlush (); 298 break; 299 } 300 } 301 302 303 305 public void shutdown () { 306 logger.info("Shutting down the JMS Message Consumer.."); 307 308 if (topicSubscriber == null) 310 return; 311 312 try { 314 topicSubscriber.close(); 315 logger.info("JMS Message Consumer successfully shut down."); 316 317 } catch (JMSException ex) { 318 logger.warn ("Could not shut down the JMS Message Consumer", ex); 319 320 } finally { 321 topicSubscriber = null; 323 cacheFactory = null; 324 hubClientID = null; 325 } 326 } 327 } 328 | Popular Tags |