KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > policy > FixedCountSubscriptionRecoveryPolicy


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.region.policy;
19
20 import java.util.ArrayList JavaDoc;
21 import java.util.List JavaDoc;
22 import org.apache.activemq.broker.ConnectionContext;
23 import org.apache.activemq.broker.region.MessageReference;
24 import org.apache.activemq.broker.region.Subscription;
25 import org.apache.activemq.broker.region.SubscriptionRecovery;
26 import org.apache.activemq.broker.region.Topic;
27 import org.apache.activemq.command.ActiveMQDestination;
28 import org.apache.activemq.command.Message;
29 import org.apache.activemq.filter.DestinationFilter;
30 import org.apache.activemq.filter.MessageEvaluationContext;
31 /**
32  * This implementation of {@link SubscriptionRecoveryPolicy} will keep a fixed count
33  * of last messages.
34  *
35  * @org.apache.xbean.XBean
36  *
37  * @version $Revision$
38  */

39 public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy{
40     volatile private MessageReference messages[];
41     private int maximumSize=100;
42     private int tail=0;
43     
44     public SubscriptionRecoveryPolicy copy() {
45         FixedCountSubscriptionRecoveryPolicy rc = new FixedCountSubscriptionRecoveryPolicy();
46         rc.setMaximumSize(maximumSize);
47         return rc;
48     }
49
50     synchronized public boolean add(ConnectionContext context,MessageReference node) throws Exception JavaDoc{
51         messages[tail++]=node;
52         if(tail>=messages.length)
53             tail=0;
54         return true;
55     }
56
57     synchronized public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception JavaDoc{
58         // Re-dispatch the last message seen.
59
int t=tail;
60         // The buffer may not have rolled over yet..., start from the front
61
if(messages[t]==null)
62             t=0;
63         // Well the buffer is really empty then.
64
if(messages[t]==null)
65             return;
66         // Keep dispatching until t hit's tail again.
67
do{
68             MessageReference node=messages[t];
69             sub.addRecoveredMessage(context,node);
70             t++;
71             if(t>=messages.length)
72                 t=0;
73         }while(t!=tail);
74     }
75
76     public void start() throws Exception JavaDoc{
77         messages=new MessageReference[maximumSize];
78     }
79
80     public void stop() throws Exception JavaDoc{
81         messages=null;
82     }
83
84     public int getMaximumSize(){
85         return maximumSize;
86     }
87
88     /**
89      * Sets the maximum number of messages that this destination will hold around in RAM
90      */

91     public void setMaximumSize(int maximumSize){
92         this.maximumSize=maximumSize;
93     }
94
95     public Message[] browse(ActiveMQDestination destination) throws Exception JavaDoc{
96         List JavaDoc result=new ArrayList JavaDoc();
97         DestinationFilter filter=DestinationFilter.parseFilter(destination);
98         int t=tail;
99         if(messages[t]==null)
100             t=0;
101         if(messages[t]!=null){
102             do{
103                 MessageReference ref=messages[t];
104                 Message message=ref.getMessage();
105                 if(filter.matches(message.getDestination())){
106                     result.add(message);
107                 }
108                 t++;
109                 if(t>=messages.length)
110                     t=0;
111             }while(t!=tail);
112         }
113         return (Message[]) result.toArray(new Message[result.size()]);
114     }
115
116 }
117
Popular Tags