1 46 51 package org.mr.kernel.delivery; 52 53 54 import java.io.IOException ; 55 import java.nio.ByteBuffer ; 56 import java.security.MessageDigest ; 57 58 import java.util.HashMap ; 59 import java.util.Iterator ; 60 61 62 import org.mr.MantaAgent; 63 import org.apache.commons.logging.Log; 64 import org.apache.commons.logging.LogFactory; 65 import org.mr.core.net.MantaAddress; 66 import org.mr.core.protocol.MantaBusMessage; 67 import org.mr.core.protocol.MessageTransformer; 68 import org.mr.core.protocol.PayloadContainer; 69 import org.mr.core.protocol.RecipientAddress; 70 import org.mr.core.util.byteable.Byteable; 71 import org.mr.core.util.byteable.IncomingByteBufferPool; 72 import org.mr.core.configuration.ConfigManager; 73 import org.mr.kernel.services.ServiceActor; 74 import org.mr.kernel.services.ServiceActorControlCenter; 75 import org.mr.kernel.services.ServiceActorStatusListener; 76 import org.mr.kernel.services.ServiceConsumer; 77 import org.mr.kernel.services.MantaService; 78 import org.mr.kernel.services.queues.QueueMaster; 79 import org.mr.kernel.world.WorldModeler; 80 81 82 87 public class PostOffice implements ServiceActorStatusListener { 88 private static final byte THROTTLE_NONE = 0; 90 private static final byte THROTTLE_QUEUES = 1; 91 private static final byte THROTTLE_TOPICS = 2; 92 private static final byte THROTTLE_ALL = 3; 93 94 private HashMap agentToPOB = new HashMap (); 95 private Log log; 96 private byte throttleDomain; 97 private int throttleHighWaterMark; 98 private int throttleLowWatermark; 99 private long throttleSleep; 100 101 105 public PostOffice(WorldModeler modeler ){ 106 log=LogFactory.getLog("PostOffice"); 107 ServiceActorControlCenter.addConsumerStatusListeners(this); 108 109 ConfigManager config = MantaAgent.getInstance(). 110 getSingletonRepository().getConfigManager(); 111 this.throttleDomain = 112 (byte) config.getIntProperty("delivery.throttle.domain", 2); 113 this.throttleHighWaterMark = 114 config.getIntProperty("delivery.throttle.high-watermark", 100000); 115 this.throttleLowWatermark = 116 config.getIntProperty("delivery.throttle.low-watermark", 90000); 117 this.throttleSleep = 118 (long) config.getIntProperty("delivery.throttle.sleep", 5) * 1000; 119 } 120 125 public PostOfficeBox getPostOfficeBox(String recipientID){ 126 return (PostOfficeBox) agentToPOB.get(recipientID); 127 } 128 129 130 137 public MantaBusMessage gotAck(String ackedMessageId, MantaAddress source){ 138 String id = ((RecipientAddress)source).getId(); 139 PostOfficeBox box = getPostOfficeBox(id); 140 if(box != null) 141 return box.gotAck(ackedMessageId ); 142 return null; 143 144 } 145 146 public MantaBusMessage gotAckReject(String ackedMessageId, MantaAddress source) { 147 String id = ((RecipientAddress)source).getId(); 148 PostOfficeBox box = getPostOfficeBox(id); 149 if(box != null) 150 return box.gotRejectAck(ackedMessageId ); 151 return null; 152 } 153 154 158 public final void SendMessage(MantaBusMessage msg ){ 159 if(log.isDebugEnabled()){ 160 log.debug("Message arrived to post office: "+msg.getMessageId()+"."); 161 } 162 163 if(msg.getRecipient() ==null) 164 throw new IllegalArgumentException ("No recipient was set for this message: "+msg+"."); 165 166 167 PostOfficeBox box= getPostOfficeBox(msg.getRecipient().getId()); 168 if(box == null){ 169 box = handleRecipientAdded(msg.getRecipient(), false, 0, 0, 0); 170 } 171 if(box != null){ 172 box.handleMessage(msg); 173 }else{ 174 if(log.isInfoEnabled()){ 175 log.info("Did not find recipient "+msg.getRecipient().getId()+". Message was not sent: "+msg.getMessageId()+"."); 176 } 177 } 178 } 179 180 181 182 183 188 public final void messageArrived(ByteBuffer buff, byte[] messageMD5, MessageDigest partialMD5) throws IOException { 189 try{ 190 if(log.isDebugEnabled()){ 191 log.debug("Got buffer size "+buff.remaining()+"."); 192 } 193 MantaBusMessage msg = MessageTransformer.fromBuffer(buff); 195 msg.setMessageMD5(messageMD5); 196 msg.setPartialMD5(partialMD5); 197 if(buff!= null && !MantaBusMessage.isLazyParsing()){ 198 IncomingByteBufferPool.getInstance().release(buff); 199 } 200 201 if(log.isDebugEnabled()){ 202 log.debug("Manta message arrived sending to logic layer. msg = "+msg+"."); 203 } 204 MantaAgent.getInstance().getSingletonRepository().getIncomingMessageManager().messageArrived(msg); 206 } catch (IOException e) { 207 if(log.isErrorEnabled()){ 208 log.error("Error in getting message from stream. ",e); 209 } 210 throw e; 211 } 212 213 214 } 215 216 217 218 public static final MantaBusMessage prepareMessageShallowCopy(MantaBusMessage orig) throws IOException { 219 MantaBusMessage msg = MantaBusMessage.getInstance(); 220 msg.setMessageType(orig.getMessageType()); 221 msg.getElements().putAll(orig.getElements()); 222 msg.setMessagesId(orig.getMessageIdAsLong()); 223 msg.setDeliveryMode(orig.getDeliveryMode()); 224 msg.setMessageType(orig.getMessageType()); 225 msg.setPriority(orig.getPriority()); 226 msg.setValidUntil(orig.getValidUntil()); 227 msg.setDeliveryCount(orig.getDeliveryCount()); 228 msg.setSource(orig.getSource()); 229 Byteable payload = orig.getPayloadContainer().getPayloadObject(); 230 msg.setPayloadContainer(new PayloadContainer(payload)); 231 232 msg.setRecipient(orig.getRecipient()); 233 return msg; 234 235 } 236 237 238 239 243 public synchronized PostOfficeBox 244 handleRecipientAdded(RecipientAddress consumer, boolean throttle, 245 int highWatermark, int lowWatermark, 246 long throttleSleep) 247 { 248 PostOfficeBox pob = getPostOfficeBox(consumer.getId()); 249 if(pob == null){ 250 pob = new PostOfficeBox(consumer, throttle, highWatermark, 251 lowWatermark, throttleSleep); 252 agentToPOB.put(consumer.getId() ,pob ); 253 } 254 return pob; 255 256 257 } 258 259 260 263 public void handleConsumerUp(ServiceConsumer consumer) { 264 PostOfficeBox pob = getPostOfficeBox(consumer.getId()); 265 if(pob == null){ 266 boolean throttle = false; 267 byte serviceType = consumer.getServiceType(); 268 if ((this.throttleDomain == THROTTLE_ALL) || 269 (this.throttleDomain == THROTTLE_TOPICS && 270 serviceType == MantaService.SERVICE_TYPE_TOPIC) || 271 (this.throttleDomain == THROTTLE_QUEUES && 272 serviceType == MantaService.SERVICE_TYPE_QUEUE)) { 273 throttle = true; 274 } 275 pob = handleRecipientAdded(consumer, throttle, 276 this.throttleHighWaterMark, 277 this.throttleLowWatermark, 278 this.throttleSleep); 279 }else{ 280 pob.updateConsumer(consumer); 282 } 283 pob.setRecipientOnline(true); 284 285 } 286 287 public void handleConsumerDown(ServiceConsumer consumer) { 288 handleRecipientDown(consumer); 289 290 } 291 292 public void handleCoordinatorDown(QueueMaster oldMaster, 293 QueueMaster newMaster) 294 { 295 if (newMaster != null) { 296 PostOfficeBox pob = getPostOfficeBox(oldMaster.getId()); 297 if (pob != null) { 298 HashMap messages = pob.getSavedMessages(); 299 Iterator i = messages.values().iterator(); 300 while (i.hasNext()) { 301 MantaBusMessage message = (MantaBusMessage) i.next(); 302 message.setRecipient(newMaster); 303 SendMessage(message); 304 } 305 } 306 } 307 handleRecipientDown(oldMaster); 308 } 309 310 public void handleRecipientDown(RecipientAddress recipient){ 311 PostOfficeBox pob = getPostOfficeBox(recipient.getId()); 312 if(pob != null){ 313 pob.handleRecipientDown(); 315 if(pob.durable == false) 316 agentToPOB.remove(recipient.getId()); 317 } 318 } 319 320 324 public void closeBox(ServiceActor actor) { 325 RecipientAddress recipient = (RecipientAddress)actor; 326 PostOfficeBox pob = getPostOfficeBox(recipient.getId()); 327 if(pob != null){ 328 pob.handleRecipientDown(); 329 agentToPOB.remove(recipient.getId()); 330 pob.close(); 331 } 332 } 333 334 335 336 337 338 339 340 } 341 342 369 | Popular Tags |