KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > memory > list > DestinationBasedMessageList


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.memory.list;
19
20 import java.util.ArrayList JavaDoc;
21 import java.util.HashMap JavaDoc;
22 import java.util.Iterator JavaDoc;
23 import java.util.List JavaDoc;
24 import java.util.Map JavaDoc;
25 import java.util.Set JavaDoc;
26 import org.apache.activemq.broker.region.MessageReference;
27 import org.apache.activemq.broker.region.Subscription;
28 import org.apache.activemq.command.ActiveMQDestination;
29 import org.apache.activemq.command.ActiveMQMessage;
30 import org.apache.activemq.command.Message;
31 import org.apache.activemq.filter.DestinationMap;
32 import org.apache.activemq.memory.buffer.MessageBuffer;
33 import org.apache.activemq.memory.buffer.MessageQueue;
34 import org.apache.activemq.memory.buffer.OrderBasedMessageBuffer;
35
36 /**
37  * An implementation of {@link MessageList} which maintains a separate message
38  * list for each destination to reduce contention on the list and to speed up
39  * recovery times by only recovering the interested topics.
40  *
41  * @version $Revision: 1.1 $
42  */

43 public class DestinationBasedMessageList implements MessageList {
44
45     private MessageBuffer messageBuffer;
46     private Map JavaDoc queueIndex = new HashMap JavaDoc();
47     private DestinationMap subscriptionIndex = new DestinationMap();
48     private Object JavaDoc lock = new Object JavaDoc();
49
50     public DestinationBasedMessageList(int maximumSize) {
51         this(new OrderBasedMessageBuffer(maximumSize));
52     }
53     
54     public DestinationBasedMessageList(MessageBuffer buffer) {
55         messageBuffer = buffer;
56     }
57
58     public void add(MessageReference node) {
59         ActiveMQMessage message = (ActiveMQMessage) node.getMessageHardRef();
60         ActiveMQDestination destination = message.getDestination();
61         MessageQueue queue = null;
62         synchronized (lock) {
63             queue = (MessageQueue) queueIndex.get(destination);
64             if (queue == null) {
65                 queue = messageBuffer.createMessageQueue();
66                 queueIndex.put(destination, queue);
67                 subscriptionIndex.put(destination, queue);
68             }
69         }
70         queue.add(node);
71     }
72
73     public List JavaDoc getMessages(Subscription sub) {
74         return getMessages(sub.getConsumerInfo().getDestination());
75     }
76     
77     public List JavaDoc getMessages(ActiveMQDestination destination) {
78         Set JavaDoc set = null;
79         synchronized (lock) {
80             set = subscriptionIndex.get(destination);
81         }
82         List JavaDoc answer = new ArrayList JavaDoc();
83         for (Iterator JavaDoc iter = set.iterator(); iter.hasNext();) {
84             MessageQueue queue = (MessageQueue) iter.next();
85             queue.appendMessages(answer);
86         }
87         return answer;
88     }
89     
90     public Message[] browse(ActiveMQDestination destination) {
91         List JavaDoc result = getMessages(destination);
92         return (Message[])result.toArray(new Message[result.size()]);
93     }
94
95
96     public void clear() {
97         messageBuffer.clear();
98     }
99 }
100
Popular Tags