KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > memory > MemoryMessageStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.store.memory;
16
17 import java.io.IOException JavaDoc;
18 import java.util.Collections JavaDoc;
19 import java.util.Iterator JavaDoc;
20 import java.util.LinkedHashMap JavaDoc;
21 import java.util.Map JavaDoc;
22 import java.util.Map.Entry;
23
24 import org.apache.activemq.broker.ConnectionContext;
25 import org.apache.activemq.command.ActiveMQDestination;
26 import org.apache.activemq.command.Message;
27 import org.apache.activemq.command.MessageAck;
28 import org.apache.activemq.command.MessageId;
29 import org.apache.activemq.memory.UsageManager;
30 import org.apache.activemq.store.MessageRecoveryListener;
31 import org.apache.activemq.store.MessageStore;
32
33 /**
34  * An implementation of {@link org.apache.activemq.store.MessageStore} which uses a
35  *
36  * @version $Revision: 1.7 $
37  */

38 public class MemoryMessageStore implements MessageStore{
39
40     protected final ActiveMQDestination destination;
41     protected final Map JavaDoc messageTable;
42     protected MessageId lastBatchId;
43
44     public MemoryMessageStore(ActiveMQDestination destination){
45         this(destination,new LinkedHashMap JavaDoc());
46     }
47
48     public MemoryMessageStore(ActiveMQDestination destination,Map JavaDoc messageTable){
49         this.destination=destination;
50         this.messageTable=Collections.synchronizedMap(messageTable);
51     }
52
53     public synchronized void addMessage(ConnectionContext context,Message message) throws IOException JavaDoc{
54         synchronized(messageTable){
55             messageTable.put(message.getMessageId(),message);
56         }
57     }
58
59 // public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
60
// throws IOException{
61
// synchronized(messageTable){
62
// messageTable.put(messageId,messageRef);
63
// }
64
// }
65

66     public Message getMessage(MessageId identity) throws IOException JavaDoc{
67         return (Message)messageTable.get(identity);
68     }
69
70 // public String getMessageReference(MessageId identity) throws IOException{
71
// return (String)messageTable.get(identity);
72
// }
73

74     public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException JavaDoc{
75         removeMessage(ack.getLastMessageId());
76     }
77
78     public void removeMessage(MessageId msgId) throws IOException JavaDoc{
79         synchronized(messageTable){
80             messageTable.remove(msgId);
81             if((lastBatchId!=null && lastBatchId.equals(msgId)) || messageTable.isEmpty()){
82                 lastBatchId=null;
83             }
84         }
85     }
86
87     public void recover(MessageRecoveryListener listener) throws Exception JavaDoc{
88         // the message table is a synchronizedMap - so just have to synchronize here
89
synchronized(messageTable){
90             for(Iterator JavaDoc iter=messageTable.values().iterator();iter.hasNext();){
91                 Object JavaDoc msg=(Object JavaDoc)iter.next();
92                 if(msg.getClass()==MessageId.class){
93                     listener.recoverMessageReference((MessageId)msg);
94                 }else{
95                     listener.recoverMessage((Message)msg);
96                 }
97             }
98             listener.finished();
99         }
100     }
101
102     public void start(){
103     }
104
105     public void stop(){
106     }
107
108     public void removeAllMessages(ConnectionContext context) throws IOException JavaDoc{
109         synchronized(messageTable){
110             messageTable.clear();
111         }
112     }
113
114     public ActiveMQDestination getDestination(){
115         return destination;
116     }
117
118     public void delete(){
119         synchronized(messageTable){
120             messageTable.clear();
121         }
122     }
123
124     /**
125      * @param usageManager The UsageManager that is controlling the destination's memory usage.
126      */

127     public void setUsageManager(UsageManager usageManager){
128     }
129
130     public int getMessageCount(){
131         return messageTable.size();
132     }
133
134     public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception JavaDoc{
135         synchronized(messageTable){
136             boolean pastLackBatch=lastBatchId==null;
137             int count=0;
138             for(Iterator JavaDoc iter=messageTable.entrySet().iterator();iter.hasNext();){
139                 Map.Entry JavaDoc entry=(Entry)iter.next();
140                 if(pastLackBatch){
141                     count++;
142                     Object JavaDoc msg=entry.getValue();
143                     lastBatchId=(MessageId)entry.getKey();
144                     if(msg.getClass()==MessageId.class){
145                         listener.recoverMessageReference((MessageId)msg);
146                     }else{
147                         listener.recoverMessage((Message)msg);
148                     }
149                 }else{
150                     pastLackBatch=entry.getKey().equals(lastBatchId);
151                 }
152             }
153             listener.finished();
154         }
155     }
156
157     public void resetBatching(){
158         lastBatchId=null;
159     }
160 }
161
Popular Tags