KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > kahadaptor > KahaReferenceStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.store.kahadaptor;
16
17 import java.io.IOException JavaDoc;
18 import java.util.Set JavaDoc;
19 import org.apache.activemq.broker.ConnectionContext;
20 import org.apache.activemq.command.ActiveMQDestination;
21 import org.apache.activemq.command.Message;
22 import org.apache.activemq.command.MessageAck;
23 import org.apache.activemq.command.MessageId;
24 import org.apache.activemq.kaha.MapContainer;
25 import org.apache.activemq.kaha.StoreEntry;
26 import org.apache.activemq.memory.UsageManager;
27 import org.apache.activemq.store.MessageRecoveryListener;
28 import org.apache.activemq.store.ReferenceStore;
29
30 public class KahaReferenceStore implements ReferenceStore{
31
32     protected final ActiveMQDestination destination;
33     protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
34     protected KahaReferenceStoreAdapter adapter;
35     private StoreEntry batchEntry=null;
36
37     public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination destination) throws IOException JavaDoc{
38         this.adapter = adapter;
39         this.messageContainer=container;
40         this.destination=destination;
41     }
42
43     public void start(){
44     }
45
46     public void stop(){
47     }
48
49     protected MessageId getMessageId(Object JavaDoc object){
50         return new MessageId(((ReferenceRecord)object).getMessageId());
51     }
52
53     public synchronized void addMessage(ConnectionContext context,Message message) throws IOException JavaDoc{
54         throw new RuntimeException JavaDoc("Use addMessageReference instead");
55     }
56
57     public synchronized Message getMessage(MessageId identity) throws IOException JavaDoc{
58         throw new RuntimeException JavaDoc("Use addMessageReference instead");
59     }
60
61     protected void recover(MessageRecoveryListener listener,Object JavaDoc msg) throws Exception JavaDoc{
62         ReferenceRecord record=(ReferenceRecord)msg;
63         listener.recoverMessageReference(new MessageId(record.getMessageId()));
64     }
65
66     public synchronized void recover(MessageRecoveryListener listener) throws Exception JavaDoc{
67         for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
68             ReferenceRecord record=messageContainer.getValue(entry);
69             recover(listener,new MessageId(record.getMessageId()));
70         }
71         listener.finished();
72     }
73
74     public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception JavaDoc{
75         StoreEntry entry=batchEntry;
76         if(entry==null){
77             entry=messageContainer.getFirst();
78         }else{
79             entry=messageContainer.refresh(entry);
80             if (entry != null) {
81             entry=messageContainer.getNext(entry);
82             }
83         }
84         if(entry!=null){
85             int count=0;
86             do{
87                 Object JavaDoc msg=messageContainer.getValue(entry);
88                 if(msg!=null){
89                     recover(listener,msg);
90                     count++;
91                 }
92                 batchEntry=entry;
93                 entry=messageContainer.getNext(entry);
94             }while(entry!=null&&count<maxReturned&&listener.hasSpace());
95         }
96         listener.finished();
97     }
98
99     public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)
100             throws IOException JavaDoc{
101         ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
102         messageContainer.put(messageId,record);
103         addInterest(record);
104     }
105
106     public ReferenceData getMessageReference(MessageId identity) throws IOException JavaDoc{
107         ReferenceRecord result=messageContainer.get(identity);
108         if(result==null)
109             return null;
110         return result.getData();
111     }
112
113     public void addReferenceFileIdsInUse(){
114         for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
115             ReferenceRecord msg=(ReferenceRecord)messageContainer.getValue(entry);
116             addInterest(msg);
117         }
118     }
119
120     public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException JavaDoc{
121         removeMessage(ack.getLastMessageId());
122     }
123
124     public synchronized void removeMessage(MessageId msgId) throws IOException JavaDoc{
125         StoreEntry entry=messageContainer.getEntry(msgId);
126         if(entry!=null){
127             ReferenceRecord rr=messageContainer.remove(msgId);
128             if(rr!=null){
129                 removeInterest(rr);
130                 if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
131                     resetBatching();
132                 }
133             }
134         }
135     }
136
137     public synchronized void removeAllMessages(ConnectionContext context) throws IOException JavaDoc{
138         messageContainer.clear();
139     }
140
141     public ActiveMQDestination getDestination(){
142         return destination;
143     }
144
145     public synchronized void delete(){
146         messageContainer.clear();
147     }
148
149     public void resetBatching(){
150         batchEntry=null;
151     }
152
153     public int getMessageCount(){
154         return messageContainer.size();
155     }
156
157     public void setUsageManager(UsageManager usageManager){
158     }
159
160     public boolean isSupportForCursors(){
161         return true;
162     }
163
164     
165     public boolean supportsExternalBatchControl(){
166         return true;
167     }
168     
169     void removeInterest(ReferenceRecord rr) {
170         adapter.removeInterestInRecordFile(rr.getData().getFileId());
171     }
172     
173     void addInterest(ReferenceRecord rr) {
174         adapter.addInterestInRecordFile(rr.getData().getFileId());
175     }
176
177     /**
178      * @param startAfter
179      * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
180      */

181     public void setBatch(MessageId startAfter){
182     }
183 }
184
Popular Tags