KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > policy > FixedSizedSubscriptionRecoveryPolicy


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.region.policy;
19
20 import java.util.Iterator JavaDoc;
21 import java.util.List JavaDoc;
22 import org.apache.activemq.broker.ConnectionContext;
23 import org.apache.activemq.broker.region.MessageReference;
24 import org.apache.activemq.broker.region.SubscriptionRecovery;
25 import org.apache.activemq.broker.region.Topic;
26 import org.apache.activemq.command.ActiveMQDestination;
27 import org.apache.activemq.command.Message;
28 import org.apache.activemq.memory.list.DestinationBasedMessageList;
29 import org.apache.activemq.memory.list.MessageList;
30 import org.apache.activemq.memory.list.SimpleMessageList;
31
32 /**
33  * This implementation of {@link SubscriptionRecoveryPolicy} will keep a fixed
34  * amount of memory available in RAM for message history which is evicted in
35  * time order.
36  *
37  * @org.apache.xbean.XBean
38  *
39  * @version $Revision$
40  */

41 public class FixedSizedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
42
43     private MessageList buffer;
44     private int maximumSize = 100 * 64 * 1024;
45     private boolean useSharedBuffer = true;
46
47     public SubscriptionRecoveryPolicy copy() {
48         FixedSizedSubscriptionRecoveryPolicy rc = new FixedSizedSubscriptionRecoveryPolicy();
49         rc.setMaximumSize(maximumSize);
50         rc.setUseSharedBuffer(useSharedBuffer);
51         return rc;
52     }
53
54     public boolean add(ConnectionContext context, MessageReference message) throws Exception JavaDoc {
55         buffer.add(message);
56         return true;
57     }
58
59     public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception JavaDoc{
60         // Re-dispatch the messages from the buffer.
61
List JavaDoc copy=buffer.getMessages(sub.getActiveMQDestination());
62         if(!copy.isEmpty()){
63             for(Iterator JavaDoc iter=copy.iterator();iter.hasNext();){
64                 MessageReference node=(MessageReference)iter.next();
65                 sub.addRecoveredMessage(context,node);
66             }
67         }
68     }
69
70     public void start() throws Exception JavaDoc {
71         buffer = createMessageList();
72     }
73
74     public void stop() throws Exception JavaDoc {
75         buffer.clear();
76     }
77
78     // Properties
79
// -------------------------------------------------------------------------
80
public MessageList getBuffer() {
81         return buffer;
82     }
83
84     public void setBuffer(MessageList buffer) {
85         this.buffer = buffer;
86     }
87
88     public int getMaximumSize() {
89         return maximumSize;
90     }
91
92     /**
93      * Sets the maximum amount of RAM in bytes that this buffer can hold in RAM
94      */

95     public void setMaximumSize(int maximumSize) {
96         this.maximumSize = maximumSize;
97     }
98
99     public boolean isUseSharedBuffer() {
100         return useSharedBuffer;
101     }
102
103     public void setUseSharedBuffer(boolean useSharedBuffer) {
104         this.useSharedBuffer = useSharedBuffer;
105     }
106     
107     public Message[] browse(ActiveMQDestination destination) throws Exception JavaDoc{
108         return buffer.browse(destination);
109     }
110
111     // Implementation methods
112

113     // -------------------------------------------------------------------------
114
protected MessageList createMessageList() {
115         if (useSharedBuffer) {
116             return new SimpleMessageList(maximumSize);
117         }
118         else {
119             return new DestinationBasedMessageList(maximumSize);
120         }
121     }
122
123 }
124
Popular Tags