1 6 7 package org.jfox.jms.connector; 8 9 import java.util.ArrayList ; 10 import java.util.Collections ; 11 import java.util.HashMap ; 12 import java.util.Iterator ; 13 import java.util.List ; 14 import java.util.Map ; 15 import java.util.Random ; 16 import javax.jms.Destination ; 17 import javax.jms.IllegalStateException ; 18 import javax.jms.InvalidDestinationException ; 19 import javax.jms.JMSException ; 20 import javax.jms.JMSSecurityException ; 21 import javax.jms.Message ; 22 import javax.jms.Queue ; 23 import javax.jms.TemporaryQueue ; 24 import javax.jms.TemporaryTopic ; 25 import javax.jms.Topic ; 26 27 import org.jfox.ioc.connector.AbstractContainer; 28 import org.jfox.ioc.ext.ActiveComponent; 29 import org.jfox.ioc.util.Marshaller; 30 import org.jfox.jms.JMSConnectionRemote; 31 import org.jfox.jms.message.JMSMessage; 32 33 34 37 38 public class JMSContainerImpl extends AbstractContainer implements JMSContainer, ActiveComponent { 39 42 private static Random rand = new Random (); 43 46 private JMSDestinations destinations = JMSDestinations.getInstance(); 47 48 51 private static List <JMSMessage> tempMessages = new ArrayList <JMSMessage>(); 52 53 57 private Map <String , ConnectionMeta> connections = new HashMap <String , ConnectionMeta>(); 58 59 private String user = ""; 60 private String password = ""; 61 62 public JMSContainerImpl() { 63 64 } 65 66 public Class getHandlerClass() { 67 return JMSHandler.class; 68 } 69 70 public boolean auth(String userName, String password) throws JMSException { 71 if (user.equals("") && this.password.equals("")) { 72 return true; 73 } else { 74 if (user.equals(userName) && this.password.equals(password)) { 75 return true; 76 } else { 77 throw new JMSSecurityException ("auth error, used user=" + user + ", password=" + password); 78 } 79 } 80 } 81 82 public void setUser(String user) { 83 this.user = user; 84 } 85 86 public void setPassword(String password) { 87 this.password = password; 88 } 89 90 public Queue createQueue(java.lang.String queueName) throws JMSException { 91 return null; 92 } 93 94 public Topic createTopic(String topicName) throws JMSException { 95 return null; 96 } 97 98 public TemporaryQueue createTemporaryQueue() throws JMSException { 99 return null; 100 } 101 102 public TemporaryTopic createTemporaryTopic() throws JMSException { 103 return null; 104 } 105 106 public synchronized void registerConnection(String clientId, Object conn) throws JMSException { 107 JMSConnectionRemote jmsconn = (JMSConnectionRemote) Marshaller.unmarshall(conn); 108 logger.debug("register jms client connection: " + clientId + " => " + jmsconn); 109 if (connections.containsKey(clientId)) { 110 throw new JMSException ("connection clientId " + clientId + " already registered."); 111 } 112 ConnectionMeta meta = new ConnectionMeta(clientId, jmsconn); 113 connections.put(clientId, meta); 114 } 115 116 public synchronized void unregisterConnection(String clientId) throws JMSException { 117 logger.debug("unregisterConnection clientId=" + clientId); 118 ConnectionMeta meta = connections.remove(clientId); 119 if (meta != null) { 120 meta.close(); 121 } 122 } 123 124 public synchronized boolean isConnectionRegistered(String clientId) { 125 return connections.containsKey(clientId); 126 } 127 128 public void registerSession(String connectionId, String sessionId) throws JMSException { 129 logger.debug("register jms session, sessionId = " + sessionId + ", clientId = " + connectionId); 130 if (!connections.containsKey(connectionId)) { 131 throw new JMSException ("connection clientId " + connectionId + " not registered."); 132 } 133 connections.get(connectionId).registerSession(sessionId); 134 } 135 136 public void registerConsumer(String clientId, String sessionId, String consumerId, Destination destnation) throws JMSException { 137 logger.debug("register jms consumer, consumerId =" + consumerId + ", sessionId =" + sessionId + " ,clientId = " + clientId); 138 if (!connections.containsKey(clientId)) { 139 throw new JMSException ("connection clientId " + clientId + " not registered."); 140 } 141 SessionMeta sessionMeta = getSession(clientId, sessionId); 142 ConsumerMeta meta = sessionMeta.registerCunsumer(consumerId); 143 destinations.registerConsumer(destnation, meta); 144 synchronized (this) { 145 notifyAll(); 146 } 147 } 148 149 156 public void sendMessage(JMSMessage message) throws JMSException { 157 logger.debug("receive message: " + message); 158 Destination destination = message.getJMSDestination(); 159 if (!destinations.isDestinationRegistered(destination)) { 160 throw new InvalidDestinationException ("Destination " + destination + " not exists."); 161 } 162 163 tempMessages.add((JMSMessage) message); 164 165 synchronized (this) { 166 notifyAll(); 167 } 168 } 169 170 175 public void sendMessageBatch(JMSMessage[] messages) throws JMSException { 176 throw new JMSException ("not support now!"); 177 } 178 179 188 public JMSMessage receiveMessage(String clientId, String sessionId, String consumerId, long timeout) throws JMSException { 189 ConnectionMeta connMeta = getConnection(clientId); 190 if (!connMeta.isStarted()) { 191 throw new IllegalStateException ("connection " + clientId + " not started, can't receive message."); 192 } 193 ConsumerMeta meta = getConsumer(clientId, sessionId, consumerId); 194 JMSMessage message = meta.popMessage(); 195 while (message == null) { 196 try { 197 if (timeout == 0) { 198 Thread.sleep(50L); 199 } else if (timeout > 0) { 200 Thread.sleep(timeout); 201 timeout = -1; } else { 203 break; 204 } 205 } catch (Exception e) { 206 e.printStackTrace(); 207 } 208 message = meta.popMessage(); 209 } 210 logger.debug("receive message " + message + ", clientId=" + clientId + ", sessionId=" + sessionId + ", consumerId=" + consumerId); 211 return message; 212 } 213 214 public void acknowledge(String clientId, String sessionId, String consumerId, String messageId) throws JMSException { 215 logger.debug("acknowledge message: messageId=" + messageId + ", clientId=" + clientId + ", sessionId=" + sessionId + ", consumerId=" + consumerId); 216 ConsumerMeta meta = getConsumer(clientId, sessionId, consumerId); 217 meta.acknowlege(messageId); 218 } 219 220 public void startConnection(String clientId) throws JMSException { 221 logger.debug("startConnection clientId=" + clientId); 222 ConnectionMeta connMeta = getConnection(clientId); 223 connMeta.start(); 224 } 225 226 public void stopConnection(String clientId) throws JMSException { 227 logger.debug("stopConnection clientId=" + clientId); 228 ConnectionMeta connMeta = getConnection(clientId); 229 connMeta.stop(); 230 } 231 232 public void setConsumerAsync(String clientId, String sessionId, String consumerId, boolean async) throws JMSException { 233 ConsumerMeta meta = getConsumer(clientId, sessionId, consumerId); 234 meta.setAsync(async); 235 } 236 237 public void closeSession(String clientId, String sessionId) throws JMSException { 238 logger.debug("closeSession sessionId=" + sessionId + ", clientId=" + clientId); 239 SessionMeta meta = getSession(clientId, sessionId); 240 meta.close(); 241 } 242 243 protected void doInit() throws Exception { 244 new Thread (this, getName()).start(); 246 } 247 248 protected void doDestroy() throws Exception { 249 super.doDestroy(); 250 } 251 252 protected void doStart() throws Exception { 253 super.doStart(); 254 } 255 256 protected void doStop() throws Exception { 257 super.doStop(); 258 } 259 260 public void run() { 261 while (isStarted()) { 262 try { 263 while (beWait()) { 264 synchronized (this) { 265 wait(); 266 } 267 } 268 Collections.sort(tempMessages); 269 for (Iterator it = tempMessages.iterator(); it.hasNext();) { 270 JMSMessage message = (JMSMessage) it.next(); 272 Destination destination = message.getJMSDestination(); 273 274 if (destinations.hashConsumer(destination)) { 275 List <ConsumerMeta> consumers = destinations.getConsumerMetas(destination); 276 boolean isQueue = (destination instanceof Queue ) ? true : false; 277 if (isQueue) { 278 int index = rand.nextInt(consumers.size()); 280 ConsumerMeta meta = consumers.get(index); 281 meta.addMessage(message); 282 logger.debug("dispatch message " + message + " to consumer " + meta.getConsumerId()); 283 } else { 284 for (ConsumerMeta meta : consumers) { 286 meta.addMessage(message); 287 } 288 } 289 it.remove(); 290 } 291 } 292 } catch (Exception e) { 293 e.printStackTrace(); 294 } 295 } 296 297 } 298 299 307 private boolean beWait() throws JMSException { 308 if (tempMessages.isEmpty()) { 309 return true; 310 } else { 311 for (Message message : tempMessages) { 312 Destination destination = message.getJMSDestination(); 313 if (destinations.hashConsumer(destination)) { 314 return false; 315 } 316 } 317 return true; 318 } 319 } 320 321 private ConnectionMeta getConnection(String clientId) throws JMSException { 322 if (!connections.containsKey(clientId)) { 323 throw new JMSException ("connection clientId " + clientId + " not registered."); 324 } 325 ConnectionMeta connMeta = connections.get(clientId); 326 return connMeta; 327 } 328 329 private SessionMeta getSession(String clientId, String sessionId) throws JMSException { 330 ConnectionMeta connMeta = getConnection(clientId); 331 SessionMeta sessionMeta = connMeta.getSession(sessionId); 332 if (sessionMeta == null) { 333 throw new JMSException ("no session " + sessionId + " in connection " + clientId); 334 } 335 return sessionMeta; 336 } 337 338 private ConsumerMeta getConsumer(String clientId, String sessionId, String consumerId) throws JMSException { 339 SessionMeta sessionMeta = getSession(clientId, sessionId); 340 ConsumerMeta consumerMeta = sessionMeta.getConsumer(consumerId); 341 if (consumerMeta == null) { 342 throw new JMSException ("no consumer " + consumerId + " in session " + sessionId + " of connection " + clientId); 343 } 344 return consumerMeta; 345 } 346 347 public static void main(String [] args) { 348 349 } 350 351 } 352 | Popular Tags |