KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > memory > MemoryTopicSub


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.store.memory;
16
17 import java.util.Iterator JavaDoc;
18 import java.util.LinkedHashMap JavaDoc;
19 import java.util.Map JavaDoc;
20 import java.util.Map.Entry;
21 import org.apache.activemq.command.Message;
22 import org.apache.activemq.command.MessageId;
23 import org.apache.activemq.store.MessageRecoveryListener;
24
25 /**
26  * A holder for a durable subscriber
27  *
28  * @version $Revision: 1.7 $
29  */

30 class MemoryTopicSub{
31
32     private Map JavaDoc map=new LinkedHashMap JavaDoc();
33     private MessageId lastBatch;
34
35     void addMessage(MessageId id,Message message){
36         map.put(id,message);
37     }
38
39     void removeMessage(MessageId id){
40         map.remove(id);
41         if (map.isEmpty()) {
42             lastBatch=null;
43         }
44     }
45
46     int size(){
47         return map.size();
48     }
49
50     void recoverSubscription(MessageRecoveryListener listener) throws Exception JavaDoc{
51         for(Iterator JavaDoc iter=map.entrySet().iterator();iter.hasNext();){
52             Map.Entry JavaDoc entry=(Entry)iter.next();
53             Object JavaDoc msg=entry.getValue();
54             if(msg.getClass()==MessageId.class){
55                 listener.recoverMessageReference((MessageId)msg);
56             }else{
57                 listener.recoverMessage((Message)msg);
58             }
59         }
60         listener.finished();
61     }
62
63     void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception JavaDoc{
64         boolean pastLackBatch=lastBatch==null;
65         MessageId lastId=null;
66         // the message table is a synchronizedMap - so just have to synchronize here
67
int count=0;
68         for(Iterator JavaDoc iter=map.entrySet().iterator();iter.hasNext()&&count<maxReturned;){
69             Map.Entry JavaDoc entry=(Entry)iter.next();
70             if(pastLackBatch){
71                 count++;
72                 Object JavaDoc msg=entry.getValue();
73                 lastId=(MessageId)entry.getKey();
74                 if(msg.getClass()==MessageId.class){
75                     listener.recoverMessageReference((MessageId)msg);
76                 }else{
77                     listener.recoverMessage((Message)msg);
78                 }
79             }else{
80                 pastLackBatch=entry.getKey().equals(lastBatch);
81             }
82         }
83         if(lastId!=null){
84             lastBatch=lastId;
85         }
86         listener.finished();
87     }
88
89     void resetBatching(){
90         lastBatch=null;
91     }
92 }
93
Popular Tags