KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > AbstractSubscription


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.region;
19
20 import javax.jms.InvalidSelectorException JavaDoc;
21 import javax.jms.JMSException JavaDoc;
22 import javax.management.ObjectName JavaDoc;
23
24 import java.io.IOException JavaDoc;
25
26 import org.apache.activemq.broker.Broker;
27 import org.apache.activemq.broker.ConnectionContext;
28 import org.apache.activemq.command.ActiveMQDestination;
29 import org.apache.activemq.command.ConsumerId;
30 import org.apache.activemq.command.ConsumerInfo;
31 import org.apache.activemq.filter.BooleanExpression;
32 import org.apache.activemq.filter.DestinationFilter;
33 import org.apache.activemq.filter.LogicExpression;
34 import org.apache.activemq.filter.MessageEvaluationContext;
35 import org.apache.activemq.filter.NoLocalExpression;
36 import org.apache.activemq.selector.SelectorParser;
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39
40 import java.util.concurrent.CopyOnWriteArrayList JavaDoc;
41
42 abstract public class AbstractSubscription implements Subscription {
43     
44     static private final Log log = LogFactory.getLog(AbstractSubscription.class);
45     
46     protected Broker broker;
47     protected ConnectionContext context;
48     protected ConsumerInfo info;
49     final protected DestinationFilter destinationFilter;
50     private BooleanExpression selectorExpression;
51     private ObjectName JavaDoc objectName;
52    
53     final protected CopyOnWriteArrayList JavaDoc destinations = new CopyOnWriteArrayList JavaDoc();
54
55     public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException JavaDoc {
56         this.broker = broker;
57         this.context = context;
58         this.info = info;
59         this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
60         this.selectorExpression = parseSelector(info);
61     }
62     
63     static private BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException JavaDoc {
64         BooleanExpression rc=null;
65         if( info.getSelector() !=null ) {
66             rc = new SelectorParser().parse(info.getSelector());
67         }
68         if( info.isNoLocal() ) {
69             if( rc == null ) {
70                 rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
71             } else {
72                 rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
73             }
74         }
75         if( info.getAdditionalPredicate() != null ) {
76             if( rc == null ) {
77                 rc = info.getAdditionalPredicate();
78             } else {
79                 rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
80             }
81         }
82         return rc;
83     }
84
85     public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException JavaDoc {
86         ConsumerId targetConsumerId = node.getTargetConsumerId();
87         if ( targetConsumerId!=null) {
88             if( !targetConsumerId.equals(info.getConsumerId()) )
89                 return false;
90         }
91         try {
92             return (selectorExpression == null || selectorExpression.matches(context)) && this.context.isAllowedToConsume(node);
93         } catch (JMSException JavaDoc e) {
94             log.info("Selector failed to evaluate: " + e.getMessage(), e);
95             return false;
96         }
97     }
98     
99     public boolean matches(ActiveMQDestination destination) {
100         return destinationFilter.matches(destination);
101     }
102
103     public void add(ConnectionContext context, Destination destination) throws Exception JavaDoc {
104         destinations.add(destination);
105     }
106
107     public void remove(ConnectionContext context, Destination destination) throws Exception JavaDoc {
108         destinations.remove(destination);
109     }
110     
111     public ConsumerInfo getConsumerInfo() {
112         return info;
113     }
114     
115     public void gc() {
116     }
117     
118     public boolean isSlaveBroker(){
119         return broker.isSlaveBroker();
120     }
121
122     public ConnectionContext getContext() {
123         return context;
124     }
125
126     public ConsumerInfo getInfo() {
127         return info;
128     }
129
130     public BooleanExpression getSelectorExpression() {
131         return selectorExpression;
132     }
133     
134     public String JavaDoc getSelector() {
135         return info.getSelector();
136     }
137     
138     public void setSelector(String JavaDoc selector) throws InvalidSelectorException JavaDoc {
139         ConsumerInfo copy = info.copy();
140         copy.setSelector(selector);
141         BooleanExpression newSelector = parseSelector(copy);
142         // its valid so lets actually update it now
143
info.setSelector(selector);
144         this.selectorExpression = newSelector;
145     }
146
147     public ObjectName JavaDoc getObjectName() {
148         return objectName;
149     }
150
151     public void setObjectName(ObjectName JavaDoc objectName) {
152         this.objectName = objectName;
153     }
154     
155     public int getPrefetchSize() {
156         return info.getPrefetchSize();
157     }
158     
159     public boolean isRecoveryRequired(){
160         return true;
161     }
162     
163     public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception JavaDoc{
164         boolean result = false;
165         MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
166         try {
167             msgContext.setDestination(message.getRegionDestination().getActiveMQDestination());
168             msgContext.setMessageReference(message);
169             result = matches(message,msgContext);
170             if (result) {
171                 doAddRecoveredMessage(message);
172             }
173             
174         }finally {
175             msgContext.clear();
176         }
177         return result;
178     }
179     
180     public ActiveMQDestination getActiveMQDestination() {
181         return info != null ? info.getDestination() : null;
182     }
183     
184     protected void doAddRecoveredMessage(MessageReference message) throws Exception JavaDoc {
185         add(message);
186     }
187 }
188
Popular Tags