1 22 package org.jboss.mq.server; 23 24 import java.util.HashMap ; 25 import java.util.Iterator ; 26 import java.util.LinkedList ; 27 28 import javax.jms.InvalidDestinationException ; 29 import javax.jms.JMSException ; 30 31 import org.jboss.logging.Logger; 32 import org.jboss.util.threadpool.ThreadPool; 33 import org.jboss.mq.AcknowledgementRequest; 34 import org.jboss.mq.ConnectionToken; 35 import org.jboss.mq.ReceiveRequest; 36 import org.jboss.mq.SpyMessage; 37 import org.jboss.mq.Subscription; 38 39 48 public class ClientConsumer implements Runnable 49 { 50 private static Logger log = Logger.getLogger(ClientConsumer.class); 51 JMSDestinationManager server; 53 ConnectionToken connectionToken; 55 boolean enabled; 57 boolean closed = false; 59 HashMap subscriptions = new HashMap (); 61 HashMap removedSubscriptions = new HashMap (); 63 64 LinkedList blockedSubscriptions = new LinkedList (); 65 66 private LinkedList messages = new LinkedList (); 68 69 72 private boolean enqueued = false; 73 74 76 80 private ThreadPool threadPool = null; 81 82 84 public ClientConsumer(JMSDestinationManager server, ConnectionToken connectionToken) throws JMSException 85 { 86 this.server = server; 87 this.connectionToken = connectionToken; 88 this.threadPool = server.getThreadPool(); 89 } 90 91 public void setEnabled(boolean enabled) throws JMSException 92 { 93 if (log.isTraceEnabled()) 94 log.trace("" + this +"->setEnabled(enabled=" + enabled + ")"); 95 96 synchronized (blockedSubscriptions) 98 { 99 this.enabled = enabled; 100 if (enabled) 101 { 102 for (Iterator it = blockedSubscriptions.iterator(); it.hasNext();) 103 { 104 Subscription sub = (Subscription) it.next(); 105 JMSDestination dest = server.getJMSDestination(sub.destination); 106 if (dest != null) 107 dest.addReceiver(sub); 108 } 109 blockedSubscriptions.clear(); 110 } 111 } 112 } 113 114 public void queueMessageForSending(RoutedMessage r) 115 { 116 117 synchronized (messages) 118 { 119 if (closed) 120 return; 122 messages.add(r); 123 if (!enqueued) 124 { 125 threadPool.run(this); 126 enqueued = true; 127 } 128 } 129 } 130 131 public void addSubscription(Subscription req) throws JMSException 132 { 133 if (log.isTraceEnabled()) 134 log.trace("Adding subscription for: " + req); 135 req.connectionToken = connectionToken; 136 req.clientConsumer = this; 137 138 JMSDestination jmsdest = server.getJMSDestination(req.destination); 139 if (jmsdest == null) 140 throw new InvalidDestinationException ("The destination " + req.destination + " does not exist !"); 141 142 jmsdest.addSubscriber(req); 143 144 synchronized (subscriptions) 145 { 146 subscriptions.put(new Integer (req.subscriptionId), req); 147 } 148 } 149 150 public void close() 151 { 152 boolean trace = log.isTraceEnabled(); 153 if (trace) 154 log.trace("" + this +"->close()"); 155 156 synchronized (messages) 157 { 158 closed = true; 159 if (enqueued) 160 { 161 enqueued = false; 162 } 163 messages.clear(); 164 } 165 166 HashMap subscriptionsClone = null; 168 synchronized (subscriptions) 169 { 170 subscriptionsClone = (HashMap ) subscriptions.clone(); 171 } 172 Iterator i = subscriptionsClone.keySet().iterator(); 173 while (i.hasNext()) 174 { 175 Integer subscriptionId = (Integer ) i.next(); 176 try 177 { 178 removeSubscription(subscriptionId.intValue()); 179 } 180 catch (JMSException ignore) 181 { 182 } 183 } 184 185 HashMap removedSubsClone = null; 187 synchronized (subscriptions) 188 { 189 removedSubsClone = (HashMap ) removedSubscriptions.clone(); 190 } 191 i = removedSubsClone.values().iterator(); 192 while (i.hasNext()) 193 { 194 Subscription removed = (Subscription) i.next(); 195 JMSDestination queue = server.getJMSDestination(removed.destination); 196 if (queue == null) 197 log.warn("The subscription was registered with a destination that does not exist: " + removed); 198 else 199 { 200 try 201 { 202 queue.nackMessages(removed); 203 } 204 catch (JMSException e) 205 { 206 log.warn("Unable to nack removed subscription: " + removed, e); 207 } 208 } 209 removeRemovedSubscription(removed.subscriptionId); 211 } 212 } 213 214 public SpyMessage receive(int subscriberId, long wait) throws JMSException 215 { 216 Subscription req = getSubscription(subscriberId); 217 if (req == null) 218 { 219 throw new JMSException ("The provided subscription does not exist"); 220 } 221 222 JMSDestination queue = server.getJMSDestination(req.destination); 223 if (queue == null) 224 throw new InvalidDestinationException ("The subscription's destination " + req.destination + " does not exist"); 225 226 if (addBlockedSubscription(req, wait)) 228 return queue.receive(req, (wait != -1)); 229 230 return null; 231 } 232 233 public void removeSubscription(int subscriptionId) throws JMSException 234 { 235 if (log.isTraceEnabled()) 236 log.trace("" + this +"->removeSubscription(subscriberId=" + subscriptionId + ")"); 237 238 Integer subId = new Integer (subscriptionId); 239 Subscription req; 240 synchronized (subscriptions) 241 { 242 req = (Subscription) subscriptions.remove(subId); 243 if (req != null) 244 removedSubscriptions.put(subId, req); 245 } 246 247 if (req == null) 248 throw new JMSException ("The subscription had not been previously registered"); 249 250 JMSDestination queue = server.getPossiblyClosingJMSDestination(req.destination); 251 if (queue == null) 252 throw new InvalidDestinationException ("The subscription was registered with a destination that does not exist !"); 253 254 queue.removeSubscriber(req); 255 256 } 257 258 261 public void run() 262 { 263 try 264 { 265 266 ReceiveRequest[] job; 267 268 synchronized (messages) 269 { 270 if (closed) 271 return; 272 273 job = new ReceiveRequest[messages.size()]; 274 Iterator iter = messages.iterator(); 275 for (int i = 0; iter.hasNext(); i++) 276 { 277 RoutedMessage rm = (RoutedMessage) iter.next(); 278 job[i] = rm.toReceiveRequest(); 279 iter.remove(); 280 } 281 enqueued = false; 282 } 283 284 connectionToken.clientIL.receive(job); 285 286 } 287 catch (Throwable t) 288 { 289 synchronized (messages) 290 { 291 if (closed) 292 log.warn("Could not send messages to a receiver.", t); 293 else 294 log.trace("Could not send messages to a receiver. It is closed.", t); 295 } 296 try 297 { 298 server.connectionFailure(connectionToken); 299 } 300 catch (Throwable ignore) 301 { 302 log.warn("Could not close the client connection..", ignore); 303 } 304 } 305 } 306 307 public String toString() 308 { 309 return "ClientConsumer:" + connectionToken.getClientID(); 310 } 311 312 public void acknowledge(AcknowledgementRequest request, org.jboss.mq.pm.Tx txId) throws JMSException 313 { 314 Subscription sub = retrieveSubscription(request.subscriberId); 315 316 if (sub == null) 317 { 318 synchronized (subscriptions) 320 { 321 sub = (Subscription) removedSubscriptions.get(new Integer (request.subscriberId)); 322 } 323 } 324 325 if (sub == null) 326 { 327 throw new JMSException ("The provided subscription does not exist"); 328 } 329 330 JMSDestination queue = server.getJMSDestination(sub.destination); 331 if (queue == null) 332 throw new InvalidDestinationException ("The subscription's destination " + sub.destination + " does not exist"); 333 334 queue.acknowledge(request, sub, txId); 335 } 336 337 boolean addBlockedSubscription(Subscription sub, long wait) 338 { 339 synchronized (blockedSubscriptions) 340 { 341 if (enabled == false && wait != -1) 342 blockedSubscriptions.add(sub); 343 return enabled; 344 } 345 } 346 347 void removeRemovedSubscription(int subId) 348 { 349 Subscription sub = null; 350 synchronized (subscriptions) 351 { 352 sub = (Subscription) removedSubscriptions.remove(new Integer (subId)); 353 } 354 if (sub != null) 355 { 356 JMSDestination topic = server.getPossiblyClosingJMSDestination(sub.destination); 357 if (topic != null && topic instanceof JMSTopic) 358 ((JMSTopic) topic).cleanupSubscription(sub); 359 } 360 } 361 362 367 public Subscription getSubscription(int subscriberId) throws JMSException 368 { 369 Subscription req = retrieveSubscription(subscriberId); 370 if (req == null) 371 throw new JMSException ("The provided subscription does not exist"); 372 373 return req; 374 } 375 376 379 private Subscription retrieveSubscription(int subscriberId) throws JMSException 380 { 381 Integer id = new Integer (subscriberId); 382 synchronized (subscriptions) 383 { 384 return (Subscription) subscriptions.get(id); 385 } 386 } 387 } 388 | Popular Tags |