KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > policy > QueryBasedSubscriptionRecoveryPolicy


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.region.policy;
19
20 import org.apache.activemq.ActiveMQMessageTransformation;
21 import org.apache.activemq.broker.ConnectionContext;
22 import org.apache.activemq.broker.region.Destination;
23 import org.apache.activemq.broker.region.MessageReference;
24 import org.apache.activemq.broker.region.SubscriptionRecovery;
25 import org.apache.activemq.broker.region.Topic;
26 import org.apache.activemq.command.*;
27 import org.apache.activemq.util.IdGenerator;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30
31 import javax.jms.JMSException JavaDoc;
32 import javax.jms.Message JavaDoc;
33 import javax.jms.MessageListener JavaDoc;
34 import java.util.concurrent.atomic.AtomicLong JavaDoc;
35
36 /**
37  * This implementation of {@link SubscriptionRecoveryPolicy} will perform a user
38  * specific query mechanism to load any messages they may have missed.
39  *
40  * @org.apache.xbean.XBean
41  *
42  * @version $Revision: 511045 $
43  */

44 public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
45
46     private static final Log log = LogFactory.getLog(QueryBasedSubscriptionRecoveryPolicy.class);
47
48     private MessageQuery query;
49     private AtomicLong JavaDoc messageSequence = new AtomicLong JavaDoc(0);
50     private IdGenerator idGenerator = new IdGenerator();
51     private ProducerId producerId = createProducerId();
52
53     public SubscriptionRecoveryPolicy copy() {
54         QueryBasedSubscriptionRecoveryPolicy rc = new QueryBasedSubscriptionRecoveryPolicy();
55         rc.setQuery(query);
56         return rc;
57     }
58
59     public boolean add(ConnectionContext context, MessageReference message) throws Exception JavaDoc {
60         return query.validateUpdate(message.getMessage());
61     }
62
63     public void recover(final ConnectionContext context,final Topic topic,final SubscriptionRecovery sub)
64             throws Exception JavaDoc{
65         if(query!=null){
66             ActiveMQDestination destination=sub.getActiveMQDestination();
67             query.execute(destination,new MessageListener JavaDoc(){
68
69                 public void onMessage(Message JavaDoc message){
70                     dispatchInitialMessage(message,topic,context,sub);
71                 }
72             });
73         }
74     }
75
76     public void start() throws Exception JavaDoc {
77         if (query == null) {
78             throw new IllegalArgumentException JavaDoc("No query property configured");
79         }
80     }
81
82     public void stop() throws Exception JavaDoc {
83     }
84
85     public MessageQuery getQuery() {
86         return query;
87     }
88
89     /**
90      * Sets the query strategy to load initial messages
91      */

92     public void setQuery(MessageQuery query) {
93         this.query = query;
94     }
95     
96     public org.apache.activemq.command.Message[] browse(ActiveMQDestination dest) throws Exception JavaDoc{
97         return new org.apache.activemq.command.Message[0];
98     }
99
100     protected void dispatchInitialMessage(Message JavaDoc message, Destination regionDestination, ConnectionContext context, SubscriptionRecovery sub) {
101         try {
102             ActiveMQMessage activeMessage = ActiveMQMessageTransformation.transformMessage(message, null);
103             ActiveMQDestination destination = activeMessage.getDestination();
104             if (destination == null) {
105                 destination = sub.getActiveMQDestination();
106                 activeMessage.setDestination(destination);
107             }
108             activeMessage.setRegionDestination(regionDestination);
109             configure(activeMessage);
110             sub.addRecoveredMessage(context,activeMessage);
111         }
112         catch (Throwable JavaDoc e) {
113             log.warn("Failed to dispatch initial message: " + message + " into subscription. Reason: " + e, e);
114         }
115     }
116
117     protected void configure(ActiveMQMessage msg) throws JMSException JavaDoc {
118         long sequenceNumber = messageSequence.incrementAndGet();
119         msg.setMessageId(new MessageId(producerId, sequenceNumber));
120         msg.onSend();
121         msg.setProducerId(producerId);
122     }
123
124     protected ProducerId createProducerId() {
125         String JavaDoc id = idGenerator.generateId();
126         ConnectionId connectionId = new ConnectionId(id);
127         SessionId sessionId = new SessionId(connectionId, 1);
128         return new ProducerId(sessionId, 1);
129     }
130 }
131
Popular Tags