KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > kernel > services > topics > TopicService


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Amir Shevat.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 /*
47  * Created on Feb 2, 2004
48  * Manta LTD
49  */

50 package org.mr.kernel.services.topics;
51
52 import java.io.IOException JavaDoc;
53 import java.util.ArrayList JavaDoc;
54 import java.util.Iterator JavaDoc;
55 import java.util.List JavaDoc;
56
57 import org.apache.commons.logging.Log;
58 import org.apache.commons.logging.LogFactory;
59
60 import org.mr.MantaAgent;
61 import org.mr.MantaAgentConstants;
62 import org.mr.MantaException;
63 import org.mr.kernel.delivery.PostOffice;
64 import org.mr.kernel.delivery.PostOfficeBox;
65 import org.mr.kernel.services.MantaService;
66 import org.mr.kernel.services.PayLoadSelector;
67 import org.mr.kernel.services.SelectorsManager;
68 import org.mr.kernel.services.ServiceActorControlCenter;
69 import org.mr.kernel.services.ServiceConsumer;
70 import org.mr.kernel.services.ServiceProducer;
71 import org.mr.core.configuration.ConfigManager;
72 import org.mr.core.persistent.PersistentMap;
73 import org.mr.core.persistent.PersistentManager;
74 import org.mr.core.protocol.MantaBusMessage;
75 import org.mr.core.protocol.MantaBusMessageConsts;
76 import org.mr.core.protocol.RecipientAddress;
77 import org.mr.core.util.StringUtils;
78
79 /**
80  * Created Feb 2, 2004
81  * Ver 1.0
82  * TopicService - is a meny to meny messageing service
83  * where N consumers listen to a topic and N producers send messages on the topic
84  * all messages sent by the producers are received by the consumers (see JMS spec)
85  * @author Amir Shevat
86  *
87  *
88  */

89  class TopicService extends MantaService implements TopicServiceMBean{
90
91     private static String JavaDoc HIERARCHY_DELIMITER = "~";
92     PersistentMap subscribers;
93     protected Log log;
94     private boolean pause = false;
95
96     static {
97         ConfigManager config = MantaAgent.getInstance().getSingletonRepository().getConfigManager();
98         HIERARCHY_DELIMITER = config.getStringProperty("persistency.hierarchy_delimiter", HIERARCHY_DELIMITER);
99     }
100     /**
101      *
102      * Constractor for TopicService
103      * @param serviceName the name of the topic
104      */

105     public TopicService(String JavaDoc serviceName) {
106         super(serviceName);
107         log=LogFactory.getLog("Topic:"+serviceName);
108         subscribers = new PersistentMap(PersistentManager.SUBSCRIBERS_PERSISTENT_PREFIX + cleanupServiceName(serviceName), false, true);
109         ArrayList JavaDoc subscribersList = new ArrayList JavaDoc();
110         synchronized(subscribers){
111             subscribersList.addAll(subscribers.values());
112         }
113         int size = subscribersList.size();
114         for (int i = 0; i < size; i++) {
115             ServiceConsumer durable = (ServiceConsumer) subscribersList.get(i);
116             consumers.add(durable);
117             serviceActorMap.put(durable.getId() ,durable );
118         }
119         try {
120             if(!serviceName.startsWith(VirtualTopicManager.HIERARCHICAL_TOPIC_DELIMITER)){
121                 MantaAgent.getInstance().getSingletonRepository().getMantaJMXManagment().addManagedObject(this, "MantaRay:topic="+this.getServiceName());
122             }
123
124         } catch (MantaException e) {
125             if(log.isErrorEnabled()){
126                 log.error("Could not create the JMX MBean.",e);
127             }
128         }
129     }
130
131
132     private String JavaDoc cleanupServiceName(String JavaDoc serviceName){
133         String JavaDoc result = serviceName;
134         result = StringUtils.replace(result,VirtualTopicManager.HIERARCHICAL_TOPIC_DELIMITER, HIERARCHY_DELIMITER);
135         return result;
136     }
137
138     /**
139      * Paused the topic. Pausing means producers will still be able to produce
140      * messages, but those messages will be held by the producers and not sent
141      * to the consumers until the topic is resumed again.
142      */

143     public synchronized void pause(){
144         if(pause ==false){
145             PostOffice po= MantaAgent.getInstance().getSingletonRepository()
146             .getPostOffice();
147             List JavaDoc consumers = this.getConsumers();
148             Iterator JavaDoc consumersIter = consumers.iterator();
149             while(consumersIter.hasNext()){
150                 ServiceConsumer sub = (ServiceConsumer) consumersIter.next();
151                 PostOfficeBox pob = po.getPostOfficeBox(sub.getId());
152                 pob.pause();
153             }
154             pause = true;
155         }
156
157     }
158
159     /**
160      * @return true if the queue is paused at the moment, false otherwise
161      */

162     public boolean isPaused(){
163         return pause;
164     }
165     /**
166      * resumes a topic that was paused.
167      */

168     public synchronized void resume(){
169         if(pause !=false){
170             PostOffice po= MantaAgent.getInstance().getSingletonRepository()
171             .getPostOffice();
172             List JavaDoc consumers = this.getConsumers();
173             Iterator JavaDoc consumersIter = consumers.iterator();
174             while(consumersIter.hasNext()){
175                 ServiceConsumer sub = (ServiceConsumer) consumersIter.next();
176                 PostOfficeBox pob = po.getPostOfficeBox(sub.getId());
177                 pob.resume();
178             }
179             pause = false;
180         }
181
182     }
183
184     /**
185      * Purges all the messages currently held by the topic producers. This method
186      * can be operated only after the topic has been paused.
187      */

188     public void purge(){
189         PostOffice po= MantaAgent.getInstance().getSingletonRepository()
190         .getPostOffice();
191         List JavaDoc consumers = this.getConsumers();
192         Iterator JavaDoc consumersIter = consumers.iterator();
193         while(consumersIter.hasNext()){
194             ServiceConsumer sub = (ServiceConsumer) consumersIter.next();
195             PostOfficeBox pob = po.getPostOfficeBox(sub.getId());
196             pob.purge();
197         }
198     }
199
200
201
202     /**
203      * overload super addConsumer in order to same durable consumer
204      */

205     public void addConsumer(ServiceConsumer consumer){
206
207         ServiceConsumer s = (ServiceConsumer)subscribers.get(consumer.getId());
208         if (s != null) {
209             if (//s.getNoLocal() != consumer.getNoLocal() ||
210
!s.getServiceName().equals(consumer.getServiceName()) ||
211                 !checkEqual(s.getSelectorStatment(), consumer.getSelectorStatment())) {
212                 subscribers.remove(consumer.getId());
213                 super.removeConsumer(consumer);
214             }
215         }
216         subscribers.put(consumer.getId(), consumer, consumer.isDurable());
217         super.addConsumer(consumer);
218     }
219
220     private boolean checkEqual(Object JavaDoc o1, Object JavaDoc o2) {
221         if (o1 == null) {
222             return o2 == null;
223         }
224         if (o2 != null) {
225             return o1.equals(o2);
226         }
227         return false;
228     }
229
230     /**
231      * overload super removeConsumer in order to same durable consumer
232      */

233     public void removeConsumer(ServiceConsumer consumer){
234         if(!consumer.isDurable()){
235             subscribers.remove(consumer.getId());
236             super.removeConsumer(consumer);
237         }else{
238             ServiceActorControlCenter.removeUpConsumer(consumer);
239         }
240
241     }
242
243     /**
244      * @return byte SERVICE_TYPE_TOPIC
245      */

246     public byte getServiceType() {
247         return MantaService.SERVICE_TYPE_TOPIC;
248     }
249
250     /**
251      * send a message to the topic subscribes
252      * @param message the message to be sent
253      * @param producer the address info of the generator of the message
254      * @param deliveryMode see MantaBusMessageConsts for types
255      * @param priority see MantaBusMessageConsts for types
256      * @param ackType see MantaBusMessageConsts for types
257      * @param expiration after this time the message will not be sent (milli GMT)
258      * @throws IOException
259      */

260     public void publish(MantaBusMessage message, ServiceProducer producer, byte deliveryMode, byte priority, long expiration) throws IOException JavaDoc {
261
262         if(log.isDebugEnabled()){
263             log.debug("Message arrived. Message ID="+message.getMessageId());
264         }
265
266         List JavaDoc currentConsumers = new ArrayList JavaDoc();
267         synchronized (this.getConsumers()) {
268             currentConsumers.addAll(this.getConsumers());
269         }
270         int size = currentConsumers.size();
271         // if no one is listening to this topic we should not send it
272
if(size == 0 ){
273             if(log.isDebugEnabled()){
274                 log.debug("No consumer found for message "+message.getMessageId()+". The message was not sent");
275             }
276             return;
277         }
278         // prepere the selecotor objects
279
boolean sentToConsumer =true;
280         SelectorsManager manager = MantaAgent.getInstance().getSingletonRepository().getSelectorsManager();
281         String JavaDoc payloadType =message.getHeader(MantaBusMessageConsts.HEADER_NAME_PAYLOAD_TYPE);
282         PayLoadSelector select = manager.getSelector(payloadType);
283
284         // we have multi consumers create the list
285
for (int i = 0; i < size; i++) {
286             sentToConsumer =true;
287             ServiceConsumer consumer = (ServiceConsumer) currentConsumers.get(i);
288             if(consumer == null){
289                 continue;
290             }
291             // fillter consumers
292
if(select!= null){
293                 //here we look at the payload select a payload selector that will
294
// inspect the payload with the consumer select statment
295
sentToConsumer =select.accept(consumer.getSelectorStatment() ,message );
296             }
297             // if ok send to consumer
298
if(sentToConsumer){
299                 MantaBusMessage copy;
300                 if(size ==1){
301                     copy = message;
302                 }else{
303                     copy = PostOffice.prepareMessageShallowCopy(message);
304                 }
305                 RecipientAddress adderss = consumer;
306                 copy.setRecipient(adderss);
307                 copy.addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION ,consumer.getId() );
308                 // if message or topic are persistent the message is persistent
309
if(this.getPersistentMode() == MantaAgentConstants.PERSISTENT)
310                     deliveryMode = MantaAgentConstants.PERSISTENT;
311
312                 if(log.isDebugEnabled()){
313                     log.debug("Sending message "+copy.getMessageId()+" to consumer "+consumer.getId());
314                 }
315                 // send to regular send message
316
MantaAgent.getInstance().send(copy,producer , deliveryMode , priority, expiration );
317             }
318         }
319
320
321     }//publish
322

323
324
325     /**
326      * calls super.removeProducer(producer)
327      * and then check if this topic is no longger needed, if so removes it
328      */

329     public void removeProducer(ServiceProducer producer){
330         super.removeProducer(producer);
331     }
332
333
334     public void removeDurableConsumer(ServiceConsumer consumer) {
335
336             subscribers.remove(consumer.getId());
337             super.removeConsumer(consumer);
338             ServiceActorControlCenter.removeUpConsumer(consumer);
339             MantaAgent.getInstance().getSingletonRepository().getPostOffice().closeBox(consumer);
340     }
341
342 }
343
Popular Tags