KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jahia > services > cache > JMSHubConsumer


1 /*
2  * ____.
3  * __/\ ______| |__/\. _______
4  * __ .____| | \ | +----+ \
5  * _______| /--| | | - \ _ | : - \_________
6  * \\______: :---| : : | : | \________>
7  * |__\---\_____________:______: :____|____:_____\
8  * /_____|
9  *
10  * . . . i n j a h i a w e t r u s t . . .
11  *
12  *
13  *
14  * ----- BEGIN LICENSE BLOCK -----
15  * Version: JCSL 1.0
16  *
17  * The contents of this file are subject to the Jahia Community Source License
18  * 1.0 or later (the "License"); you may not use this file except in
19  * compliance with the License. You may obtain a copy of the License at
20  * http://www.jahia.org/license
21  *
22  * Software distributed under the License is distributed on an "AS IS" basis,
23  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
24  * for the rights, obligations and limitations governing use of the contents
25  * of the file. The Original and Upgraded Code is the Jahia CMS and Portal
26  * Server. The developer of the Original and Upgraded Code is JAHIA Ltd. JAHIA
27  * Ltd. owns the copyrights in the portions it created. All Rights Reserved.
28  *
29  * The Developer of the Shared Modifications is Jahia Solution Sarl.
30  * Portions created by the Initial Developer are Copyright (C) 2002 by the
31  * Initial Developer. All Rights Reserved.
32  *
33  * Contributor(s):
34  * 05-SEP-2003, Jahia Solutions Sarl, Fulco Houkes : Initial version
35  *
36  * ----- END LICENSE BLOCK -----
37  */

38
39
40 package org.jahia.services.cache;
41
42 import org.jahia.exceptions.JahiaInitializationException;
43 import org.jahia.services.cache.simplecache.SimpleCache;
44
45 import javax.jms.*;
46 import java.io.ByteArrayInputStream JavaDoc;
47 import java.io.ObjectInputStream JavaDoc;
48 import java.io.IOException JavaDoc;
49 import java.util.Hashtable JavaDoc;
50
51
52 /** <p>This class implements a JMS Message Consumer.</p>
53  * <p>Messages will be sent to the
54  * {@link org.jahia.services.cache.JMSHubConsumer#onMessage onMessage()} method, which will
55  * handle single or packed message and finally sent the message to the approriate cache.
56  * Cyclic messages (messages comming from the same Jahia server which sent the message) are
57  * automatically dropped.</p>
58  *
59  * @author Fulco Houkes, Copyright (c) 2003 by Jahia Ltd.
60  * @version 1.0
61  * @since Jahia 4.0
62  */

63 class JMSHubConsumer implements MessageListener {
64
65     /** logging. */
66     final private static org.apache.log4j.Logger logger =
67             org.apache.log4j.Logger.getLogger (JMSHubConsumer.class);
68
69     /** references to the caches. */
70     private Hashtable JavaDoc caches;
71
72     private String JavaDoc hubClientID;
73     private CacheFactory cacheFactory;
74
75     /** the consumer connection to the server. */
76     private TopicSubscriber topicSubscriber;
77
78     /** Default constructor, creates a new <code>JMSHubConsumer</code> instance.*/
79     public JMSHubConsumer () {
80         // instanciate the caches hashtable
81
caches = new Hashtable JavaDoc();
82
83         // log the instanciation
84
logger.debug ("JMSHubConsumer successfully instanciated!");
85     }
86
87
88     /** Initializes the Message Consumer by initializing a new consumer connection with
89      * the JMS Server.
90      *
91      * @param hub the reference to the <code>JMSHub</code>, which holds the JMS
92      * connection session.
93      *
94      * @throws JahiaInitializationException
95      */

96     protected void init (JMSHub hub)
97             throws JahiaInitializationException
98     {
99         logger.info("Initializing the JMS Message Consumer...");
100         if (hub == null) {
101             throw new JahiaInitializationException(
102                     "Cannot initialize the JMS Message Consumer with a null JMSHub instance!");
103         }
104
105         cacheFactory = hub.getCacheFactory();
106
107         // Initialize the subscription to the related topic
108
try {
109             hubClientID = hub.getTopicConnection().getClientID();
110
111             // get the subscriber instance from the session
112
topicSubscriber = hub.getTopicSession().createSubscriber (
113                     hub.getTopic(), null, false);
114             if (topicSubscriber != null) {
115                 topicSubscriber.setMessageListener (this);
116                 logger.debug ("JMS Hub successfully set as a listenenr on topic ["+
117                         hub.getTopic().toString() +"]");
118
119             } else {
120                 throw new JahiaInitializationException(
121                         "Could not instanciate a TopicSubscriber instance.");
122             }
123
124         } catch (JMSException ex) {
125             logger.warn ("JMSException while initializing the JMS Consumer Worker", ex);
126         }
127
128         logger.info("JMS Message Consumer successfully initialized! Ready to receive message.");
129     }
130
131     /** This message is called automatically by the JMS client when a message was
132      * received from the JMS Server.
133      *
134      * @param message the received message to be processed.
135      */

136     public void onMessage (Message message) {
137         // don't know what to do with null messages
138
if (message == null)
139             return;
140
141         // only MapMessages are allowed in this current implementation
142
if (!(message instanceof MapMessage)) {
143             logger.debug("Non MapMessage mesages are not supported yet.");
144             return;
145         }
146
147         // cast once for all
148
MapMessage msg = (MapMessage)message;
149
150         // check if it is a packed of single message
151
try {
152             // first get the client ID to know if we can skip all the rest:
153
// check if the received message was not been sent by this client
154
String JavaDoc clientID = msg.getString (JMSCacheMessage.CLIENT_KEY);
155             if (clientID != null) {
156                 if (clientID.equals (hubClientID)) {
157                     logger.debug("ignoring message, cyclic message!");
158                      return;
159                 }
160             }
161
162             boolean isPacked = msg.getBoolean (JMSHub.PACKED_FLAG);
163
164             if (isPacked)
165                 processPackedMessage (msg);
166             else
167                 processSingleMessage (msg);
168
169         } catch (JMSException ex) {
170             logger.warn ("Could not extract the pack flag from the message", ex);
171         }
172     }
173
174     private void processPackedMessage (MapMessage message) {
175
176         try {
177             logger.debug("Unpacking messages... ");
178             long processingStartTime = System.currentTimeMillis();
179             // extract all the MapMessage messages
180
byte[] bytes = message.getBytes (JMSHub.PACKED_MSG_KEY);
181             ByteArrayInputStream JavaDoc byteArrayInputStream = new ByteArrayInputStream JavaDoc (bytes);
182             ObjectInputStream JavaDoc objectInputStream = new ObjectInputStream JavaDoc (byteArrayInputStream);
183
184             MapMessage[] messages = (MapMessage[])objectInputStream.readObject();
185
186             logger.debug("Found "+ messages.length +" messages!");
187
188             if (messages.length > 0) {
189                 for (int i=0; i<messages.length; i++) {
190                     processSingleMessage (messages[i]);
191                 }
192             }
193
194             long processingElapsedTime = System.currentTimeMillis() - processingStartTime;
195
196             if (logger.isInfoEnabled()) {
197                 logger.info("Processed " + messages.length +
198                             " incoming JMS messages in " +
199                             processingElapsedTime + " ms");
200             }
201
202         } catch (JMSException e) {
203             logger.warn ("got an JMSException, skipping message parsing", e);
204             return;
205
206         } catch (IOException JavaDoc ioe) {
207             logger.warn ("got an IOException, skipping message parsing", ioe);
208             return;
209
210         } catch (ClassNotFoundException JavaDoc cnfe) {
211             logger.warn ("got an ClassNotFoundException, skipping message parsing", cnfe);
212             return;
213         }
214     }
215
216     private void processSingleMessage (MapMessage message) {
217
218         // First get the message properties
219
String JavaDoc cacheName;
220         int eventType;
221         Object JavaDoc entryKey;
222         Object JavaDoc entryValue = null;
223
224         if (message == null) {
225             return;
226         }
227         try {
228             cacheName = message.getString (JMSCacheMessage.CACHE_NAME_KEY);
229             eventType = message.getInt (JMSCacheMessage.EVENT_NAME_KEY);
230
231             byte[] serializedEntryKey = message.getBytes (JMSCacheMessage.ENTRY_KEY);
232             ByteArrayInputStream JavaDoc byteArrayInputStream = new ByteArrayInputStream JavaDoc (serializedEntryKey);
233             ObjectInputStream JavaDoc objectInputStream = new ObjectInputStream JavaDoc (byteArrayInputStream);
234             entryKey = objectInputStream.readObject();
235
236             byte[] serializedEntryValue = message.getBytes (JMSCacheMessage.ENTRY_VALUE);
237             if (serializedEntryValue != null) {
238                 ByteArrayInputStream JavaDoc byteArrayInputStreamValue = new
239                     ByteArrayInputStream JavaDoc(serializedEntryValue);
240                 ObjectInputStream JavaDoc objectInputStreamValue = new
241                     ObjectInputStream JavaDoc(byteArrayInputStreamValue);
242                 entryValue = objectInputStreamValue.readObject();
243             }
244
245             logger.debug("Received "+ JMSHub.messageToString (message));
246
247
248         } catch (JMSException e) {
249             logger.warn ("got an JMSException, skipping message parsing", e);
250             return;
251
252         } catch (IOException JavaDoc ioe) {
253             logger.warn ("got an IOException, skipping message parsing", ioe);
254             return;
255
256         } catch (ClassNotFoundException JavaDoc cnfe) {
257             logger.warn ("got an ClassNotFoundException, skipping message parsing", cnfe);
258             return;
259         }
260
261
262         // check the entryKey is valid
263
if ((eventType != JMSCacheMessage.FLUSH_EVENT) && (entryKey == null)) {
264             logger.debug("null entryKey, skip process");
265             return;
266         }
267
268         // retrieve the associated cache
269
SimpleCache cache = (SimpleCache)caches.get (cacheName);
270
271         // skip the process if the cache could not be found
272
if (cache == null) {
273             logger.debug("Cache ["+ cacheName +"] is not registered in the JMS Hub. Try to register it");
274
275             // Retrieve the cache from the Cache Factory
276
cache = (SimpleCache)cacheFactory.getCache (cacheName);
277             if (cache != null) {
278                 caches.put (cache.getName(), cache);
279
280             } else {
281                 logger.debug("Could not found the cache in the Cache Factory. Skip message.");
282                 return;
283             }
284         }
285
286         // call the cache's method associated to the message's event
287
switch (eventType) {
288             case JMSCacheMessage.REMOVE_EVENT:
289                 cache.onRemove (entryKey);
290                 break;
291
292             case JMSCacheMessage.PUT_EVENT:
293                 cache.onPut (entryKey, entryValue);
294                 break;
295
296             case JMSCacheMessage.FLUSH_EVENT:
297                 cache.onFlush ();
298                 break;
299         }
300     }
301
302
303     /** Properly closes the consumer connection with the JMS Server.
304      */

305     public void shutdown () {
306         logger.info("Shutting down the JMS Message Consumer..");
307
308         // no need to proceed if there is not subscriber instance
309
if (topicSubscriber == null)
310             return;
311
312         // try to close the connection
313
try {
314             topicSubscriber.close();
315             logger.info("JMS Message Consumer successfully shut down.");
316
317         } catch (JMSException ex) {
318             logger.warn ("Could not shut down the JMS Message Consumer", ex);
319
320         } finally {
321             // let's kill definitively the beast
322
topicSubscriber = null;
323             cacheFactory = null;
324             hubClientID = null;
325         }
326     }
327 }
328
Popular Tags