KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > kernel > services > topics > VirtualTopicManager


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Amir Shevat.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 /*
47  * Created on Feb 2, 2004
48  * Manta LTD
49  */

50 package org.mr.kernel.services.topics;
51
52 import java.io.File JavaDoc;
53 import java.io.IOException JavaDoc;
54 import java.util.HashMap JavaDoc;
55 import java.util.Hashtable JavaDoc;
56 import java.util.Iterator JavaDoc;
57 import java.util.Map JavaDoc;
58 import java.util.StringTokenizer JavaDoc;
59 import java.util.Vector JavaDoc;
60
61 import javax.jms.JMSException JavaDoc;
62 import javax.management.NotCompliantMBeanException JavaDoc;
63
64 import org.apache.commons.logging.Log;
65 import org.apache.commons.logging.LogFactory;
66 import org.mr.MantaAgent;
67 import org.mr.MantaAgentConstants;
68 import org.mr.core.configuration.ConfigManager;
69 import org.mr.core.log.StartupLogger;
70 import org.mr.core.protocol.MantaBusMessage;
71 import org.mr.core.util.StringUtils;
72 import org.mr.core.persistent.PersistentMap;
73 import org.mr.core.persistent.PersistentManagerFactory;
74 import org.mr.kernel.services.MantaService;
75 import org.mr.kernel.services.ServiceConsumer;
76 import org.mr.kernel.services.ServiceProducer;
77 import org.mr.kernel.world.WorldModeler;
78
79 /**
80  * Created Feb 2, 2004
81  * Ver 1.0
82  * @author Amir Shevat
83  * VirtualTopicManager is a holder of active topics
84  *
85  */

86 public class VirtualTopicManager {
87
88     public static String JavaDoc HIERARCHICAL_TOPIC_DELIMITER = "/";
89     public static String JavaDoc HIERARCHICAL_TOPIC_CURRENT = "#";
90     public static String JavaDoc HIERARCHICAL_TOPIC_ANY = "*";
91     public static String JavaDoc HIERARCHICAL_TOPIC_CURRENT_ESCAPE = "#";
92     public static String JavaDoc HIERARCHICAL_TOPIC_ANY_ESCAPE = "\\*";
93     String JavaDoc regexpCurrent = "[^/]*/?";
94     String JavaDoc regexpAny = ".*";
95
96     // these characters have special meaning in regex so they need to be
97
// escaped when checking the validity of the wildcards.
98
private static final String JavaDoc metaChars = "\\,(,[,{,^,$,|,),?,*,+,.";
99
100     public static String JavaDoc wildcardMisuseLeft1 = ".*/[^/]+";
101     public static String JavaDoc wildcardMisuseLeft2 = ".*";
102     public static String JavaDoc wildcardMisuseRight1 = ".*/";
103     public static String JavaDoc wildcardMisuseRight2 = "[^/]+.*";
104
105     // holds all topics
106
//private Map topicServices = new HashMap();
107
private Hashtable JavaDoc topicServices = new Hashtable JavaDoc();
108     // holds hirarchical topics
109
private Hashtable JavaDoc hierarchicalTopicServices = new Hashtable JavaDoc();
110     // holds hirarchical Topic Subscribers
111
//Aviad changed the data structure to persistent map - in order to preserve
112
//wild card subscribers
113
// private Vector hierarchicalTopicSubscribers = new Vector();
114
private PersistentMap hierarchicalTopicSubscribers = null;
115     private Object JavaDoc hierarchicalSynchObj = new Object JavaDoc();
116     // logger
117
public Log log = null;
118
119     private boolean reportErrors = true;
120
121     // load configuration values
122
static {
123
124     }
125
126
/**
 * Creates the manager: obtains the "VirtualTopicManager" logger and then reads
 * the hierarchy delimiter and wildcard settings through loadConfig().
 */
public VirtualTopicManager() {
    this.log = LogFactory.getLog("VirtualTopicManager");
    this.loadConfig();
}
132
133
134     // called whenever a topic is being generated to check validity
135
public void validateTopicName(String JavaDoc name) throws JMSException JavaDoc {
136         Log myLog = LogFactory.getLog("VirtualTopicManager");
137         if (name.startsWith(HIERARCHICAL_TOPIC_DELIMITER)) {
138             if (name.indexOf(HIERARCHICAL_TOPIC_DELIMITER+HIERARCHICAL_TOPIC_DELIMITER) != -1) {
139                 String JavaDoc msg = "Illegal topic name format: "+name;
140                 StartupLogger.log.error(msg, "VirtualTopicManager");
141                 throw new JMSException JavaDoc(msg);
142             }
143             if (!isLegalWildcardUsage(name)) {
144                 String JavaDoc msg = "Illegal topic name format: "+name;
145                 StartupLogger.log.error(msg, "VirtualTopicManager");
146                 throw new JMSException JavaDoc(msg);
147             }
148         }
149         else {
150             if (isWildCardTopic(name)) {
151                 String JavaDoc msg = "Illegal use of topic hierarchy delimiter in a non hierarchial topic: "+name;
152                 StartupLogger.log.error(msg, "VirtualTopicManager");
153                 throw new JMSException JavaDoc(msg);
154             }
155         }
156     }
157
158
159     // Loads the configuration parameters from the configuration file.
160
// If the file doesn't exist a message is written to the console.
161
private void loadConfig() {
162
163         ConfigManager cm =MantaAgent.getInstance().getSingletonRepository().getConfigManager();
164         HIERARCHICAL_TOPIC_ANY = cm.getStringProperty("jms.TopicHierarchyWildcardAny", "*");
165         HIERARCHICAL_TOPIC_CURRENT = cm.getStringProperty("jms.TopicHierarchyWildcardCurrent", "#");
166         HIERARCHICAL_TOPIC_DELIMITER = cm.getStringProperty("jms.TopicHierarchyDelimiter", "/");
167         HIERARCHICAL_TOPIC_ANY_ESCAPE = escapeWildcards(HIERARCHICAL_TOPIC_ANY);
168         HIERARCHICAL_TOPIC_CURRENT_ESCAPE = escapeWildcards(HIERARCHICAL_TOPIC_CURRENT);
169
170     }
171
172     // when checking the validity of the hierarchy wildcards, we need to
173
// escape characters with special meanings.
174
private static String JavaDoc escapeWildcards(String JavaDoc wildCard) {
175         int length = metaChars.length();
176         StringTokenizer JavaDoc st = new StringTokenizer JavaDoc(metaChars, ",");
177         while (st.hasMoreTokens()) {
178             String JavaDoc metaChar = st.nextToken();
179             if (metaChar.equals("\\")) {
180                 wildCard = wildCard.replaceAll("\\\\", "\\\\\\\\");
181             }
182             else {
183                 wildCard = wildCard.replaceAll("\\"+metaChar, "\\\\"+metaChar);
184             }
185         }
186         return wildCard;
187     }
188
189     // Returns true is file or folder exists else false
190
private static boolean fileOrFolderExists(String JavaDoc fileOrFolderName) {
191         File JavaDoc f = new File JavaDoc(fileOrFolderName);
192         return f.exists();
193     }
194
195     // returns true if wildcards appear in the topic name
196
public static final boolean isWildCardTopic(String JavaDoc topicName){
197         return (topicName.indexOf(HIERARCHICAL_TOPIC_CURRENT)!= -1 ||
198                 topicName.indexOf(HIERARCHICAL_TOPIC_ANY)!= -1);
199     }
200
201     // Returns true if usage of wildcards is legal. Checks both current and any.
202
// Illegal use can be: /a/b*/
203
private static boolean isLegalWildcardUsage(String JavaDoc topicName) {
204         return isLegalWildcardUsage(topicName, HIERARCHICAL_TOPIC_CURRENT_ESCAPE) &&
205                isLegalWildcardUsage(topicName, HIERARCHICAL_TOPIC_ANY_ESCAPE);
206     }
207
208     // Returns true if usage of the wildcard is legal.
209
// Illegal use can be: /a/b*/
210
private static boolean isLegalWildcardUsage(String JavaDoc topicName, String JavaDoc wildcard) {
211         String JavaDoc patternLeft = wildcardMisuseLeft1+wildcard+wildcardMisuseLeft2;
212         String JavaDoc patternRight = wildcardMisuseRight1+wildcard+wildcardMisuseRight2;
213         return !topicName.matches(patternLeft) && !topicName.matches(patternRight);
214     }
215
216     // Write errors to the log
217
private void reportErrors() {
218         if (!log.isErrorEnabled())
219             return;
220
221         boolean reportWildcardsEqual = HIERARCHICAL_TOPIC_ANY.equals(HIERARCHICAL_TOPIC_CURRENT);
222         boolean reportDelimiterEqualsCurrent = HIERARCHICAL_TOPIC_DELIMITER.equals(HIERARCHICAL_TOPIC_CURRENT);
223         boolean reportDelimiterEqualsAny = HIERARCHICAL_TOPIC_DELIMITER.equals(HIERARCHICAL_TOPIC_ANY);
224         if (reportWildcardsEqual)
225             log.error("Topic Hierarchy: The wildcards for both lateral and recursive inclusion are the same: "+HIERARCHICAL_TOPIC_CURRENT);
226         if (reportDelimiterEqualsCurrent)
227             log.error("Topic Hierarchy: The delimiter and the wildcard for lateral inclusion are the same: "+HIERARCHICAL_TOPIC_CURRENT);
228         if (reportDelimiterEqualsAny)
229             log.error("Topic Hierarchy: The delimiter and the wildcard for recursive inclusion are the same: "+HIERARCHICAL_TOPIC_ANY);
230     }
231
232
233     /**
234      * @param topic
235      * @return
236      */

237     public TopicService getTopicService(String JavaDoc topicName) {
238         TopicService result = null;
239         // check in the non-hierarchial topic list
240
result = (TopicService)topicServices.get(topicName);
241         if (result != null)
242             return result;
243
244         // check in the hierarchial topic list
245
result = (TopicService)hierarchicalTopicServices.get(topicName);
246         if (result != null)
247             return result;
248
249         if (!topicName.startsWith(HIERARCHICAL_TOPIC_DELIMITER)) {
250             // This is a new non-hierarcial topic name.
251
// Create a new topic and save it in the non-hierarchial topics map.
252
return createNonHierarchyTopic(topicName);
253         }
254
255         // Report here because whilw the constructor ran the logger was
256
// not yet loaded, and only if we are here the wildcards matter.
257
// Error report availability if check above.
258
if (reportErrors) {
259             reportErrors();
260             reportErrors = false;
261         }
262
263         // this is a hirarchial topic
264
if (isWildCardTopic(topicName)) {
265             if (!isLegalWildcardUsage(topicName)) {
266                 if (log.isErrorEnabled()) {
267                     log.error("Illegal use of topic hierarchy wildcard while adding a topic. Hierarchy="+topicName);
268                 }
269             }
270
271             // We don't have a specific topic to return if the name contains wildcards.
272
return null;
273         }
274         // This is a new hierarcial topic name with no wildcards.
275
// Create a new topic and save it in the hierarch topics map.
276
return createHierarchyTopic(topicName);
277     }
278
279     // creates a topic and adds it to the map
280
private TopicService createNonHierarchyTopic(String JavaDoc topicName) {
281         WorldModeler world = MantaAgent.getInstance().
282                                     getSingletonRepository().getWorldModeler();
283         TopicService service = (TopicService)world.getService(world.getDefaultDomainName(),
284                                                               topicName,
285                                                               MantaService.SERVICE_TYPE_TOPIC);
286         if (service != null) {
287             topicServices.put(topicName, service);
288             if (log.isDebugEnabled()) {
289                 log.debug("Added topic "+topicName);
290             }
291         }
292         return service;
293     }
294
295     // creates a topic and adds it to the map
296
private TopicService createHierarchyTopic(String JavaDoc topicName) {
297         WorldModeler world = MantaAgent.getInstance().
298                                     getSingletonRepository().getWorldModeler();
299         TopicService service = (TopicService)world.getService(world.getDefaultDomainName(),
300                                                               topicName,
301                                                               MantaService.SERVICE_TYPE_TOPIC);
302         if (service != null) {
303             hierarchicalTopicServices.put(topicName, service);
304             registerHierarchyConsumers(service);
305             if (log.isDebugEnabled()) {
306                 log.debug("Added topic "+topicName);
307             }
308         }
309         return service;
310     }
311
312     // register hierarchy consumers that match expression to the topic
313
private void registerHierarchyConsumers(TopicService service) {
314         ServiceConsumer subscriber;
315         String JavaDoc subscribedTo, pattern;
316
317         String JavaDoc serviceName = service.getServiceName();
318         initHierarchialTopicSubscribers();
319         synchronized (hierarchicalTopicSubscribers) {
320             Iterator JavaDoc subscribers = hierarchicalTopicSubscribers.keySet().iterator();
321             while (subscribers.hasNext()) {
322
323                 subscriber = (ServiceConsumer) hierarchicalTopicSubscribers.get(subscribers.next());
324                 subscribedTo = subscriber.getServiceName();
325                 pattern = StringUtils.replace(subscribedTo,
326                                               HIERARCHICAL_TOPIC_ANY,
327                                               regexpAny);
328                 pattern = StringUtils.replace(pattern,
329                                               HIERARCHICAL_TOPIC_CURRENT,
330                                               regexpCurrent);
331                 if (serviceName.matches(pattern)) {
332                     service.addConsumer(subscriber);
333                     //the following if was added by Lital
334
if (subscriber.getAgentName().equals(MantaAgent.getInstance().getAgentName())) {
335                         MantaAgent.getInstance().getSingletonRepository().
336                             getWorldModeler().addConsumedServices(service);
337                     }
338                 }
339             }// while
340
}
341     }
342
343     /**
344      * this method is used to restore all relevant topic service
345      * for a specific wild card subscriber
346      * @param wildCardTopicName
347      */

348     private void restoreHierarchies(String JavaDoc wildCardTopicName) {
349         String JavaDoc topicNames[] = PersistentManagerFactory.getAllServices();
350         for (int i = 0 ; i < topicNames.length; i++) {
351             if (topicNames[i].matches(wildCardTopicName)) {
352                 this.getTopicService(topicNames[i]);
353             }
354         }
355     }
356
357     /**
358      * becuse of herarchial topics the method call to addConsumer was moved from
359      * the topic to here
360      * @param consumer that wants to register to a topic/s
361      */

362     public synchronized void addConsumer(ServiceConsumer consumer){
363         String JavaDoc serviceName = consumer.getServiceName();
364         TopicService service;
365
366         service = this.getTopicService(serviceName);
367         if (service != null) {
368             service.addConsumer(consumer);
369             // the following if was added by Lital
370
if (consumer.getAgentName().equals(MantaAgent.getInstance().getAgentName())) {
371                 MantaAgent.getInstance().getSingletonRepository().
372                     getWorldModeler().addConsumedServices(service);
373             }
374             return;
375         }
376
377         // The consumer wants to register to a hierarchial topic containing wildcards.
378
// In this case we go through all existing topics that matches the consumer's
379
// regular expression, and add the consumer to the topic's consumers list.
380
String JavaDoc pattern;
381         pattern = StringUtils.replace(serviceName,
382                                       HIERARCHICAL_TOPIC_ANY,
383                                       regexpAny);
384         pattern = StringUtils.replace(pattern,
385                                       HIERARCHICAL_TOPIC_CURRENT,
386                                       regexpCurrent);
387         Map JavaDoc hierarchicalServices = null;
388         restoreHierarchies(pattern);
389         synchronized (hierarchicalTopicServices) {
390             hierarchicalServices = new HashMap JavaDoc(hierarchicalTopicServices);
391         }
392         Iterator JavaDoc topics = hierarchicalServices.keySet().iterator();
393         while (topics.hasNext()) {
394             service = (TopicService)hierarchicalServices.get(topics.next());
395             if (service.getServiceName().matches(pattern)) {
396                 service.addConsumer(consumer);
397                 //the following if was added by Lital
398
if (consumer.getAgentName().equals(MantaAgent.getInstance().getAgentName())) {
399                     MantaAgent.getInstance().getSingletonRepository().
400                         getWorldModeler().addConsumedServices(service);
401                 }
402             }
403         }
404         initHierarchialTopicSubscribers();
405         hierarchicalTopicSubscribers.put(consumer.getId(),consumer,consumer.isDurable());
406     }//addConsumer
407

408     private void initHierarchialTopicSubscribers() {
409         if (hierarchicalTopicSubscribers == null) {
410             synchronized(hierarchicalSynchObj) {
411                 if (hierarchicalTopicSubscribers == null) {
412                     hierarchicalTopicSubscribers = new PersistentMap("wildcard_map", false, true);
413                 }
414             }
415         }
416     }
417
418     /**
419      * remove producer refrences when recalled on a remote layer
420      * @param producer
421      */

422     //Aviad - i dont think this should be synchronized
423
public synchronized void removeProducer(ServiceProducer producer){
424         String JavaDoc serviceName = producer.getServiceName();
425         TopicService service;
426
427         // The producer tries to unregister from a non-hierarcial topic.
428
service = (TopicService)topicServices.get(serviceName);
429         if (service != null) {
430             service.removeProducer(producer);
431             // the following if was added by Lital
432
if (producer.getAgentName().equals(MantaAgent.getInstance().getAgentName()) &&
433                 service.getProducersByAgentId(MantaAgent.getInstance().getAgentName()).isEmpty()) {
434                 MantaAgent.getInstance().getSingletonRepository().
435                                 getWorldModeler().removeProducedService(service);
436             }
437             return;
438         }
439
440         // The producer tries to unregister from a non-hierarcial topic.
441
service = (TopicService)hierarchicalTopicServices.get(serviceName);
442         if (service != null) {
443             service.removeProducer(producer);
444             // the following if was added by Lital
445
if (producer.getAgentName().equals(MantaAgent.getInstance().getAgentName()) &&
446                 service.getProducersByAgentId(MantaAgent.getInstance().getAgentName()).isEmpty()) {
447                 MantaAgent.getInstance().getSingletonRepository().
448                                 getWorldModeler().removeProducedService(service);
449             }
450             return;
451         }
452     }
453
454     /**
455      * overload super removeConsumer in order to same durable consumer
456      */

457     public synchronized void removeConsumer(ServiceConsumer consumer){
458         String JavaDoc serviceName = consumer.getServiceName();
459         TopicService service;
460
461         // The consumer tries to unregister from a non-hierarcial topic.
462
service = (TopicService)topicServices.get(serviceName);
463         if (service != null) {
464             service.removeConsumer(consumer);
465             // the following if was added by Lital
466
if (consumer.getAgentName().equals(MantaAgent.getInstance().getAgentName()) &&
467                 service.getConsumersByAgentId(MantaAgent.getInstance().getAgentName()).isEmpty()) {
468                 MantaAgent.getInstance().getSingletonRepository().
469                                 getWorldModeler().removeConsumedServices(service);
470             }
471             return;
472         }
473
474         // The consumer tries to unregister from a non-hierarcial topic.
475
service = (TopicService)hierarchicalTopicServices.get(serviceName);
476         if (service != null) {
477             service.removeConsumer(consumer);
478             // the following if was added by Lital
479
if (consumer.getAgentName().equals(MantaAgent.getInstance().getAgentName()) &&
480                 service.getConsumersByAgentId(MantaAgent.getInstance().getAgentName()).isEmpty()) {
481                 MantaAgent.getInstance().getSingletonRepository().
482                                 getWorldModeler().removeConsumedServices(service);
483             }
484             return;
485         }
486         initHierarchialTopicSubscribers();
487         if (hierarchicalTopicSubscribers.remove(consumer.getId()) != null) {
488             // That means the consumer is registered with wild cards.
489
// Remove it from all topics that match the hierarchy.
490
String JavaDoc pattern;
491             pattern = StringUtils.replace(serviceName,
492                                           HIERARCHICAL_TOPIC_ANY,
493                                           regexpAny);
494             pattern = StringUtils.replace(pattern,
495                                           HIERARCHICAL_TOPIC_CURRENT,
496                                           regexpCurrent);
497             synchronized (hierarchicalTopicServices) {
498                 Iterator JavaDoc topics = hierarchicalTopicServices.keySet().iterator();
499                 while (topics.hasNext()) {
500                     service = (TopicService)hierarchicalTopicServices.get(topics.next());
501                     if (service.getServiceName().matches(pattern)) {
502                         service.removeConsumer(consumer);
503                         // the following if was added by Lital
504
if (consumer.getAgentName().equals(MantaAgent.getInstance().getAgentName()) &&
505                             service.getConsumersByAgentId(MantaAgent.getInstance().getAgentName()).isEmpty()) {
506                             MantaAgent.getInstance().getSingletonRepository().
507                                 getWorldModeler().removeConsumedServices(service);
508                         }
509                     }
510                 }
511             }
512         }
513     }
514
515
516     // returns true if the topic exists
517
public boolean hasTopic(String JavaDoc topic){
518         return topicServices.get(topic) != null ||
519                hierarchicalTopicServices.get(topic) != null;
520
521     }
522
523     // removes the topic from the topic list
524
public void closeTopic(String JavaDoc service) {
525         Object JavaDoc removed = topicServices.remove(service);
526         if (removed == null) {
527             removed = hierarchicalTopicServices.remove(service);
528         }
529         if (removed != null && log.isDebugEnabled()) {
530             log.debug("Deleted topic "+service);
531         }
532     }
533
534     // publishes a message to a topic
535
public void publish(String JavaDoc serviceName,
536                         MantaBusMessage message,
537                         ServiceProducer producer,
538                         byte deliveryMode,
539                         byte priority,
540                         long expiration) throws IOException JavaDoc {
541         getTopicService(serviceName).publish(message,producer,
542                                              deliveryMode,
543                                              priority,
544                                              expiration);
545     }
546
547     // removes a durable subscriber from a topic
548
public void removeDurableConsumer(String JavaDoc serviceName, ServiceConsumer consumer) {
549         if (hasTopic(serviceName)) {
550             TopicService service = getTopicService(serviceName);
551             service.removeDurableConsumer(consumer);
552         }
553         else {
554             if (log.isDebugEnabled()) {
555                 log.debug("Removing a durable consumer faild. Service not found: "+serviceName);
556             }
557         }
558     }
559 }
560
Popular Tags