1 21 22 package com.rift.coad.daemon.messageservice; 24 25 26 import com.rift.coad.lib.thread.pool.PoolException; 28 import java.util.List ; 29 import java.util.ArrayList ; 30 import java.util.Iterator ; 31 import java.rmi.Remote ; 32 import java.rmi.RemoteException ; 33 import javax.naming.InitialContext ; 34 import javax.naming.Context ; 35 import javax.transaction.UserTransaction ; 36 import javax.transaction.TransactionManager ; 37 import javax.transaction.Transaction ; 38 39 import org.apache.log4j.Logger; 41 42 import org.hibernate.*; 44 import org.hibernate.cfg.*; 45 46 47 import com.rift.coad.lib.bean.BeanRunnable; 49 import com.rift.coad.lib.configuration.Configuration; 50 import com.rift.coad.lib.configuration.ConfigurationFactory; 51 import com.rift.coad.lib.thread.ThreadStateMonitor; 52 import com.rift.coad.lib.thread.pool.ThreadPoolManager; 53 import com.rift.coad.util.change.ChangeLog; 54 import com.rift.coad.util.transaction.UserTransactionWrapper; 55 import com.rift.coad.hibernate.util.HibernateUtil; 56 import com.rift.coad.daemon.messageservice.db.*; 57 import com.rift.coad.daemon.messageservice.message.MessageManagerFactory; 58 import com.rift.coad.daemon.messageservice.message.MessageManagerImpl; 59 import com.rift.coad.daemon.messageservice.named.NamedMemoryQueue; 60 65 public class MessageServiceImpl implements MessageService, BeanRunnable { 66 67 private final static String THREAD_POOL_SIZE = "thread_pool_size"; 69 private final static int DEFAULT_THREAD_POOL_SIZE = 10; 70 private final static String THREAD_POOL_USER = "thread_pool_user"; 71 72 protected Logger log = 74 Logger.getLogger(MessageServiceImpl.class.getName()); 75 76 private ThreadStateMonitor state = new ThreadStateMonitor(); 78 private Configuration config = null; 79 private Context context = null; 80 private ThreadPoolManager threadPoolManager = null; 81 private UserTransactionWrapper utw = null; 82 private List initialEntries = null; 83 84 89 public MessageServiceImpl() throws MessageServiceException { 90 try { 91 config = ConfigurationFactory.getInstance().getConfig( 92 MessageServiceImpl.class); 93 context = new InitialContext (); 94 utw = new UserTransactionWrapper(); 95 log.info("Reading in and applying change log information, " + 96 "this may take some time"); 97 ChangeLog.init(MessageServiceImpl.class); 98 initialEntries = getDbMessageList(); 99 threadPoolManager = new ThreadPoolManager((int) 100 config.getLong(THREAD_POOL_SIZE,DEFAULT_THREAD_POOL_SIZE), 101 MessageProcessor.class, config.getString(THREAD_POOL_USER)); 102 } catch (Exception ex) { 103 log.error("Failed to instanciate the " + 104 "message service : " + ex.getMessage(),ex); 105 throw new MessageServiceException("Failed to instanciate the " + 106 "message service : " + ex.getMessage(),ex); 107 } 108 109 } 110 111 112 119 public int getThreadPoolSize() throws RemoteException , 120 MessageServiceException { 121 return threadPoolManager.getSize(); 122 } 123 124 125 132 public void setThreadPoolSize(int size) throws RemoteException , 133 MessageServiceException { 134 try { 135 threadPoolManager.setSize(size); 136 } catch (Exception ex) { 137 log.error("Failed to set the size : " + ex.getMessage(),ex); 138 throw new MessageServiceException("Failed to set the size : " + 139 ex.getMessage(),ex); 140 } 141 } 142 143 144 151 public List listNamedQueues() throws RemoteException , 152 MessageServiceException { 153 return NamedMemoryQueue.listQueues(); 154 } 155 156 157 165 public List listMessagesForNamedQueue(String queueName) throws 166 RemoteException , MessageServiceException { 167 List namedQueues = NamedMemoryQueue.listQueues(); 168 if (!namedQueues.contains(queueName)) { 169 throw new MessageServiceException("The queue [" + queueName + 170 "] does not exist."); 171 } 172 return NamedMemoryQueue.getInstance(queueName).getMessages(); 173 } 174 175 176 183 public void purgeNamedQueue(String queueName) throws RemoteException , 184 MessageServiceException { 185 List namedQueues = NamedMemoryQueue.listQueues(); 186 if (!namedQueues.contains(queueName)) { 187 throw new MessageServiceException("The queue [" + queueName + 188 "] does not exist."); 189 } 190 NamedMemoryQueue.getInstance(queueName).purge(); 191 } 192 193 194 197 public void process() { 198 try { 199 for (Iterator iter = initialEntries.iterator(); iter.hasNext();) { 200 String messageId = (String )iter.next(); 201 try { 202 log.info("Load message : " + messageId); 203 utw.begin(); 204 Session session = HibernateUtil.getInstance( 205 MessageServiceImpl.class).getSession(); 206 com.rift.coad.daemon.messageservice.db.Message message = 207 (com.rift.coad.daemon.messageservice.db.Message) 208 session.get(com.rift.coad.daemon.messageservice. 209 db.Message.class,messageId); 210 MessageManager messageManager = 211 MessageManagerFactory.getInstance(). 212 getMessageManager(messageId); 213 MessageQueue messageQueue = MessageQueueManager.getInstance(). 214 getQueue(MessageQueueManager.UNSORTED); 215 messageQueue.addMessage(messageManager); 216 utw.commit(); 217 ProcessMonitor.getInstance().notifyProcessor(); 218 } catch (Exception ex) { 219 log.error("Failed to retrieve the message : " + 220 ex.getMessage(),ex); 221 } finally { 222 utw.release(); 223 } 224 } 225 try { 226 ChangeLog.getInstance().start(); 227 } catch (Exception ex) { 228 log.error("Failed to start the change log processing : " 229 + ex.getMessage(),ex); 230 } 231 while(!state.isTerminated()) { 232 233 state.monitor(); 235 } 236 } catch (Exception ex) { 237 log.error("The processing failed in the message service because : " 238 + ex.getMessage(),ex); 239 } 240 } 241 242 243 246 public void terminate() { 247 try { 248 threadPoolManager.terminate(); 249 } catch (PoolException ex) { 250 log.error("Failed to terminate the thread pool : " + ex.getMessage(), 251 ex); 252 } 253 state.terminate(true); 254 try { 255 ProcessMonitor.getInstance().terminate(); 256 } catch (Exception ex) { 257 log.error("Failed to terminate the processor: " + ex.getMessage(), 258 ex); 259 } 260 try { 261 log.info("Waiting for all changes to be dumped"); 262 ChangeLog.terminate(); 263 log.info("Changes have been dumped."); 264 } catch (Exception ex) { 265 log.error("Failed to shut down the change log : " 266 + ex.getMessage(),ex); 267 } 268 } 269 270 271 274 private List getDbMessageList() { 275 boolean startedTransaction = false; 276 List dbEntries = new ArrayList (); 277 try { 278 utw.begin(); 279 startedTransaction = true; 280 Session session = HibernateUtil.getInstance( 281 MessageServiceImpl.class).getSession(); 282 List messages = session.createQuery( 283 "FROM Message as message").list(); 284 for (Iterator iter = messages.iterator(); iter.hasNext();) { 285 com.rift.coad.daemon.messageservice.db.Message msg = 286 (com.rift.coad.daemon.messageservice.db.Message) 287 iter.next(); 288 dbEntries.add(msg.getId()); 289 } 290 291 utw.commit(); 292 startedTransaction = false; 293 } catch (Exception ex) { 294 log.error("Failed to load the list of messages from the db : " + 295 ex.getMessage(),ex); 296 } finally { 297 utw.release(); 298 } 299 return dbEntries; 300 } 301 } 302 | Popular Tags |