// KickJava   Java API By Example, From Geeks To Geeks.
//
// Java > Open Source Codes > com > rift > coad > daemon > messageservice > MessageServiceImpl


/*
 * MessageService: The message service daemon
 * Copyright (C) 2006 Rift IT Contracting
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * MessageServiceImpl.java
 */

// package path
package com.rift.coad.daemon.messageservice;

// java imports
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

// logging import
import org.apache.log4j.Logger;

// hibernate imports
import org.hibernate.*;
import org.hibernate.cfg.*;

// coadunation imports
import com.rift.coad.daemon.messageservice.db.*;
import com.rift.coad.daemon.messageservice.message.MessageManagerFactory;
import com.rift.coad.daemon.messageservice.message.MessageManagerImpl;
import com.rift.coad.daemon.messageservice.named.NamedMemoryQueue;
import com.rift.coad.hibernate.util.HibernateUtil;
import com.rift.coad.lib.bean.BeanRunnable;
import com.rift.coad.lib.configuration.Configuration;
import com.rift.coad.lib.configuration.ConfigurationFactory;
import com.rift.coad.lib.thread.ThreadStateMonitor;
import com.rift.coad.lib.thread.pool.PoolException;
import com.rift.coad.lib.thread.pool.ThreadPoolManager;
import com.rift.coad.util.change.ChangeLog;
import com.rift.coad.util.transaction.UserTransactionWrapper;

60 /**
61  * The implementation of the Message Service management interface.
62  *
63  * @author Brett Chaldecott
64  */

65 public class MessageServiceImpl implements MessageService, BeanRunnable {
66     
67     // class constants
68
private final static String JavaDoc THREAD_POOL_SIZE = "thread_pool_size";
69     private final static int DEFAULT_THREAD_POOL_SIZE = 10;
70     private final static String JavaDoc THREAD_POOL_USER = "thread_pool_user";
71     
72     // the logger reference
73
protected Logger log =
74         Logger.getLogger(MessageServiceImpl.class.getName());
75     
76     // private member variables
77
private ThreadStateMonitor state = new ThreadStateMonitor();
78     private Configuration config = null;
79     private Context JavaDoc context = null;
80     private ThreadPoolManager threadPoolManager = null;
81     private UserTransactionWrapper utw = null;
82     private List JavaDoc initialEntries = null;
83     
84     /**
85      * Creates a new instance of MessageServiceImpl
86      *
87      * @exception MessageServiceException
88      */

89     public MessageServiceImpl() throws MessageServiceException {
90         try {
91             config = ConfigurationFactory.getInstance().getConfig(
92                     MessageServiceImpl.class);
93             context = new InitialContext JavaDoc();
94             utw = new UserTransactionWrapper();
95             log.info("Reading in and applying change log information, " +
96                     "this may take some time");
97             ChangeLog.init(MessageServiceImpl.class);
98             initialEntries = getDbMessageList();
99             threadPoolManager = new ThreadPoolManager((int)
100                     config.getLong(THREAD_POOL_SIZE,DEFAULT_THREAD_POOL_SIZE),
101                     MessageProcessor.class, config.getString(THREAD_POOL_USER));
102         } catch (Exception JavaDoc ex) {
103             log.error("Failed to instanciate the " +
104                     "message service : " + ex.getMessage(),ex);
105             throw new MessageServiceException("Failed to instanciate the " +
106                     "message service : " + ex.getMessage(),ex);
107         }
108         
109     }
110     
111     
112     /**
113      * This method returns the thread pool size.
114      *
115      * @return The size of the thread pool.
116      * @exception RemoteException
117      * @exception MessageServiceException
118      */

119     public int getThreadPoolSize() throws RemoteException JavaDoc,
120             MessageServiceException {
121         return threadPoolManager.getSize();
122     }
123     
124     
125     /**
126      * This method sets the size of the thread pool.
127      *
128      * @param size The new size of the thread pool.
129      * @exception RemoteException
130      * @exception MessageServiceException
131      */

132     public void setThreadPoolSize(int size) throws RemoteException JavaDoc,
133             MessageServiceException {
134         try {
135             threadPoolManager.setSize(size);
136         } catch (Exception JavaDoc ex) {
137             log.error("Failed to set the size : " + ex.getMessage(),ex);
138             throw new MessageServiceException("Failed to set the size : " +
139                     ex.getMessage(),ex);
140         }
141     }
142     
143     
144     /**
145      * This method lists the named queues.
146      *
147      * @return The list of named queues.
148      * @exception RemoteException
149      * @exception MessageServiceException
150      */

151     public List JavaDoc listNamedQueues() throws RemoteException JavaDoc,
152             MessageServiceException {
153         return NamedMemoryQueue.listQueues();
154     }
155     
156     
157     /**
158      * This method returns the list of messages in the named queue.
159      *
160      * @return The list of messages for this queue.
161      * @param queueName The name of the queue to list messages for.
162      * @exception RemoteException
163      * @exception MessageServiceException
164      */

165     public List JavaDoc listMessagesForNamedQueue(String JavaDoc queueName) throws
166             RemoteException JavaDoc, MessageServiceException {
167         List JavaDoc namedQueues = NamedMemoryQueue.listQueues();
168         if (!namedQueues.contains(queueName)) {
169             throw new MessageServiceException("The queue [" + queueName +
170                     "] does not exist.");
171         }
172         return NamedMemoryQueue.getInstance(queueName).getMessages();
173     }
174     
175     
176     /**
177      * This purges the messages from the named queue
178      *
179      * @param queueName The name of the queue to purge.
180      * @exception RemoteException
181      * @exception MessageServiceException
182      */

183     public void purgeNamedQueue(String JavaDoc queueName) throws RemoteException JavaDoc,
184             MessageServiceException {
185         List JavaDoc namedQueues = NamedMemoryQueue.listQueues();
186         if (!namedQueues.contains(queueName)) {
187             throw new MessageServiceException("The queue [" + queueName +
188                     "] does not exist.");
189         }
190         NamedMemoryQueue.getInstance(queueName).purge();
191     }
192     
193     
194     /**
195      * This method is responsible for performing the processing.
196      */

197     public void process() {
198         try {
199             for (Iterator JavaDoc iter = initialEntries.iterator(); iter.hasNext();) {
200                 String JavaDoc messageId = (String JavaDoc)iter.next();
201                 try {
202                     log.info("Load message : " + messageId);
203                     utw.begin();
204                     Session session = HibernateUtil.getInstance(
205                             MessageServiceImpl.class).getSession();
206                     com.rift.coad.daemon.messageservice.db.Message message =
207                             (com.rift.coad.daemon.messageservice.db.Message)
208                             session.get(com.rift.coad.daemon.messageservice.
209
                            db.Message.class,messageId);
210                     MessageManager messageManager =
211                             MessageManagerFactory.getInstance().
212                             getMessageManager(messageId);
213                     MessageQueue messageQueue = MessageQueueManager.getInstance().
214                             getQueue(MessageQueueManager.UNSORTED);
215                     messageQueue.addMessage(messageManager);
216                     utw.commit();
217                     ProcessMonitor.getInstance().notifyProcessor();
218                 } catch (Exception JavaDoc ex) {
219                     log.error("Failed to retrieve the message : " +
220                             ex.getMessage(),ex);
221                 } finally {
222                     utw.release();
223                 }
224             }
225             try {
226                 ChangeLog.getInstance().start();
227             } catch (Exception JavaDoc ex) {
228                 log.error("Failed to start the change log processing : "
229                         + ex.getMessage(),ex);
230             }
231             while(!state.isTerminated()) {
232                 
233                 // wait indefinitly
234
state.monitor();
235             }
236         } catch (Exception JavaDoc ex) {
237             log.error("The processing failed in the message service because : "
238                     + ex.getMessage(),ex);
239         }
240     }
241     
242     
243     /**
244      * This method is called to terminate the processing of this object.
245      */

246     public void terminate() {
247         try {
248             threadPoolManager.terminate();
249         } catch (PoolException ex) {
250             log.error("Failed to terminate the thread pool : " + ex.getMessage(),
251                     ex);
252         }
253         state.terminate(true);
254         try {
255             ProcessMonitor.getInstance().terminate();
256         } catch (Exception JavaDoc ex) {
257             log.error("Failed to terminate the processor: " + ex.getMessage(),
258                     ex);
259         }
260         try {
261             log.info("Waiting for all changes to be dumped");
262             ChangeLog.terminate();
263             log.info("Changes have been dumped.");
264         } catch (Exception JavaDoc ex) {
265             log.error("Failed to shut down the change log : "
266                     + ex.getMessage(),ex);
267         }
268     }
269     
270     
271     /**
272      * This method returns a list db entries.
273      */

274     private List JavaDoc getDbMessageList() {
275         boolean startedTransaction = false;
276         List JavaDoc dbEntries = new ArrayList JavaDoc();
277         try {
278             utw.begin();
279             startedTransaction = true;
280             Session session = HibernateUtil.getInstance(
281                     MessageServiceImpl.class).getSession();
282             List JavaDoc messages = session.createQuery(
283                     "FROM Message as message").list();
284             for (Iterator JavaDoc iter = messages.iterator(); iter.hasNext();) {
285                 com.rift.coad.daemon.messageservice.db.Message msg =
286                         (com.rift.coad.daemon.messageservice.db.Message)
287                         iter.next();
288                 dbEntries.add(msg.getId());
289             }
290             
291             utw.commit();
292             startedTransaction = false;
293         } catch (Exception JavaDoc ex) {
294             log.error("Failed to load the list of messages from the db : " +
295                     ex.getMessage(),ex);
296         } finally {
297             utw.release();
298         }
299         return dbEntries;
300     }
301 }
// Popular Tags