1 22 package org.jboss.mq.server; 23 24 import java.util.ArrayList ; 25 import java.util.Iterator ; 26 import java.util.TreeMap ; 27 28 import javax.jms.IllegalStateException ; 29 import javax.jms.JMSException ; 30 31 import org.jboss.mq.DestinationFullException; 32 import org.jboss.mq.DurableSubscriptionID; 33 import org.jboss.mq.SpyDestination; 34 import org.jboss.mq.SpyJMSException; 35 import org.jboss.mq.SpyMessage; 36 import org.jboss.mq.SpyTopic; 37 import org.jboss.mq.Subscription; 38 import org.jboss.mq.pm.NewPersistenceManager; 39 import org.jboss.mq.pm.PersistenceManager; 40 import org.jboss.mq.pm.Tx; 41 42 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap; 43 44 55 public class JMSTopic extends JMSDestination 56 { 57 58 ConcurrentReaderHashMap durQueues = new ConcurrentReaderHashMap(); 60 ConcurrentReaderHashMap tempQueues = new ConcurrentReaderHashMap(); 61 62 public JMSTopic(SpyDestination dest, ClientConsumer temporary, JMSDestinationManager server, BasicQueueParameters parameters) throws JMSException 63 { 64 super(dest, temporary, server, parameters); 65 66 PersistenceManager pm = server.getPersistenceManager(); 68 parameters.lateClone = (pm instanceof NewPersistenceManager); 69 } 70 71 public void addSubscriber(Subscription sub) throws JMSException 72 { 73 SpyTopic topic = (SpyTopic) sub.destination; 74 DurableSubscriptionID id = topic.getDurableSubscriptionID(); 75 76 if (id == null) 77 { 78 ExclusiveQueue q = new ExclusiveQueue(server, destination, sub, parameters); 80 81 q.createMessageCounter(destination.getName(), q.getDescription(), true, false, parameters.messageCounterHistoryDayLimit); 83 84 tempQueues.put(sub, q); 85 q.addSubscriber(sub); 86 } 87 else 88 { 89 PersistentQueue q = (PersistentQueue) durQueues.get(id); 90 91 if (q != null && q.isInUse()) 93 throw new IllegalStateException ("The durable subscription is already in use. " + id); 94 95 boolean selectorChanged = false; 97 if (q != null) 98 { 99 String newSelector = sub.messageSelector; 100 String oldSelector = null; 101 if (q instanceof SelectorPersistentQueue) 102 oldSelector = ((SelectorPersistentQueue) q).selectorString; 103 if ((newSelector == null && oldSelector != null) 104 || (newSelector != null && newSelector.equals(oldSelector) == false)) 105 selectorChanged = true; 106 } 107 108 if (q == null || !q.destination.equals(topic) || selectorChanged) 110 { 111 server.getStateManager().setDurableSubscription(server, id, topic); 113 114 synchronized (durQueues) 116 { 117 q = (PersistentQueue) durQueues.get(id); 118 } 119 } 120 q.addSubscriber(sub); 121 } 122 } 123 124 public void removeSubscriber(Subscription sub) throws JMSException 125 { 126 BasicQueue queue = null; 127 SpyTopic topic = (SpyTopic) sub.destination; 128 DurableSubscriptionID id = topic.getDurableSubscriptionID(); 129 if (id == null) 130 queue = (BasicQueue) tempQueues.get(sub); 131 else 132 queue = (BasicQueue) durQueues.get(id); 133 if (queue == null) 136 ((ClientConsumer) sub.clientConsumer).removeRemovedSubscription(sub.subscriptionId); 137 else 138 queue.removeSubscriber(sub); 139 } 140 141 public void nackMessages(Subscription sub) throws JMSException 142 { 143 BasicQueue queue = null; 144 SpyTopic topic = (SpyTopic) sub.destination; 145 DurableSubscriptionID id = topic.getDurableSubscriptionID(); 146 if (id == null) 147 queue = (BasicQueue) tempQueues.get(sub); 148 else 149 queue = (BasicQueue) durQueues.get(id); 150 if (queue != null) 151 { 152 queue.nackMessages(sub); 153 } 154 } 155 156 void cleanupSubscription(Subscription sub) 157 { 158 BasicQueue queue = (BasicQueue) tempQueues.remove(sub); 160 try 161 { 162 if (queue != null) 163 queue.removeAllMessages(); 164 } 165 catch (JMSException e) 166 { 167 cat.debug("Error removing messages for subscription " + sub, e); 168 } 169 } 170 171 public void addReceiver(Subscription sub) throws JMSException 172 { 173 getQueue(sub).addReceiver(sub); 174 } 175 176 public void removeReceiver(Subscription sub) 177 { 178 try 179 { 180 getQueue(sub).removeReceiver(sub); 181 } 182 catch (JMSException e) 183 { 184 cat.trace("Subscription is not registered: " + sub, e); 185 } 186 } 187 188 public void restoreMessage(MessageReference messageRef) 189 { 190 try 191 { 192 SpyMessage spyMessage = messageRef.getMessage(); 193 updateNextMessageId(spyMessage); 194 if (spyMessage.header.durableSubscriberID == null) 195 { 196 cat.debug("Trying to restore message with null durableSubscriberID"); 197 } 198 else 199 { 200 BasicQueue queue = ((BasicQueue) durQueues.get(spyMessage.header.durableSubscriberID)); 201 messageRef.queue = queue; 202 queue.restoreMessage(messageRef); 203 } 204 } 205 catch (JMSException e) 206 { 207 cat.error("Could not restore message:", e); 208 } 209 } 210 211 public void restoreMessage(SpyMessage message, Tx txid, int type) 212 { 213 try 214 { 215 updateNextMessageId(message); 216 if (message.header.durableSubscriberID == null) 217 { 218 cat.debug("Trying to restore message with null durableSubscriberID"); 219 } 220 else 221 { 222 BasicQueue queue = (BasicQueue) durQueues.get(message.header.durableSubscriberID); 223 MessageReference messageRef = server.getMessageCache().add(message, queue, MessageReference.STORED); 224 queue.restoreMessage(messageRef, txid, type); 225 } 226 } 227 catch (JMSException e) 228 { 229 cat.error("Could not restore message:", e); 230 } 231 } 232 233 public void restoreMessage(SpyMessage message, DurableSubscriptionID id) 234 { 235 try 236 { 237 updateNextMessageId(message); 238 if (id == null) 239 { 240 cat.debug("Trying to restore message with null durableSubscriberID"); 241 } 242 else 243 { 244 BasicQueue queue = (BasicQueue) durQueues.get(id); 245 MessageReference messageRef = server.getMessageCache().add(message, queue, MessageReference.STORED, id); 246 queue.restoreMessage(messageRef); 247 } 248 } 249 catch (JMSException e) 250 { 251 cat.error("Could not restore message:", e); 252 } 253 } 254 255 public void createDurableSubscription(DurableSubscriptionID id) throws JMSException 257 { 258 if (temporaryDestination != null) 259 throw new JMSException ("Not a valid operation on a temporary topic"); 260 261 SpyTopic dstopic = new SpyTopic((SpyTopic) destination, id); 262 263 Throwable error = null; 264 for (int i = 0; i <= parameters.recoveryRetries; ++i) 265 { 266 BasicQueue queue; 268 if (id.getSelector() == null) 269 queue = new PersistentQueue(server, dstopic, parameters); 270 else 271 queue = new SelectorPersistentQueue(server, dstopic, id.getSelector(), parameters); 272 273 queue.createMessageCounter(destination.getName(), id.toString(), true, true, parameters.messageCounterHistoryDayLimit); 275 276 durQueues.put(id, queue); 277 278 try 279 { 280 server.getPersistenceManager().restoreQueue(this, dstopic); 282 283 break; 285 } 286 catch (Throwable t) 287 { 288 if (i < parameters.recoveryRetries) 289 cat.warn("Error restoring topic subscription " + queue + " retries=" + i + " of " + parameters.recoveryRetries, t); 290 else 291 error = t; 292 try 293 { 294 queue.stop(); 295 } 296 catch (Throwable ignored) 297 { 298 cat.trace("Ignored error stopping topic subscription " + queue, ignored); 299 } 300 finally 301 { 302 durQueues.remove(id); 303 queue = null; 304 } 305 } 306 } 307 308 if (error != null) 309 SpyJMSException.rethrowAsJMSException("Unable to recover topic subscription " + id + " retries=" + parameters.recoveryRetries, error); 310 } 311 312 public void close() throws JMSException 314 { 315 if (temporaryDestination != null) 316 throw new JMSException ("Not a valid operation on a temporary topic"); 317 318 Iterator i = tempQueues.values().iterator(); 319 while (i.hasNext()) 320 { 321 ExclusiveQueue queue = (ExclusiveQueue) i.next(); 322 queue.stop(); 323 } 324 325 i = durQueues.values().iterator(); 326 while (i.hasNext()) 327 { 328 PersistentQueue queue = (PersistentQueue) i.next(); 329 queue.stop(); 330 server.getPersistenceManager().closeQueue(this, queue.getSpyDestination()); 331 } 332 } 333 334 public void destroyDurableSubscription(DurableSubscriptionID id) throws JMSException 336 { 337 BasicQueue queue = (BasicQueue) durQueues.remove(id); 338 queue.removeAllMessages(); 339 } 340 341 public SpyMessage receive(Subscription sub, boolean wait) throws javax.jms.JMSException 342 { 343 return getQueue(sub).receive(sub, wait); 344 } 345 346 public void acknowledge(org.jboss.mq.AcknowledgementRequest req, Subscription sub, org.jboss.mq.pm.Tx txId) 347 throws JMSException 348 { 349 getQueue(sub).acknowledge(req, txId); 350 } 351 352 public void addMessage(SpyMessage message, org.jboss.mq.pm.Tx txId) throws JMSException 353 { 354 StringBuffer errorMessage = null; 355 356 boolean added = false; 358 359 long messageId = nextMessageId(); 361 362 if (parameters.lateClone) 363 message.header.messageId = messageId; 364 365 Iterator iter = durQueues.keySet().iterator(); 366 while (iter.hasNext()) 367 { 368 DurableSubscriptionID id = (DurableSubscriptionID) iter.next(); 369 PersistentQueue q = (PersistentQueue) durQueues.get(id); 370 MessageReference ref; 371 if (parameters.lateClone) 372 { 373 ref = server.getMessageCache().add(message, q, MessageReference.NOT_STORED, id); 374 } 375 else 376 { 377 SpyMessage clone = message.myClone(); 378 clone.header.durableSubscriberID = id; 379 clone.header.messageId = messageId; 380 clone.setJMSDestination(q.getSpyDestination()); 381 ref = server.getMessageCache().add(clone, q, MessageReference.NOT_STORED); 382 } 383 try 384 { 385 if (added == false && parameters.lateClone && ref.isPersistent()) 387 { 388 NewPersistenceManager pm = (NewPersistenceManager) server.getPersistenceManager(); 389 pm.addMessage(message); 390 added = true; 391 } 392 q.addMessage(ref, txId); 393 } 394 catch (DestinationFullException e) 395 { 396 if (errorMessage == null) 397 errorMessage = new StringBuffer (e.getText()); 398 else 399 errorMessage.append(", ").append(e.getText()); 400 } 401 } 402 403 iter = tempQueues.values().iterator(); 404 while (iter.hasNext()) 405 { 406 BasicQueue q = (BasicQueue) iter.next(); 407 MessageReference ref; 408 if (parameters.lateClone) 409 { 410 ref = server.getMessageCache().add(message, q, MessageReference.NOT_STORED); 411 } 412 else 413 { 414 SpyMessage clone = message.myClone(); 415 clone.header.messageId = messageId; 416 ref = server.getMessageCache().add(clone, q, MessageReference.NOT_STORED); 417 } 418 try 419 { 420 q.addMessage(ref, txId); 421 } 422 catch (DestinationFullException e) 423 { 424 if (errorMessage == null) 425 errorMessage = new StringBuffer (e.getText()); 426 else 427 errorMessage.append(", ").append(e.getText()); 428 } 429 } 430 431 if (errorMessage != null) 432 throw new DestinationFullException(errorMessage.toString()); 433 } 434 435 public int getAllMessageCount() 436 { 437 return calculateMessageCount(getAllQueues()); 438 } 439 440 public int getDurableMessageCount() 441 { 442 return calculateMessageCount(getPersistentQueues()); 443 } 444 445 public int getNonDurableMessageCount() 446 { 447 return calculateMessageCount(getTemporaryQueues()); 448 } 449 450 public ArrayList getAllQueues() 451 { 452 ArrayList result = new ArrayList (getAllSubscriptionsCount()); 453 result.addAll(getPersistentQueues()); 454 result.addAll(getTemporaryQueues()); 455 return result; 456 } 457 458 public ArrayList getTemporaryQueues() 459 { 460 return new ArrayList (tempQueues.values()); 461 } 462 463 public ArrayList getPersistentQueues() 464 { 465 return new ArrayList (durQueues.values()); 466 } 467 468 public int getAllSubscriptionsCount() 469 { 470 return durQueues.size() + tempQueues.size(); 471 } 472 473 public int getDurableSubscriptionsCount() 474 { 475 return durQueues.size(); 476 } 477 478 public int getNonDurableSubscriptionsCount() 479 { 480 return tempQueues.size(); 481 } 482 483 public ArrayList getAllSubscriptions() 484 { 485 ArrayList result = new ArrayList (getAllSubscriptionsCount()); 486 result.addAll(getDurableSubscriptions()); 487 result.addAll(getNonDurableSubscriptions()); 488 return result; 489 } 490 491 public ArrayList getDurableSubscriptions() 492 { 493 return new ArrayList (durQueues.keySet()); 494 } 495 496 public ArrayList getNonDurableSubscriptions() 497 { 498 return new ArrayList (tempQueues.keySet()); 499 } 500 501 public PersistentQueue getDurableSubscription(DurableSubscriptionID id) 502 { 503 return (PersistentQueue) durQueues.get(id); 504 } 505 506 public BasicQueue getQueue(Subscription sub) throws JMSException 507 { 508 SpyTopic topic = (SpyTopic) sub.destination; 509 DurableSubscriptionID id = topic.getDurableSubscriptionID(); 510 BasicQueue queue = null; 511 if (id != null) 512 queue = getDurableSubscription(id); 513 else 514 queue = (BasicQueue) tempQueues.get(sub); 515 516 if (queue != null) 517 return queue; 518 else 519 throw new JMSException ("Subscription not found: " + sub); 520 } 521 522 524 527 public boolean isInUse() 528 { 529 if (tempQueues.size() > 0) 530 return true; 531 Iterator iter = durQueues.values().iterator(); 532 while (iter.hasNext()) 533 { 534 PersistentQueue q = (PersistentQueue) iter.next(); 535 if (q.isInUse()) 536 return true; 537 } 538 return false; 539 } 540 543 public void removeAllMessages() throws JMSException 544 { 545 Iterator i = durQueues.values().iterator(); 546 while (i.hasNext()) 547 { 548 PersistentQueue queue = (PersistentQueue) i.next(); 549 queue.removeAllMessages(); 550 } 551 } 552 553 private int calculateMessageCount(ArrayList queues) 554 { 555 int count = 0; 556 for (Iterator i = queues.listIterator(); i.hasNext();) 557 { 558 BasicQueue queue = (BasicQueue) i.next(); 559 count += queue.getQueueDepth(); 560 } 561 return count; 562 } 563 564 569 public MessageCounter[] getMessageCounter() 570 { 571 TreeMap map = new TreeMap (); 572 573 Iterator i = durQueues.values().iterator(); 574 575 while (i.hasNext()) 576 { 577 BasicQueue queue = (BasicQueue) i.next(); 578 MessageCounter counter = queue.getMessageCounter(); 579 580 if (counter != null) 581 { 582 String key = counter.getDestinationName() + counter.getDestinationSubscription(); 583 map.put(key, counter); 584 } 585 } 586 587 i = tempQueues.values().iterator(); 588 589 while (i.hasNext()) 590 { 591 BasicQueue queue = (BasicQueue) i.next(); 592 MessageCounter counter = queue.getMessageCounter(); 593 594 if (counter != null) 595 { 596 String key = counter.getDestinationName() + counter.getDestinationSubscription(); 597 map.put(key, counter); 598 } 599 } 600 601 return (MessageCounter[]) map.values().toArray(new MessageCounter[0]); 602 } 603 } 604 | Popular Tags |