package org.mr.kernel.services;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.mr.MantaAgent;
import org.mr.MantaAgentConstants;
import org.apache.commons.logging.LogFactory;
import org.mr.core.protocol.MantaBusMessage;
import org.mr.core.protocol.MantaBusMessageConsts;
import org.mr.core.util.SystemTime;

/**
 * Base class for a named service that keeps track of the consumers and
 * producers registered to it, plus an id-to-actor lookup table.
 *
 * <p>Locking: the {@link #consumers} list, the {@link #producers} list and
 * the {@link #serviceActorMap} are each guarded by their own monitor
 * (the collection object itself). No method in this class nests these
 * monitors. Subclasses that touch the protected collections directly should
 * hold the matching monitor while doing so.
 */
public abstract class MantaService {

    /** Value returned by {@link #getServiceType()} for queue services. */
    public static final byte SERVICE_TYPE_QUEUE = 1;
    /** Value returned by {@link #getServiceType()} for topic services. */
    public static final byte SERVICE_TYPE_TOPIC = 2;

    /** Flag exposed through {@link #isServiceUpdated()}; starts out false. */
    private boolean serviceUpdated = false;

    /** Name of this service, as passed to the constructor. */
    protected String logicalName;

    /** Initialised from {@code MantaAgentConstants.AGENT_DEFAULT_DELIVERY_MODE}. */
    private byte persistentMode = MantaAgentConstants.AGENT_DEFAULT_DELIVERY_MODE;
    /** Initialised from {@code MantaAgentConstants.DEFAULT_PERSISTENCY_BLOCKING_MODE}. */
    private boolean blocking = MantaAgentConstants.DEFAULT_PERSISTENCY_BLOCKING_MODE;

    /** Registered {@link ServiceConsumer}s; guarded by its own monitor. */
    protected ArrayList consumers = new ArrayList();
    /** Registered {@link ServiceProducer}s; guarded by its own monitor. */
    protected ArrayList producers = new ArrayList();

    /** Actor id to {@link ServiceActor}; guarded by its own monitor. */
    protected HashMap serviceActorMap = new HashMap();

    /**
     * Creates a service with the given name and no registered actors.
     *
     * @param serviceName the name returned by {@link #getServiceName()}
     */
    public MantaService(String serviceName) {
        this.logicalName = serviceName;
    }

    /**
     * @return the name this service was created with
     */
    public String getServiceName() {
        return logicalName;
    }

    /**
     * Tells whether the given actor is in the consumers list.
     *
     * @param actor the actor to look for
     * @return true if the consumers list contains {@code actor}
     */
    public boolean isConsumer(ServiceActor actor) {
        synchronized (consumers) {
            return consumers.contains(actor);
        }
    }

    /**
     * Tells whether the given actor is in the producers list.
     *
     * @param actor the actor to look for
     * @return true if the producers list contains {@code actor}
     */
    public boolean isProducer(ServiceActor actor) {
        synchronized (producers) {
            return producers.contains(actor);
        }
    }

    /**
     * @return a new, independent list holding the currently registered
     *         consumers; changing it does not affect this service
     */
    public List getConsumers() {
        // Copy under the same lock the writers use so the copy never
        // observes the list in the middle of an add/remove.
        synchronized (consumers) {
            return new ArrayList(consumers);
        }
    }

    /**
     * Collects the consumers whose agent name equals the given id.
     *
     * @param agentId the agent name to match
     * @return a new list with the matching consumers, possibly empty
     */
    public ArrayList getConsumersByAgentId(String agentId) {
        synchronized (consumers) {
            ArrayList matches = new ArrayList();
            int count = consumers.size();
            for (int i = 0; i < count; i++) {
                ServiceConsumer candidate = (ServiceConsumer) consumers.get(i);
                if (candidate.getAgentName().equals(agentId)) {
                    matches.add(candidate);
                }
            }
            return matches;
        }
    }

    /**
     * @return an unmodifiable snapshot of the currently registered
     *         producers; later registrations are not reflected in it
     */
    public List getProducers() {
        // A snapshot (rather than a live view) keeps callers that iterate the
        // result from racing with addProducer/removeProducer.
        synchronized (producers) {
            return Collections.unmodifiableList(new ArrayList(producers));
        }
    }

    /**
     * Collects the producers whose agent name equals the given id.
     *
     * @param agentId the agent name to match
     * @return a new list with the matching producers, possibly empty
     */
    public ArrayList getProducersByAgentId(String agentId) {
        synchronized (producers) {
            ArrayList matches = new ArrayList();
            int count = producers.size();
            for (int i = 0; i < count; i++) {
                ServiceProducer candidate = (ServiceProducer) producers.get(i);
                if (candidate.getAgentName().equals(agentId)) {
                    matches.add(candidate);
                }
            }
            return matches;
        }
    }

    /**
     * Looks up a registered actor (consumer or producer) by id.
     *
     * @param actorId the id the actor was registered under
     * @return the actor, or null if no actor is registered under that id
     */
    public ServiceActor getActor(String actorId) {
        synchronized (serviceActorMap) {
            return (ServiceActor) serviceActorMap.get(actorId);
        }
    }

    /**
     * Registers a producer. Does nothing if an actor with the same id is
     * already registered.
     *
     * @param producer the producer to register
     */
    public void addProducer(ServiceProducer producer) {
        // Check and claim the id in one step so two concurrent calls with the
        // same id cannot both pass the check and add the producer twice.
        synchronized (serviceActorMap) {
            if (serviceActorMap.get(producer.getId()) != null) {
                return;
            }
            serviceActorMap.put(producer.getId(), producer);
        }
        synchronized (producers) {
            producers.add(producer);
        }
    }

    /**
     * Removes a producer from the producers list and from the id lookup.
     *
     * @param producer the producer to remove
     */
    public void removeProducer(ServiceProducer producer) {
        synchronized (producers) {
            producers.remove(producer);
        }
        synchronized (serviceActorMap) {
            serviceActorMap.remove(producer.getId());
        }
    }

    /**
     * Registers a consumer and reports it to
     * {@code ServiceActorControlCenter.addUpConsumer}.
     *
     * <p>A durable consumer replaces any consumer already registered under
     * the same id. A non-durable consumer whose id is already registered is
     * ignored (an info-level log line is written).
     *
     * @param consumer the consumer to register
     */
    public void addConsumer(ServiceConsumer consumer) {
        ServiceConsumer replaced = null;
        boolean duplicate;

        // Decide the outcome and update the id lookup atomically.
        synchronized (serviceActorMap) {
            if (consumer.isDurable()) {
                replaced = (ServiceConsumer) serviceActorMap.get(consumer.getId());
                if (replaced != null) {
                    serviceActorMap.remove(replaced.getId());
                }
            }
            duplicate = serviceActorMap.get(consumer.getId()) != null;
            if (!duplicate) {
                serviceActorMap.put(consumer.getId(), consumer);
            }
        }

        // Apply the matching change to the list; the removal of a replaced
        // durable consumer now happens under the list's lock as well.
        synchronized (consumers) {
            if (replaced != null) {
                consumers.remove(replaced);
            }
            if (!duplicate) {
                consumers.add(consumer);
            }
        }

        if (duplicate) {
            if (LogFactory.getLog("MantaService").isInfoEnabled()) {
                LogFactory.getLog("MantaService").info("Consumer with the same id is already registered to this service consumer id ="+consumer.getId()+".");
            }
            return;
        }

        // Called outside of any lock held by this class.
        ServiceActorControlCenter.addUpConsumer(consumer);
    }

    /**
     * Removes a consumer from the consumers list and from the id lookup, and
     * reports it to {@code ServiceActorControlCenter.removeUpConsumer}.
     *
     * @param consumer the consumer to remove
     */
    public void removeConsumer(ServiceConsumer consumer) {
        synchronized (consumers) {
            consumers.remove(consumer);
        }
        synchronized (serviceActorMap) {
            serviceActorMap.remove(consumer.getId());
        }
        ServiceActorControlCenter.removeUpConsumer(consumer);
    }

    /**
     * @return the type of this service; this class defines
     *         {@link #SERVICE_TYPE_QUEUE} and {@link #SERVICE_TYPE_TOPIC}
     */
    public abstract byte getServiceType();

    /**
     * @return a description containing the service name, type, consumers,
     *         producers and persistent mode
     */
    public String toString() {
        // Render each list under its lock so a concurrent add/remove cannot
        // break the iteration done by ArrayList.toString().
        String consumersText;
        synchronized (consumers) {
            consumersText = consumers.toString();
        }
        String producersText;
        synchronized (producers) {
            producersText = producers.toString();
        }

        StringBuffer buff = new StringBuffer();
        buff.append(" service{");
        buff.append(" service name=");
        buff.append(logicalName);
        buff.append(" serviceType=");
        buff.append(getServiceType());
        buff.append(" consumers=");
        buff.append(consumersText);
        buff.append(" producers=");
        buff.append(producersText);
        buff.append(" persistentMode=");
        buff.append(persistentMode);
        buff.append(" }");
        return buff.toString();
    }

    /**
     * @return the persistent mode currently set on this service
     */
    public byte getPersistentMode() {
        return persistentMode;
    }

    /**
     * @param deliveryMode the new persistent mode
     */
    public void setPersistentMode(byte deliveryMode) {
        this.persistentMode = deliveryMode;
    }

    /**
     * @return the current value of the service-updated flag
     */
    public boolean isServiceUpdated() {
        return serviceUpdated;
    }

    /**
     * @param serviceUpdated the new value of the service-updated flag
     */
    public void setServiceUpdated(boolean serviceUpdated) {
        this.serviceUpdated = serviceUpdated;
    }

    /**
     * @return the current value of the blocking flag
     */
    public boolean isBlocking() {
        return blocking;
    }

    /**
     * @param blocking the new value of the blocking flag
     */
    public void setBlocking(boolean blocking) {
        this.blocking = blocking;
    }

    /**
     * Decides whether a message should be handed to a consumer.
     *
     * <p>A message is valid when its valid-until time is later than the
     * current GMT time and, if a selector is registered for the message's
     * payload type, that selector accepts the message for the consumer's
     * selector statement. An expired message is logged at info level and
     * rejected without consulting the selector.
     *
     * @param msg      the message to check
     * @param consumer the consumer whose selector statement is applied
     * @return true if the message is still valid in time and passes the
     *         selector (or no selector exists for its payload type)
     */
    public boolean checkValidMessage(MantaBusMessage msg, ServiceConsumer consumer) {
        // Read the clock once so the logged time is the one actually compared.
        long now = SystemTime.gmtCurrentTimeMillis();
        if (!(msg.getValidUntil() > now)) {
            if (LogFactory.getLog("MantaService").isInfoEnabled()) {
                LogFactory.getLog("MantaService").info("Not sending message "+msg +" msg.getValidUntil()=" +msg.getValidUntil()+ " SystemTime.gmtCurrentTimeMillis()=" +now+".");
            }
            // Already rejected; no need to resolve or run the selector.
            return false;
        }

        String payloadType = msg.getHeader(MantaBusMessageConsts.HEADER_NAME_PAYLOAD_TYPE);
        SelectorsManager manager = MantaAgent.getInstance().getSingletonRepository().getSelectorsManager();
        PayLoadSelector select = manager.getSelector(payloadType);

        return select == null || select.accept(consumer.getSelectorStatment(), msg);
    }
}
| Popular Tags
|