KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jahia > services > cache > JMSHubPublisherHandler


1 /*
2  * ____.
3  * __/\ ______| |__/\. _______
4  * __ .____| | \ | +----+ \
5  * _______| /--| | | - \ _ | : - \_________
6  * \\______: :---| : : | : | \________>
7  * |__\---\_____________:______: :____|____:_____\
8  * /_____|
9  *
10  * . . . i n j a h i a w e t r u s t . . .
11  *
12  *
13  *
14  * ----- BEGIN LICENSE BLOCK -----
15  * Version: JCSL 1.0
16  *
17  * The contents of this file are subject to the Jahia Community Source License
18  * 1.0 or later (the "License"); you may not use this file except in
19  * compliance with the License. You may obtain a copy of the License at
20  * http://www.jahia.org/license
21  *
22  * Software distributed under the License is distributed on an "AS IS" basis,
23  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
24  * for the rights, obligations and limitations governing use of the contents
25  * of the file. The Original and Upgraded Code is the Jahia CMS and Portal
26  * Server. The developer of the Original and Upgraded Code is JAHIA Ltd. JAHIA
27  * Ltd. owns the copyrights in the portions it created. All Rights Reserved.
28  *
29  * The Developer of the Shared Modifications is Jahia Solution Sarl.
30  * Portions created by the Initial Developer are Copyright (C) 2002 by the
31  * Initial Developer. All Rights Reserved.
32  *
33  * Contributor(s):
34  * 10-SEP-2003, Jahia Solutions Sarl, Fulco Houkes : Initial version
35  *
36  * ----- END LICENSE BLOCK -----
37  */

38
39 package org.jahia.services.cache;
40
41 import javax.jms.*;
42 import java.io.ByteArrayOutputStream JavaDoc;
43 import java.io.ObjectOutputStream JavaDoc;
44 import java.io.IOException JavaDoc;
45 import org.jahia.settings.SettingsBean;
46
47
48 /** <p>This class implements the Message Publisher's core process.</p>
49  * <p>Messages will be sent to multiple cache destinations, therefore a message queue is
50  * associate to each cache name. Those queues will only hold messages destinated to the
51  * associated cache. Each new message will be sent to the associated queue, which will
52  * add the message to the queue as well as performing some elimination rules on the
53  * existing stored messages (ie
54  * {@link org.jahia.services.cache.JMSMessageQueue JMSMessageQueue} class description).</p>
55  *
56  * <p>This handler must be initialized through the
57  * {@link org.jahia.services.cache.JMSHubPublisherHandler#init init()} method <b>before</b>
58  * starting it. Attempts to start this handler without proper initialization will not provide
59  * the publishing connection to the JMS server and the handler will not start.</p>
60  * <p>Stopping the handler has to be done with the
61  * {@link org.jahia.services.cache.JMSHubPublisherHandler#stop stop()} method, which will
62  * properly terminate the existing publishings processes and close the publishing connection
63  * to the JMS server.</p>
64  *
65  * @author Fulco Houkes, Copyright (c) 2003 by Jahia Ltd.
66  * @version 1.0
67  * @since Jahia 4.0
68  */

69 class JMSHubPublisherHandler implements Runnable JavaDoc {
70
71     /** logging. */
72     final private static org.apache.log4j.Logger logger =
73             org.apache.log4j.Logger.getLogger (JMSHubPublisherHandler.class);
74
75     private int maxMessagesPerPackage = 1000;
76
77     /** message queues, one queue per cache. */
78     // private Map queues;
79
private JMSMessageQueue jmsMessageQueue;
80
81     /** the TopicPublisher reference. */
82     private TopicPublisher topicPublisher;
83
84     /** <code>true</code> when the handler is running. */
85     private boolean running;
86
87     /** <code>true</code> when the handler has been initialized. */
88     private boolean initialized;
89
90     /** the reference to the JMSHub. */
91     private JMSHub jmsHub = null;
92
93
94
95     /** Default constructor, creates a new <code>JMSHubPublisherHandler</code> instance.*/
96     public JMSHubPublisherHandler () {
97
98         // queues = new InsertionSortedMap();
99
jmsMessageQueue = new JMSMessageQueue();
100
101         // log the instanciation
102
logger.debug ("JMSHubPublisherHandler successfully instanciated!");
103     }
104
105     /** This initialization method has to be called <b>before</b> the handler is started.
106      *
107      * @param hub the JMSHub reference where to retrieve the JMS Topic and Session.
108      *
109      * @throws JMSConnectionException
110      * when no publishing connection could me initiated with the JMS Server.
111      */

112     public synchronized void init (JMSHub hub)
113             throws JMSConnectionException
114     {
115         if (hub == null) {
116             throw new JMSConnectionException (
117                     "Cannont initialize the Message Publisher with a null JMSHub reference!");
118         }
119
120         jmsHub = hub;
121
122         if (jmsHub.getSettings().lookupInt(SettingsBean.JMS_MAX_MESSAGES_IN_PACKAGE) > 0) {
123             maxMessagesPerPackage = jmsHub.getSettings().lookupInt(SettingsBean.JMS_MAX_MESSAGES_IN_PACKAGE);
124         }
125
126         // Initialize the publisher to the related topic
127
try {
128             TopicSession session = jmsHub.getTopicSession();
129             if (session != null) {
130
131                 Topic topic = jmsHub.getTopic();
132                 if (topic == null) {
133                     throw new JMSConnectionException (
134                             "Could not get the Topic to which the Message Publisher should connect!!");
135                 }
136
137                 topicPublisher = session.createPublisher (jmsHub.getTopic());
138                 if (topicPublisher != null) {
139                     logger.debug ("Message Publisher successfully initialized on on topic ["+
140                             topic.toString() +"]");
141
142                 } else {
143                     throw new JMSConnectionException (
144                             "Could not instanciate a TopicSubscriber instance.");
145                 }
146
147             } else {
148                 throw new JMSConnectionException ("Could not get the JMS Topic Session!!");
149             }
150
151         } catch (JMSException ex) {
152             throw new JMSConnectionException (
153                     "JMS Exception while instanciating the TopicPublisher", ex);
154         }
155
156         initialized = true;
157         logger.info("JMSHubPublisher successfully initialized!");
158     }
159
160     /** Starts the handler. This method is automatically called when associating the
161      * handler to a thread and calling the <code>start()</code> method of that thread. Do
162      * call this method directly.
163      */

164     public void run () {
165         logger.info("Starting JMS Message Publisher....");
166         running = true;
167
168         if (!initialized)
169             return;
170
171         try {
172             while (running) {
173
174                JMSCacheMessage[] messages = null;
175                do {
176                synchronized (jmsMessageQueue) {
177                    messages = jmsMessageQueue.extractMessages(maxMessagesPerPackage);
178                }
179
180                    // the resulting message
181
MapMessage message = null;
182
183                    // send a single non-packed message
184
if (messages.length == 1) {
185                        computeSingleMessage(messages);
186
187                        // pack all the messages into one single message
188
} else if (messages.length > 1) {
189                        computeMultipleMessages(messages, message);
190                    }
191                } while (messages.length > 0);
192
193                 // wait until the next notification
194
synchronized (this) {
195                     try {
196                         wait(); // wait for next notify
197

198                     } catch (InterruptedException JavaDoc ie) {
199                         // it's what we were waiting for...
200
}
201                 }
202             } // while
203

204         } catch (Throwable JavaDoc t) {
205             logger.error ("Error while publishing JMS messages", t);
206         } finally {
207             // At this point, the thread is going to be stopped. Therefore all the
208
// allocated resources should be properly closed
209
shutdown();
210         }
211
212     }
213
214
215     /** Publish the specified message.
216      *
217      * @param message the message to publish
218      */

219     public synchronized void publishMessage (JMSCacheMessage message) {
220         if (!initialized) {
221             logger.debug ("Message Publisher not initialized; skip message publishing process.");
222             return;
223         }
224
225         if (!running) {
226             logger.debug ("Message Publisher is not running. Drop the message.");
227         }
228
229         if (message == null) {
230             logger.debug("Cannot publish a null message!");
231             return;
232         }
233
234         /*
235         // get the related cache queue, or create a new one if not already present
236         JMSMessageQueue queue = (JMSMessageQueue)queues.get (message.getCacheName());
237         if (queue == null) {
238             // the queue associate to the cache name does not exists, try to find
239             // the cache in the CacheFactory. If it is not existing, drop the message.
240             Cache tmpCache = jmsHub.getCacheFactory().getCache (message.getCacheName());
241             if (tmpCache != null) {
242                 queue = new JMSMessageQueue();
243                 synchronized (queues) {
244                     queues.put(tmpCache.getName(), queue);
245                 }
246
247             } else {
248                 // the cache could not be found, drop the message
249                 logger.debug("Couldn't find cache " + message.getCacheName() + " in cache factory, dropping packet !");
250                 return;
251             }
252         }
253        queue.add (message);
254         */

255
256        jmsMessageQueue.add(message);
257
258
259         // notify the thread there is a new message
260
// notifyAll();
261
}
262
263     public synchronized void sendNow() {
264         notifyAll();
265     }
266
267     /** Stop the Message Publisher.
268      */

269     public synchronized void stop () {
270         running = false;
271         notifyAll();
272     }
273
274     /** Check if the Message Publisher is still running.
275      *
276      * @return <code>true</code> when the Message Publisher is still running, otherwise
277      * <code>false</code>.
278      */

279     public boolean isRunning () {
280         return running;
281     }
282
283
284     /** Properly closes all the handler's associated resources and JMS publishing connection.
285      */

286     private void shutdown () {
287
288         initialized = false;
289
290         // no need to proceed if there is not publisher instance
291
if (topicPublisher == null)
292             return;
293
294         // try to close the connection
295
try {
296             topicPublisher.close();
297             logger.info("JMS Message Publisher successfully shut down.");
298
299         } catch (JMSException ex) {
300             logger.warn ("Could not shut down the JMS Message Publisher", ex);
301
302         } finally {
303             // let's kill definitively the beast
304
topicPublisher = null;
305             jmsHub = null;
306         }
307         logger.info("JMS Message Publisher successfully shutted down.");
308     }
309
310     /** Packs all the messages in <code>messages</code> together in a single
311      * <code>MapMessage</code> instance. Once this message has been created and initialized,
312      * it will be published to the JMS Server.
313      *
314      * @param messages the messages to be packed together
315      * @param message the <code>MapMessage</code> instance to be used.
316      */

317     private void computeMultipleMessages (JMSCacheMessage[] messages,
318                                           MapMessage message)
319     {
320         logger.debug("Packing "+ messages.length +
321                          " messages");
322
323         MapMessage[] tmp = new MapMessage[messages.length];
324         for (int i=0; i<tmp.length; i++) {
325             tmp[i] = messages[i].computeMapMessage (jmsHub);
326         }
327
328         // create the message
329
try {
330             message = jmsHub.getTopicSession().createMapMessage();
331
332             // stream the array of MapMessage
333
ByteArrayOutputStream JavaDoc byteArrayOutputStream = null;
334             try {
335                 byteArrayOutputStream = new ByteArrayOutputStream JavaDoc();
336                 ObjectOutputStream JavaDoc objectOutputStream =
337                         new ObjectOutputStream JavaDoc (byteArrayOutputStream);
338                 objectOutputStream.writeObject (tmp);
339
340             } catch (IOException JavaDoc ex) {
341                 logger.warn ("Could not stream the packed MapMessage array :(", ex);
342             }
343             message.setObject (JMSHub.PACKED_MSG_KEY,
344                     byteArrayOutputStream.toByteArray());
345             message.setString (JMSCacheMessage.CLIENT_KEY,
346                     jmsHub.getTopicConnection().getClientID());
347
348             // flag to true as we handle a packed message
349
message.setBoolean (JMSHub.PACKED_FLAG, true);
350
351         } catch (JMSException ex) {
352             logger.warn ("JMS exception while creating the packed message", ex);
353         }
354
355
356         // CAUTION: Keep the publishing of the message in a
357
// separate try-catch statement, as to detect JMS server deconnection
358
// or JMS session timeouts !!
359
try {
360             topicPublisher.publish (message, DeliveryMode.NON_PERSISTENT, 1, 0);
361             logger.debug ("Sent packed messages ("+ tmp.length +
362                           ") to the JMS server");
363
364         } catch (JMSException ex) {
365             logger.warn ("Could not send message to JMS server, try to reconnect to the server!");
366             if (ex.getLinkedException() != null) {
367                 logger.warn ("Root exception: "+ ex.getLinkedException().getClass().getName());
368             } else {
369                 logger.warn ("No linked exception");
370             }
371             jmsHub.jmsConnectionFailure();
372             running = false;
373             throw new RuntimeException JavaDoc(ex);
374         }
375     }
376
377     /** Publish a single message to the JMS Server.
378      *
379      * @param messages the message to be published
380      * @param cacheName the cache name
381      */

382     private void computeSingleMessage (JMSCacheMessage[] messages) {
383         MapMessage message = messages[0].computeMapMessage (jmsHub);
384
385         try {
386             // flag to false as we handle a single message
387
message.setBoolean (JMSHub.PACKED_FLAG, false);
388
389             topicPublisher.publish (message, DeliveryMode.NON_PERSISTENT, 1, 0);
390             logger.debug ("Sent single "+ JMSHub.messageToString (message) +
391                     " to JMS server");
392
393
394         } catch (JMSException ex) {
395             logger.warn("Could not publish the message", ex);
396         }
397     }
398
399 }
400
Popular Tags