KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > policy > TimedSubscriptionRecoveryPolicy


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.region.policy;
19
20 import java.util.ArrayList JavaDoc;
21 import java.util.Collections JavaDoc;
22 import java.util.Iterator JavaDoc;
23 import java.util.LinkedList JavaDoc;
24 import java.util.List JavaDoc;
25 import org.apache.activemq.broker.ConnectionContext;
26 import org.apache.activemq.broker.region.MessageReference;
27 import org.apache.activemq.broker.region.SubscriptionRecovery;
28 import org.apache.activemq.broker.region.Topic;
29 import org.apache.activemq.command.ActiveMQDestination;
30 import org.apache.activemq.command.Message;
31 import org.apache.activemq.filter.DestinationFilter;
32 import org.apache.activemq.filter.MessageEvaluationContext;
33 import org.apache.activemq.thread.Scheduler;
34
35 /**
36  * This implementation of {@link SubscriptionRecoveryPolicy} will keep a timed
37  * buffer of messages around in memory and use that to recover new
38  * subscriptions.
39  *
40  * @org.apache.xbean.XBean
41  *
42  * @version $Revision$
43  */

44 public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
45
46     private static final int GC_INTERVAL = 1000;
47
48     // TODO: need to get a better synchronized linked list that has little
49
// contention between enqueuing and dequeuing
50
private final List JavaDoc buffer = Collections.synchronizedList(new LinkedList JavaDoc());
51     private volatile long lastGCRun = System.currentTimeMillis();
52
53     private long recoverDuration = 60 * 1000; // Buffer for 1 min.
54

55     static class TimestampWrapper {
56         public MessageReference message;
57         public long timestamp;
58
59         public TimestampWrapper(MessageReference message, long timestamp) {
60             this.message = message;
61             this.timestamp = timestamp;
62         }
63     }
64     
65     private final Runnable JavaDoc gcTask = new Runnable JavaDoc() {
66         public void run() {
67             gc();
68         }
69     };
70
71     public SubscriptionRecoveryPolicy copy() {
72         TimedSubscriptionRecoveryPolicy rc = new TimedSubscriptionRecoveryPolicy();
73         rc.setRecoverDuration(recoverDuration);
74         return rc;
75     }
76
77     public boolean add(ConnectionContext context, MessageReference message) throws Exception JavaDoc {
78         buffer.add(new TimestampWrapper(message, lastGCRun));
79         return true;
80     }
81
82     public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception JavaDoc{
83         // Re-dispatch the messages from the buffer.
84
ArrayList JavaDoc copy=new ArrayList JavaDoc(buffer);
85         if(!copy.isEmpty()){
86             MessageEvaluationContext msgContext=context.getMessageEvaluationContext();
87             for(Iterator JavaDoc iter=copy.iterator();iter.hasNext();){
88                 TimestampWrapper timestampWrapper=(TimestampWrapper)iter.next();
89                 MessageReference message=timestampWrapper.message;
90                 sub.addRecoveredMessage(context,message);
91             }
92         }
93     }
94
95     public void start() throws Exception JavaDoc {
96         Scheduler.executePeriodically(gcTask, GC_INTERVAL);
97     }
98
99     public void stop() throws Exception JavaDoc {
100         Scheduler.cancel(gcTask);
101     }
102
103     public void gc() {
104         lastGCRun = System.currentTimeMillis();
105         while (buffer.size() > 0) {
106             TimestampWrapper timestampWrapper = (TimestampWrapper) buffer.get(0);
107             if( lastGCRun > timestampWrapper.timestamp+recoverDuration ) {
108                 // GC it.
109
buffer.remove(0);
110             }
111             else {
112                 break;
113             }
114         }
115     }
116
117     public long getRecoverDuration() {
118         return recoverDuration;
119     }
120
121     public void setRecoverDuration(long recoverDuration) {
122         this.recoverDuration = recoverDuration;
123     }
124     
125     public Message[] browse(ActiveMQDestination destination) throws Exception JavaDoc{
126         List JavaDoc result = new ArrayList JavaDoc();
127         ArrayList JavaDoc copy = new ArrayList JavaDoc(buffer);
128         DestinationFilter filter=DestinationFilter.parseFilter(destination);
129         for (Iterator JavaDoc iter = copy.iterator(); iter.hasNext();) {
130             TimestampWrapper timestampWrapper = (TimestampWrapper) iter.next();
131             MessageReference ref = timestampWrapper.message;
132             Message message=ref.getMessage();
133             if (filter.matches(message.getDestination())){
134                 result.add(message);
135             }
136         }
137         return (Message[]) result.toArray(new Message[result.size()]);
138     }
139
140 }
141
Popular Tags