1 46 50 package org.mr.kernel.services.topics; 51 52 import java.io.IOException ; 53 import java.util.ArrayList ; 54 import java.util.Iterator ; 55 import java.util.List ; 56 57 import org.apache.commons.logging.Log; 58 import org.apache.commons.logging.LogFactory; 59 60 import org.mr.MantaAgent; 61 import org.mr.MantaAgentConstants; 62 import org.mr.MantaException; 63 import org.mr.kernel.delivery.PostOffice; 64 import org.mr.kernel.delivery.PostOfficeBox; 65 import org.mr.kernel.services.MantaService; 66 import org.mr.kernel.services.PayLoadSelector; 67 import org.mr.kernel.services.SelectorsManager; 68 import org.mr.kernel.services.ServiceActorControlCenter; 69 import org.mr.kernel.services.ServiceConsumer; 70 import org.mr.kernel.services.ServiceProducer; 71 import org.mr.core.configuration.ConfigManager; 72 import org.mr.core.persistent.PersistentMap; 73 import org.mr.core.persistent.PersistentManager; 74 import org.mr.core.protocol.MantaBusMessage; 75 import org.mr.core.protocol.MantaBusMessageConsts; 76 import org.mr.core.protocol.RecipientAddress; 77 import org.mr.core.util.StringUtils; 78 79 89 class TopicService extends MantaService implements TopicServiceMBean{ 90 91 private static String HIERARCHY_DELIMITER = "~"; 92 PersistentMap subscribers; 93 protected Log log; 94 private boolean pause = false; 95 96 static { 97 ConfigManager config = MantaAgent.getInstance().getSingletonRepository().getConfigManager(); 98 HIERARCHY_DELIMITER = config.getStringProperty("persistency.hierarchy_delimiter", HIERARCHY_DELIMITER); 99 } 100 105 public TopicService(String serviceName) { 106 super(serviceName); 107 log=LogFactory.getLog("Topic:"+serviceName); 108 subscribers = new PersistentMap(PersistentManager.SUBSCRIBERS_PERSISTENT_PREFIX + cleanupServiceName(serviceName), false, true); 109 ArrayList subscribersList = new ArrayList (); 110 synchronized(subscribers){ 111 subscribersList.addAll(subscribers.values()); 112 } 113 int size = subscribersList.size(); 114 for (int i = 0; i < size; i++) { 115 ServiceConsumer durable = (ServiceConsumer) subscribersList.get(i); 116 consumers.add(durable); 117 serviceActorMap.put(durable.getId() ,durable ); 118 } 119 try { 120 if(!serviceName.startsWith(VirtualTopicManager.HIERARCHICAL_TOPIC_DELIMITER)){ 121 MantaAgent.getInstance().getSingletonRepository().getMantaJMXManagment().addManagedObject(this, "MantaRay:topic="+this.getServiceName()); 122 } 123 124 } catch (MantaException e) { 125 if(log.isErrorEnabled()){ 126 log.error("Could not create the JMX MBean.",e); 127 } 128 } 129 } 130 131 132 private String cleanupServiceName(String serviceName){ 133 String result = serviceName; 134 result = StringUtils.replace(result,VirtualTopicManager.HIERARCHICAL_TOPIC_DELIMITER, HIERARCHY_DELIMITER); 135 return result; 136 } 137 138 143 public synchronized void pause(){ 144 if(pause ==false){ 145 PostOffice po= MantaAgent.getInstance().getSingletonRepository() 146 .getPostOffice(); 147 List consumers = this.getConsumers(); 148 Iterator consumersIter = consumers.iterator(); 149 while(consumersIter.hasNext()){ 150 ServiceConsumer sub = (ServiceConsumer) consumersIter.next(); 151 PostOfficeBox pob = po.getPostOfficeBox(sub.getId()); 152 pob.pause(); 153 } 154 pause = true; 155 } 156 157 } 158 159 162 public boolean isPaused(){ 163 return pause; 164 } 165 168 public synchronized void resume(){ 169 if(pause !=false){ 170 PostOffice po= MantaAgent.getInstance().getSingletonRepository() 171 .getPostOffice(); 172 List consumers = this.getConsumers(); 173 Iterator consumersIter = consumers.iterator(); 174 while(consumersIter.hasNext()){ 175 ServiceConsumer sub = (ServiceConsumer) consumersIter.next(); 176 PostOfficeBox pob = po.getPostOfficeBox(sub.getId()); 177 pob.resume(); 178 } 179 pause = false; 180 } 181 182 } 183 184 188 public void purge(){ 189 PostOffice po= MantaAgent.getInstance().getSingletonRepository() 190 .getPostOffice(); 191 List consumers = this.getConsumers(); 192 Iterator consumersIter = consumers.iterator(); 193 while(consumersIter.hasNext()){ 194 ServiceConsumer sub = (ServiceConsumer) consumersIter.next(); 195 PostOfficeBox pob = po.getPostOfficeBox(sub.getId()); 196 pob.purge(); 197 } 198 } 199 200 201 202 205 public void addConsumer(ServiceConsumer consumer){ 206 207 ServiceConsumer s = (ServiceConsumer)subscribers.get(consumer.getId()); 208 if (s != null) { 209 if ( !s.getServiceName().equals(consumer.getServiceName()) || 211 !checkEqual(s.getSelectorStatment(), consumer.getSelectorStatment())) { 212 subscribers.remove(consumer.getId()); 213 super.removeConsumer(consumer); 214 } 215 } 216 subscribers.put(consumer.getId(), consumer, consumer.isDurable()); 217 super.addConsumer(consumer); 218 } 219 220 private boolean checkEqual(Object o1, Object o2) { 221 if (o1 == null) { 222 return o2 == null; 223 } 224 if (o2 != null) { 225 return o1.equals(o2); 226 } 227 return false; 228 } 229 230 233 public void removeConsumer(ServiceConsumer consumer){ 234 if(!consumer.isDurable()){ 235 subscribers.remove(consumer.getId()); 236 super.removeConsumer(consumer); 237 }else{ 238 ServiceActorControlCenter.removeUpConsumer(consumer); 239 } 240 241 } 242 243 246 public byte getServiceType() { 247 return MantaService.SERVICE_TYPE_TOPIC; 248 } 249 250 260 public void publish(MantaBusMessage message, ServiceProducer producer, byte deliveryMode, byte priority, long expiration) throws IOException { 261 262 if(log.isDebugEnabled()){ 263 log.debug("Message arrived. Message ID="+message.getMessageId()); 264 } 265 266 List currentConsumers = new ArrayList (); 267 synchronized (this.getConsumers()) { 268 currentConsumers.addAll(this.getConsumers()); 269 } 270 int size = currentConsumers.size(); 271 if(size == 0 ){ 273 if(log.isDebugEnabled()){ 274 log.debug("No consumer found for message "+message.getMessageId()+". The message was not sent"); 275 } 276 return; 277 } 278 boolean sentToConsumer =true; 280 SelectorsManager manager = MantaAgent.getInstance().getSingletonRepository().getSelectorsManager(); 281 String payloadType =message.getHeader(MantaBusMessageConsts.HEADER_NAME_PAYLOAD_TYPE); 282 PayLoadSelector select = manager.getSelector(payloadType); 283 284 for (int i = 0; i < size; i++) { 286 sentToConsumer =true; 287 ServiceConsumer consumer = (ServiceConsumer) currentConsumers.get(i); 288 if(consumer == null){ 289 continue; 290 } 291 if(select!= null){ 293 sentToConsumer =select.accept(consumer.getSelectorStatment() ,message ); 296 } 297 if(sentToConsumer){ 299 MantaBusMessage copy; 300 if(size ==1){ 301 copy = message; 302 }else{ 303 copy = PostOffice.prepareMessageShallowCopy(message); 304 } 305 RecipientAddress adderss = consumer; 306 copy.setRecipient(adderss); 307 copy.addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION ,consumer.getId() ); 308 if(this.getPersistentMode() == MantaAgentConstants.PERSISTENT) 310 deliveryMode = MantaAgentConstants.PERSISTENT; 311 312 if(log.isDebugEnabled()){ 313 log.debug("Sending message "+copy.getMessageId()+" to consumer "+consumer.getId()); 314 } 315 MantaAgent.getInstance().send(copy,producer , deliveryMode , priority, expiration ); 317 } 318 } 319 320 321 } 323 324 325 329 public void removeProducer(ServiceProducer producer){ 330 super.removeProducer(producer); 331 } 332 333 334 public void removeDurableConsumer(ServiceConsumer consumer) { 335 336 subscribers.remove(consumer.getId()); 337 super.removeConsumer(consumer); 338 ServiceActorControlCenter.removeUpConsumer(consumer); 339 MantaAgent.getInstance().getSingletonRepository().getPostOffice().closeBox(consumer); 340 } 341 342 } 343 | Popular Tags |