1 21 22 package com.rift.coad.daemon.messageservice.named; 23 24 import java.rmi.Remote ; 26 import java.rmi.RemoteException ; 27 import java.util.ArrayList ; 28 import java.util.Date ; 29 import java.util.List ; 30 import java.util.HashMap ; 31 import java.util.Map ; 32 import java.util.Vector ; 33 import java.util.concurrent.ConcurrentHashMap ; 34 import java.util.concurrent.ConcurrentLinkedQueue ; 35 import javax.naming.Context ; 36 import javax.naming.InitialContext ; 37 import javax.transaction.SystemException ; 38 import javax.transaction.UserTransaction ; 39 import javax.transaction.Status ; 40 import javax.transaction.xa.XAException ; 41 import javax.transaction.xa.XAResource ; 42 import javax.transaction.xa.Xid ; 43 44 import org.apache.log4j.Logger; 46 47 import org.hibernate.*; 49 import org.hibernate.cfg.*; 50 51 import com.rift.coad.daemon.messageservice.db.*; 53 import com.rift.coad.daemon.messageservice.Message; 54 import com.rift.coad.daemon.messageservice.MessageManager; 55 import com.rift.coad.daemon.messageservice.MessageQueue; 56 import com.rift.coad.daemon.messageservice.MessageQueueManager; 57 import com.rift.coad.daemon.messageservice.MessageService; 58 import com.rift.coad.daemon.messageservice.MessageServiceException; 59 import com.rift.coad.daemon.messageservice.MessageServiceImpl; 60 import com.rift.coad.daemon.messageservice.NamedQueue; 61 import com.rift.coad.daemon.messageservice.TimeoutException; 62 import com.rift.coad.daemon.messageservice.message.MessageImpl; 63 import com.rift.coad.daemon.messageservice.message.MessageManagerFactory; 64 import com.rift.coad.daemon.messageservice.message.MessageManagerImpl; 65 import com.rift.coad.daemon.servicebroker.ServiceBroker; 66 import com.rift.coad.hibernate.util.HibernateUtil; 67 import com.rift.coad.lib.configuration.Configuration; 68 import com.rift.coad.lib.configuration.ConfigurationFactory; 69 import com.rift.coad.lib.Resource; 70 import com.rift.coad.lib.ResourceIndex; 71 import com.rift.coad.util.transaction.TransactionManager; 72 import com.rift.coad.util.lock.LockRef; 73 import com.rift.coad.util.lock.ObjectLockFactory; 74 import com.rift.coad.util.connection.ConnectionManager; 75 76 82 public class NamedQueueImpl implements NamedQueue,ResourceIndex,Resource { 83 84 private final static String TIMEOUT = "QUEUE_TIMEOUT"; 86 private final static long DEFAULT_TIMEOUT = 30000; 87 88 89 protected Logger log = 91 Logger.getLogger(NamedQueueImpl.class.getName()); 92 93 94 private String queueName = null; 96 private long maxTimeout = DEFAULT_TIMEOUT; 97 98 99 105 public NamedQueueImpl(String queueName) throws MessageServiceException { 106 try { 107 this.queueName = queueName; 108 Configuration config = ConfigurationFactory.getInstance().getConfig( 109 this.getClass()); 110 maxTimeout = config.getLong(TIMEOUT,DEFAULT_TIMEOUT); 111 } catch (Exception ex) { 112 log.error("Failed to init the Named Queue : " + ex.getMessage(),ex); 113 throw new MessageServiceException( 114 "Failed to init the Named Queue : " + ex.getMessage(),ex); 115 } 116 } 117 118 119 130 public Message receive(long delay) throws RemoteException , 131 MessageServiceException, TimeoutException { 132 try { 133 return NamedMemoryQueue.getInstance(queueName).poll(delay); 134 } catch (Throwable ex) { 135 log.error("Failed to retrieve message : " + ex.getMessage(),ex); 136 throw new MessageServiceException( 137 "Failed to retrieve message : " + ex.getMessage(),ex); 138 } 139 } 140 141 142 150 public void addService(String service) throws RemoteException , 151 MessageServiceException { 152 try { 153 Session session = HibernateUtil.getInstance( 154 MessageServiceImpl.class).getSession(); 155 com.rift.coad.daemon.messageservice.db.MessageQueue mq = 156 (com.rift.coad.daemon.messageservice.db.MessageQueue) 157 session.get(com.rift.coad.daemon.messageservice.db. 158 MessageQueue.class,queueName); 159 MessageQueueService messageQueueService = new MessageQueueService( 160 service,mq); 161 session.persist(messageQueueService); 162 ServiceBroker broker = (ServiceBroker)ConnectionManager. 163 getInstance().getConnection(ServiceBroker.class, 164 "ServiceBroker"); 165 List serviceList = new ArrayList (); 166 serviceList.add(service); 167 broker.registerService(MessageService.JNDI_URL,serviceList); 168 } catch (Exception ex) { 169 log.error("Failed to add a service : " + ex.getMessage(),ex); 170 throw new MessageServiceException( 171 "Failed to add a service : " + ex.getMessage(),ex); 172 } 173 } 174 175 176 185 public List listServices() throws RemoteException , 186 MessageServiceException { 187 try { 188 Session session = HibernateUtil.getInstance( 189 MessageServiceImpl.class).getSession(); 190 List entries = session.createQuery( 191 "SELECT mqs.service FROM MessageQueueService as mqs " + 192 "WHERE mqs.messageQueue.named = ?"). 193 setString(0,this.queueName).list(); 194 List result = new ArrayList (); 195 for (int index = 0; index < entries.size(); index++) { 196 result.add(((Object [])entries.get(index))[index]); 197 } 198 return result; 199 } catch (Exception ex) { 200 log.error("Failed to add a service : " + ex.getMessage(),ex); 201 throw new MessageServiceException( 202 "Failed to add a service : " + ex.getMessage(),ex); 203 } 204 } 205 206 207 214 public void removeService(String service) throws RemoteException , 215 MessageServiceException { 216 try { 217 Session session = HibernateUtil.getInstance( 218 MessageServiceImpl.class).getSession(); 219 session.createQuery( 220 "DELETE FROM MessageQueueService as mqs " + 221 "WHERE mqs.service = ? AND mqs.messageQueue.named = ?"). 222 setString(0,service).setString(1,this.queueName). 223 executeUpdate(); 224 ServiceBroker broker = (ServiceBroker)ConnectionManager. 225 getInstance().getConnection(ServiceBroker.class, 226 "ServiceBroker"); 227 List mqs = session.createQuery( 228 "FROM MessageQueueService as mqs " + 229 "WHERE mqs.service = ?").setString(0,service).list(); 230 if (mqs.size() == 0) { 231 List serviceList = new ArrayList (); 232 serviceList.add(service); 233 broker.removeServiceProviders(MessageService.JNDI_URL, 234 serviceList); 235 } 236 } catch (Exception ex) { 237 log.error("Failed to remove a service : " + ex.getMessage(),ex); 238 throw new MessageServiceException( 239 "Failed to remove a service : " + ex.getMessage(),ex); 240 } 241 } 242 243 244 250 public Object getPrimaryKey() { 251 return queueName; 252 } 253 254 255 260 public String getResourceName() { 261 return queueName; 262 } 263 264 265 271 public void releaseResource() { 272 273 } 274 275 } 276 | Popular Tags |