1 46 50 package org.mr.kernel.services.topics; 51 52 import java.io.File ; 53 import java.io.IOException ; 54 import java.util.HashMap ; 55 import java.util.Hashtable ; 56 import java.util.Iterator ; 57 import java.util.Map ; 58 import java.util.StringTokenizer ; 59 import java.util.Vector ; 60 61 import javax.jms.JMSException ; 62 import javax.management.NotCompliantMBeanException ; 63 64 import org.apache.commons.logging.Log; 65 import org.apache.commons.logging.LogFactory; 66 import org.mr.MantaAgent; 67 import org.mr.MantaAgentConstants; 68 import org.mr.core.configuration.ConfigManager; 69 import org.mr.core.log.StartupLogger; 70 import org.mr.core.protocol.MantaBusMessage; 71 import org.mr.core.util.StringUtils; 72 import org.mr.core.persistent.PersistentMap; 73 import org.mr.core.persistent.PersistentManagerFactory; 74 import org.mr.kernel.services.MantaService; 75 import org.mr.kernel.services.ServiceConsumer; 76 import org.mr.kernel.services.ServiceProducer; 77 import org.mr.kernel.world.WorldModeler; 78 79 86 public class VirtualTopicManager { 87 88 public static String HIERARCHICAL_TOPIC_DELIMITER = "/"; 89 public static String HIERARCHICAL_TOPIC_CURRENT = "#"; 90 public static String HIERARCHICAL_TOPIC_ANY = "*"; 91 public static String HIERARCHICAL_TOPIC_CURRENT_ESCAPE = "#"; 92 public static String HIERARCHICAL_TOPIC_ANY_ESCAPE = "\\*"; 93 String regexpCurrent = "[^/]*/?"; 94 String regexpAny = ".*"; 95 96 private static final String metaChars = "\\,(,[,{,^,$,|,),?,*,+,."; 99 100 public static String wildcardMisuseLeft1 = ".*/[^/]+"; 101 public static String wildcardMisuseLeft2 = ".*"; 102 public static String wildcardMisuseRight1 = ".*/"; 103 public static String wildcardMisuseRight2 = "[^/]+.*"; 104 105 private Hashtable topicServices = new Hashtable (); 108 private Hashtable hierarchicalTopicServices = new Hashtable (); 110 private PersistentMap hierarchicalTopicSubscribers = null; 115 private Object hierarchicalSynchObj = new Object (); 116 public Log log = null; 118 119 private boolean reportErrors = true; 120 121 static { 123 124 } 125 126 127 public VirtualTopicManager() { 128 super(); 129 log = LogFactory.getLog("VirtualTopicManager"); 130 loadConfig(); 131 } 132 133 134 public void validateTopicName(String name) throws JMSException { 136 Log myLog = LogFactory.getLog("VirtualTopicManager"); 137 if (name.startsWith(HIERARCHICAL_TOPIC_DELIMITER)) { 138 if (name.indexOf(HIERARCHICAL_TOPIC_DELIMITER+HIERARCHICAL_TOPIC_DELIMITER) != -1) { 139 String msg = "Illegal topic name format: "+name; 140 StartupLogger.log.error(msg, "VirtualTopicManager"); 141 throw new JMSException (msg); 142 } 143 if (!isLegalWildcardUsage(name)) { 144 String msg = "Illegal topic name format: "+name; 145 StartupLogger.log.error(msg, "VirtualTopicManager"); 146 throw new JMSException (msg); 147 } 148 } 149 else { 150 if (isWildCardTopic(name)) { 151 String msg = "Illegal use of topic hierarchy delimiter in a non hierarchial topic: "+name; 152 StartupLogger.log.error(msg, "VirtualTopicManager"); 153 throw new JMSException (msg); 154 } 155 } 156 } 157 158 159 private void loadConfig() { 162 163 ConfigManager cm =MantaAgent.getInstance().getSingletonRepository().getConfigManager(); 164 HIERARCHICAL_TOPIC_ANY = cm.getStringProperty("jms.TopicHierarchyWildcardAny", "*"); 165 HIERARCHICAL_TOPIC_CURRENT = cm.getStringProperty("jms.TopicHierarchyWildcardCurrent", "#"); 166 HIERARCHICAL_TOPIC_DELIMITER = cm.getStringProperty("jms.TopicHierarchyDelimiter", "/"); 167 HIERARCHICAL_TOPIC_ANY_ESCAPE = escapeWildcards(HIERARCHICAL_TOPIC_ANY); 168 HIERARCHICAL_TOPIC_CURRENT_ESCAPE = escapeWildcards(HIERARCHICAL_TOPIC_CURRENT); 169 170 } 171 172 private static String escapeWildcards(String wildCard) { 175 int length = metaChars.length(); 176 StringTokenizer st = new StringTokenizer (metaChars, ","); 177 while (st.hasMoreTokens()) { 178 String metaChar = st.nextToken(); 179 if (metaChar.equals("\\")) { 180 wildCard = wildCard.replaceAll("\\\\", "\\\\\\\\"); 181 } 182 else { 183 wildCard = wildCard.replaceAll("\\"+metaChar, "\\\\"+metaChar); 184 } 185 } 186 return wildCard; 187 } 188 189 private static boolean fileOrFolderExists(String fileOrFolderName) { 191 File f = new File (fileOrFolderName); 192 return f.exists(); 193 } 194 195 public static final boolean isWildCardTopic(String topicName){ 197 return (topicName.indexOf(HIERARCHICAL_TOPIC_CURRENT)!= -1 || 198 topicName.indexOf(HIERARCHICAL_TOPIC_ANY)!= -1); 199 } 200 201 private static boolean isLegalWildcardUsage(String topicName) { 204 return isLegalWildcardUsage(topicName, HIERARCHICAL_TOPIC_CURRENT_ESCAPE) && 205 isLegalWildcardUsage(topicName, HIERARCHICAL_TOPIC_ANY_ESCAPE); 206 } 207 208 private static boolean isLegalWildcardUsage(String topicName, String wildcard) { 211 String patternLeft = wildcardMisuseLeft1+wildcard+wildcardMisuseLeft2; 212 String patternRight = wildcardMisuseRight1+wildcard+wildcardMisuseRight2; 213 return !topicName.matches(patternLeft) && !topicName.matches(patternRight); 214 } 215 216 private void reportErrors() { 218 if (!log.isErrorEnabled()) 219 return; 220 221 boolean reportWildcardsEqual = HIERARCHICAL_TOPIC_ANY.equals(HIERARCHICAL_TOPIC_CURRENT); 222 boolean reportDelimiterEqualsCurrent = HIERARCHICAL_TOPIC_DELIMITER.equals(HIERARCHICAL_TOPIC_CURRENT); 223 boolean reportDelimiterEqualsAny = HIERARCHICAL_TOPIC_DELIMITER.equals(HIERARCHICAL_TOPIC_ANY); 224 if (reportWildcardsEqual) 225 log.error("Topic Hierarchy: The wildcards for both lateral and recursive inclusion are the same: "+HIERARCHICAL_TOPIC_CURRENT); 226 if (reportDelimiterEqualsCurrent) 227 log.error("Topic Hierarchy: The delimiter and the wildcard for lateral inclusion are the same: "+HIERARCHICAL_TOPIC_CURRENT); 228 if (reportDelimiterEqualsAny) 229 log.error("Topic Hierarchy: The delimiter and the wildcard for recursive inclusion are the same: "+HIERARCHICAL_TOPIC_ANY); 230 } 231 232 233 237 public TopicService getTopicService(String topicName) { 238 TopicService result = null; 239 result = (TopicService)topicServices.get(topicName); 241 if (result != null) 242 return result; 243 244 result = (TopicService)hierarchicalTopicServices.get(topicName); 246 if (result != null) 247 return result; 248 249 if (!topicName.startsWith(HIERARCHICAL_TOPIC_DELIMITER)) { 250 return createNonHierarchyTopic(topicName); 253 } 254 255 if (reportErrors) { 259 reportErrors(); 260 reportErrors = false; 261 } 262 263 if (isWildCardTopic(topicName)) { 265 if (!isLegalWildcardUsage(topicName)) { 266 if (log.isErrorEnabled()) { 267 log.error("Illegal use of topic hierarchy wildcard while adding a topic. Hierarchy="+topicName); 268 } 269 } 270 271 return null; 273 } 274 return createHierarchyTopic(topicName); 277 } 278 279 private TopicService createNonHierarchyTopic(String topicName) { 281 WorldModeler world = MantaAgent.getInstance(). 282 getSingletonRepository().getWorldModeler(); 283 TopicService service = (TopicService)world.getService(world.getDefaultDomainName(), 284 topicName, 285 MantaService.SERVICE_TYPE_TOPIC); 286 if (service != null) { 287 topicServices.put(topicName, service); 288 if (log.isDebugEnabled()) { 289 log.debug("Added topic "+topicName); 290 } 291 } 292 return service; 293 } 294 295 private TopicService createHierarchyTopic(String topicName) { 297 WorldModeler world = MantaAgent.getInstance(). 298 getSingletonRepository().getWorldModeler(); 299 TopicService service = (TopicService)world.getService(world.getDefaultDomainName(), 300 topicName, 301 MantaService.SERVICE_TYPE_TOPIC); 302 if (service != null) { 303 hierarchicalTopicServices.put(topicName, service); 304 registerHierarchyConsumers(service); 305 if (log.isDebugEnabled()) { 306 log.debug("Added topic "+topicName); 307 } 308 } 309 return service; 310 } 311 312 private void registerHierarchyConsumers(TopicService service) { 314 ServiceConsumer subscriber; 315 String subscribedTo, pattern; 316 317 String serviceName = service.getServiceName(); 318 initHierarchialTopicSubscribers(); 319 synchronized (hierarchicalTopicSubscribers) { 320 Iterator subscribers = hierarchicalTopicSubscribers.keySet().iterator(); 321 while (subscribers.hasNext()) { 322 323 subscriber = (ServiceConsumer) hierarchicalTopicSubscribers.get(subscribers.next()); 324 subscribedTo = subscriber.getServiceName(); 325 pattern = StringUtils.replace(subscribedTo, 326 HIERARCHICAL_TOPIC_ANY, 327 regexpAny); 328 pattern = StringUtils.replace(pattern, 329 HIERARCHICAL_TOPIC_CURRENT, 330 regexpCurrent); 331 if (serviceName.matches(pattern)) { 332 service.addConsumer(subscriber); 333 if (subscriber.getAgentName().equals(MantaAgent.getInstance().getAgentName())) { 335 MantaAgent.getInstance().getSingletonRepository(). 336 getWorldModeler().addConsumedServices(service); 337 } 338 } 339 } } 341 } 342 343 348 private void restoreHierarchies(String wildCardTopicName) { 349 String topicNames[] = PersistentManagerFactory.getAllServices(); 350 for (int i = 0 ; i < topicNames.length; i++) { 351 if (topicNames[i].matches(wildCardTopicName)) { 352 this.getTopicService(topicNames[i]); 353 } 354 } 355 } 356 357 362 public synchronized void addConsumer(ServiceConsumer consumer){ 363 String serviceName = consumer.getServiceName(); 364 TopicService service; 365 366 service = this.getTopicService(serviceName); 367 if (service != null) { 368 service.addConsumer(consumer); 369 if (consumer.getAgentName().equals(MantaAgent.getInstance().getAgentName())) { 371 MantaAgent.getInstance().getSingletonRepository(). 372 getWorldModeler().addConsumedServices(service); 373 } 374 return; 375 } 376 377 String pattern; 381 pattern = StringUtils.replace(serviceName, 382 HIERARCHICAL_TOPIC_ANY, 383 regexpAny); 384 pattern = StringUtils.replace(pattern, 385 HIERARCHICAL_TOPIC_CURRENT, 386 regexpCurrent); 387 Map hierarchicalServices = null; 388 restoreHierarchies(pattern); 389 synchronized (hierarchicalTopicServices) { 390 hierarchicalServices = new HashMap (hierarchicalTopicServices); 391 } 392 Iterator topics = hierarchicalServices.keySet().iterator(); 393 while (topics.hasNext()) { 394 service = (TopicService)hierarchicalServices.get(topics.next()); 395 if (service.getServiceName().matches(pattern)) { 396 service.addConsumer(consumer); 397 if (consumer.getAgentName().equals(MantaAgent.getInstance().getAgentName())) { 399 MantaAgent.getInstance().getSingletonRepository(). 400 getWorldModeler().addConsumedServices(service); 401 } 402 } 403 } 404 initHierarchialTopicSubscribers(); 405 hierarchicalTopicSubscribers.put(consumer.getId(),consumer,consumer.isDurable()); 406 } 408 private void initHierarchialTopicSubscribers() { 409 if (hierarchicalTopicSubscribers == null) { 410 synchronized(hierarchicalSynchObj) { 411 if (hierarchicalTopicSubscribers == null) { 412 hierarchicalTopicSubscribers = new PersistentMap("wildcard_map", false, true); 413 } 414 } 415 } 416 } 417 418 422 public synchronized void removeProducer(ServiceProducer producer){ 424 String serviceName = producer.getServiceName(); 425 TopicService service; 426 427 service = (TopicService)topicServices.get(serviceName); 429 if (service != null) { 430 service.removeProducer(producer); 431 if (producer.getAgentName().equals(MantaAgent.getInstance().getAgentName()) && 433 service.getProducersByAgentId(MantaAgent.getInstance().getAgentName()).isEmpty()) { 434 MantaAgent.getInstance().getSingletonRepository(). 435 getWorldModeler().removeProducedService(service); 436 } 437 return; 438 } 439 440 service = (TopicService)hierarchicalTopicServices.get(serviceName); 442 if (service != null) { 443 service.removeProducer(producer); 444 if (producer.getAgentName().equals(MantaAgent.getInstance().getAgentName()) && 446 service.getProducersByAgentId(MantaAgent.getInstance().getAgentName()).isEmpty()) { 447 MantaAgent.getInstance().getSingletonRepository(). 448 getWorldModeler().removeProducedService(service); 449 } 450 return; 451 } 452 } 453 454 457 public synchronized void removeConsumer(ServiceConsumer consumer){ 458 String serviceName = consumer.getServiceName(); 459 TopicService service; 460 461 service = (TopicService)topicServices.get(serviceName); 463 if (service != null) { 464 service.removeConsumer(consumer); 465 if (consumer.getAgentName().equals(MantaAgent.getInstance().getAgentName()) && 467 service.getConsumersByAgentId(MantaAgent.getInstance().getAgentName()).isEmpty()) { 468 MantaAgent.getInstance().getSingletonRepository(). 469 getWorldModeler().removeConsumedServices(service); 470 } 471 return; 472 } 473 474 service = (TopicService)hierarchicalTopicServices.get(serviceName); 476 if (service != null) { 477 service.removeConsumer(consumer); 478 if (consumer.getAgentName().equals(MantaAgent.getInstance().getAgentName()) && 480 service.getConsumersByAgentId(MantaAgent.getInstance().getAgentName()).isEmpty()) { 481 MantaAgent.getInstance().getSingletonRepository(). 482 getWorldModeler().removeConsumedServices(service); 483 } 484 return; 485 } 486 initHierarchialTopicSubscribers(); 487 if (hierarchicalTopicSubscribers.remove(consumer.getId()) != null) { 488 String pattern; 491 pattern = StringUtils.replace(serviceName, 492 HIERARCHICAL_TOPIC_ANY, 493 regexpAny); 494 pattern = StringUtils.replace(pattern, 495 HIERARCHICAL_TOPIC_CURRENT, 496 regexpCurrent); 497 synchronized (hierarchicalTopicServices) { 498 Iterator topics = hierarchicalTopicServices.keySet().iterator(); 499 while (topics.hasNext()) { 500 service = (TopicService)hierarchicalTopicServices.get(topics.next()); 501 if (service.getServiceName().matches(pattern)) { 502 service.removeConsumer(consumer); 503 if (consumer.getAgentName().equals(MantaAgent.getInstance().getAgentName()) && 505 service.getConsumersByAgentId(MantaAgent.getInstance().getAgentName()).isEmpty()) { 506 MantaAgent.getInstance().getSingletonRepository(). 507 getWorldModeler().removeConsumedServices(service); 508 } 509 } 510 } 511 } 512 } 513 } 514 515 516 public boolean hasTopic(String topic){ 518 return topicServices.get(topic) != null || 519 hierarchicalTopicServices.get(topic) != null; 520 521 } 522 523 public void closeTopic(String service) { 525 Object removed = topicServices.remove(service); 526 if (removed == null) { 527 removed = hierarchicalTopicServices.remove(service); 528 } 529 if (removed != null && log.isDebugEnabled()) { 530 log.debug("Deleted topic "+service); 531 } 532 } 533 534 public void publish(String serviceName, 536 MantaBusMessage message, 537 ServiceProducer producer, 538 byte deliveryMode, 539 byte priority, 540 long expiration) throws IOException { 541 getTopicService(serviceName).publish(message,producer, 542 deliveryMode, 543 priority, 544 expiration); 545 } 546 547 public void removeDurableConsumer(String serviceName, ServiceConsumer consumer) { 549 if (hasTopic(serviceName)) { 550 TopicService service = getTopicService(serviceName); 551 service.removeDurableConsumer(consumer); 552 } 553 else { 554 if (log.isDebugEnabled()) { 555 log.debug("Removing a durable consumer faild. Service not found: "+serviceName); 556 } 557 } 558 } 559 } 560 | Popular Tags |