1 18 package org.apache.activemq.memory.list; 19 20 import java.util.ArrayList ; 21 import java.util.HashMap ; 22 import java.util.Iterator ; 23 import java.util.List ; 24 import java.util.Map ; 25 import java.util.Set ; 26 import org.apache.activemq.broker.region.MessageReference; 27 import org.apache.activemq.broker.region.Subscription; 28 import org.apache.activemq.command.ActiveMQDestination; 29 import org.apache.activemq.command.ActiveMQMessage; 30 import org.apache.activemq.command.Message; 31 import org.apache.activemq.filter.DestinationMap; 32 import org.apache.activemq.memory.buffer.MessageBuffer; 33 import org.apache.activemq.memory.buffer.MessageQueue; 34 import org.apache.activemq.memory.buffer.OrderBasedMessageBuffer; 35 36 43 public class DestinationBasedMessageList implements MessageList { 44 45 private MessageBuffer messageBuffer; 46 private Map queueIndex = new HashMap (); 47 private DestinationMap subscriptionIndex = new DestinationMap(); 48 private Object lock = new Object (); 49 50 public DestinationBasedMessageList(int maximumSize) { 51 this(new OrderBasedMessageBuffer(maximumSize)); 52 } 53 54 public DestinationBasedMessageList(MessageBuffer buffer) { 55 messageBuffer = buffer; 56 } 57 58 public void add(MessageReference node) { 59 ActiveMQMessage message = (ActiveMQMessage) node.getMessageHardRef(); 60 ActiveMQDestination destination = message.getDestination(); 61 MessageQueue queue = null; 62 synchronized (lock) { 63 queue = (MessageQueue) queueIndex.get(destination); 64 if (queue == null) { 65 queue = messageBuffer.createMessageQueue(); 66 queueIndex.put(destination, queue); 67 subscriptionIndex.put(destination, queue); 68 } 69 } 70 queue.add(node); 71 } 72 73 public List getMessages(Subscription sub) { 74 return getMessages(sub.getConsumerInfo().getDestination()); 75 } 76 77 public List getMessages(ActiveMQDestination destination) { 78 Set set = null; 79 synchronized (lock) { 80 set = subscriptionIndex.get(destination); 81 } 82 List answer = new ArrayList (); 83 for (Iterator iter = set.iterator(); iter.hasNext();) { 84 MessageQueue queue = (MessageQueue) iter.next(); 85 queue.appendMessages(answer); 86 } 87 return answer; 88 } 89 90 public Message[] browse(ActiveMQDestination destination) { 91 List result = getMessages(destination); 92 return (Message[])result.toArray(new Message[result.size()]); 93 } 94 95 96 public void clear() { 97 messageBuffer.clear(); 98 } 99 } 100 | Popular Tags |