1 38 39 package org.jahia.services.cache; 40 41 import javax.jms.*; 42 import java.io.ByteArrayOutputStream ; 43 import java.io.ObjectOutputStream ; 44 import java.io.IOException ; 45 import org.jahia.settings.SettingsBean; 46 47 48 69 class JMSHubPublisherHandler implements Runnable { 70 71 72 final private static org.apache.log4j.Logger logger = 73 org.apache.log4j.Logger.getLogger (JMSHubPublisherHandler.class); 74 75 private int maxMessagesPerPackage = 1000; 76 77 78 private JMSMessageQueue jmsMessageQueue; 80 81 82 private TopicPublisher topicPublisher; 83 84 85 private boolean running; 86 87 88 private boolean initialized; 89 90 91 private JMSHub jmsHub = null; 92 93 94 95 96 public JMSHubPublisherHandler () { 97 98 jmsMessageQueue = new JMSMessageQueue(); 100 101 logger.debug ("JMSHubPublisherHandler successfully instanciated!"); 103 } 104 105 112 public synchronized void init (JMSHub hub) 113 throws JMSConnectionException 114 { 115 if (hub == null) { 116 throw new JMSConnectionException ( 117 "Cannont initialize the Message Publisher with a null JMSHub reference!"); 118 } 119 120 jmsHub = hub; 121 122 if (jmsHub.getSettings().lookupInt(SettingsBean.JMS_MAX_MESSAGES_IN_PACKAGE) > 0) { 123 maxMessagesPerPackage = jmsHub.getSettings().lookupInt(SettingsBean.JMS_MAX_MESSAGES_IN_PACKAGE); 124 } 125 126 try { 128 TopicSession session = jmsHub.getTopicSession(); 129 if (session != null) { 130 131 Topic topic = jmsHub.getTopic(); 132 if (topic == null) { 133 throw new JMSConnectionException ( 134 "Could not get the Topic to which the Message Publisher should connect!!"); 135 } 136 137 topicPublisher = session.createPublisher (jmsHub.getTopic()); 138 if (topicPublisher != null) { 139 logger.debug ("Message Publisher successfully initialized on on topic ["+ 140 topic.toString() +"]"); 141 142 } else { 143 throw new JMSConnectionException ( 144 "Could not instanciate a TopicSubscriber instance."); 145 } 146 147 } else { 148 throw new JMSConnectionException ("Could not get the JMS Topic Session!!"); 149 } 150 151 } catch (JMSException ex) { 152 throw new JMSConnectionException ( 153 "JMS Exception while instanciating the TopicPublisher", ex); 154 } 155 156 initialized = true; 157 logger.info("JMSHubPublisher successfully initialized!"); 158 } 159 160 164 public void run () { 165 logger.info("Starting JMS Message Publisher...."); 166 running = true; 167 168 if (!initialized) 169 return; 170 171 try { 172 while (running) { 173 174 JMSCacheMessage[] messages = null; 175 do { 176 synchronized (jmsMessageQueue) { 177 messages = jmsMessageQueue.extractMessages(maxMessagesPerPackage); 178 } 179 180 MapMessage message = null; 182 183 if (messages.length == 1) { 185 computeSingleMessage(messages); 186 187 } else if (messages.length > 1) { 189 computeMultipleMessages(messages, message); 190 } 191 } while (messages.length > 0); 192 193 synchronized (this) { 195 try { 196 wait(); 198 } catch (InterruptedException ie) { 199 } 201 } 202 } 204 } catch (Throwable t) { 205 logger.error ("Error while publishing JMS messages", t); 206 } finally { 207 shutdown(); 210 } 211 212 } 213 214 215 219 public synchronized void publishMessage (JMSCacheMessage message) { 220 if (!initialized) { 221 logger.debug ("Message Publisher not initialized; skip message publishing process."); 222 return; 223 } 224 225 if (!running) { 226 logger.debug ("Message Publisher is not running. Drop the message."); 227 } 228 229 if (message == null) { 230 logger.debug("Cannot publish a null message!"); 231 return; 232 } 233 234 255 256 jmsMessageQueue.add(message); 257 258 259 } 262 263 public synchronized void sendNow() { 264 notifyAll(); 265 } 266 267 269 public synchronized void stop () { 270 running = false; 271 notifyAll(); 272 } 273 274 279 public boolean isRunning () { 280 return running; 281 } 282 283 284 286 private void shutdown () { 287 288 initialized = false; 289 290 if (topicPublisher == null) 292 return; 293 294 try { 296 topicPublisher.close(); 297 logger.info("JMS Message Publisher successfully shut down."); 298 299 } catch (JMSException ex) { 300 logger.warn ("Could not shut down the JMS Message Publisher", ex); 301 302 } finally { 303 topicPublisher = null; 305 jmsHub = null; 306 } 307 logger.info("JMS Message Publisher successfully shutted down."); 308 } 309 310 317 private void computeMultipleMessages (JMSCacheMessage[] messages, 318 MapMessage message) 319 { 320 logger.debug("Packing "+ messages.length + 321 " messages"); 322 323 MapMessage[] tmp = new MapMessage[messages.length]; 324 for (int i=0; i<tmp.length; i++) { 325 tmp[i] = messages[i].computeMapMessage (jmsHub); 326 } 327 328 try { 330 message = jmsHub.getTopicSession().createMapMessage(); 331 332 ByteArrayOutputStream byteArrayOutputStream = null; 334 try { 335 byteArrayOutputStream = new ByteArrayOutputStream (); 336 ObjectOutputStream objectOutputStream = 337 new ObjectOutputStream (byteArrayOutputStream); 338 objectOutputStream.writeObject (tmp); 339 340 } catch (IOException ex) { 341 logger.warn ("Could not stream the packed MapMessage array :(", ex); 342 } 343 message.setObject (JMSHub.PACKED_MSG_KEY, 344 byteArrayOutputStream.toByteArray()); 345 message.setString (JMSCacheMessage.CLIENT_KEY, 346 jmsHub.getTopicConnection().getClientID()); 347 348 message.setBoolean (JMSHub.PACKED_FLAG, true); 350 351 } catch (JMSException ex) { 352 logger.warn ("JMS exception while creating the packed message", ex); 353 } 354 355 356 try { 360 topicPublisher.publish (message, DeliveryMode.NON_PERSISTENT, 1, 0); 361 logger.debug ("Sent packed messages ("+ tmp.length + 362 ") to the JMS server"); 363 364 } catch (JMSException ex) { 365 logger.warn ("Could not send message to JMS server, try to reconnect to the server!"); 366 if (ex.getLinkedException() != null) { 367 logger.warn ("Root exception: "+ ex.getLinkedException().getClass().getName()); 368 } else { 369 logger.warn ("No linked exception"); 370 } 371 jmsHub.jmsConnectionFailure(); 372 running = false; 373 throw new RuntimeException (ex); 374 } 375 } 376 377 382 private void computeSingleMessage (JMSCacheMessage[] messages) { 383 MapMessage message = messages[0].computeMapMessage (jmsHub); 384 385 try { 386 message.setBoolean (JMSHub.PACKED_FLAG, false); 388 389 topicPublisher.publish (message, DeliveryMode.NON_PERSISTENT, 1, 0); 390 logger.debug ("Sent single "+ JMSHub.messageToString (message) + 391 " to JMS server"); 392 393 394 } catch (JMSException ex) { 395 logger.warn("Could not publish the message", ex); 396 } 397 } 398 399 } 400 | Popular Tags |