1 46 51 package org.mr.kernel.delivery; 52 53 54 import java.io.IOException ; 55 import java.util.ArrayList ; 56 import java.util.HashMap ; 57 import java.util.Iterator ; 58 59 import org.apache.commons.logging.Log; 60 import org.apache.commons.logging.LogFactory; 61 import org.mr.MantaAgent; 62 import org.mr.MantaAgentConstants; 63 import org.mr.core.net.AgentStateListener; 64 import org.mr.core.net.NetworkManager; 65 import org.mr.core.persistent.PersistentMap; 66 import org.mr.core.protocol.DeadEndRecipient; 67 import org.mr.core.protocol.MantaBusMessage; 68 import org.mr.core.protocol.MantaBusMessageConsts; 69 import org.mr.core.protocol.MantaBusMessageUtil; 70 import org.mr.core.protocol.RecipientAddress; 71 import org.mr.core.util.PrioritizedList; 72 import org.mr.core.util.SystemTime; 73 import org.mr.kernel.services.DeadLetterHandler; 74 import org.mr.kernel.services.ServiceActorControlCenter; 75 import org.mr.kernel.services.ServiceConsumer; 76 77 78 84 public class PostOfficeBox implements AgentStateListener { 85 NetworkManager manager; 86 RecipientAddress recipient; 88 int agentState = AGENT_STATE_NOT_MONITORING; 90 private boolean recipientOnline = true; 92 93 PersistentMap savedMessages; 95 private boolean localBox = false; 97 public boolean durable = false; 99 private boolean netWorkKeepAliveOn = false; 101 NetworkModerator moderator; 103 private Log log; 105 106 107 private static int DefaultNumberOfMessageInNetBuffer = 100; 108 private boolean notYetRecovered = true; 109 110 private boolean pause = false; 111 private boolean gotRejects = false; 112 private boolean throttle = false; 113 private boolean throttling = false; 114 private int highWatermark; 115 private int lowWatermark; 116 private long throttleSleep; 117 118 121 public PostOfficeBox(RecipientAddress recipient) { 122 123 if(recipient instanceof ServiceConsumer && ((ServiceConsumer)recipient).isDurable()){ 124 durable = true; 125 recipientOnline = ServiceActorControlCenter.isConsumerUp(recipient); 126 } 127 this.recipient = recipient; 128 moderator = new NetworkModerator(recipient.getId() , DefaultNumberOfMessageInNetBuffer); 129 if(MantaAgent.getInstance().getAgentName().equals(recipient.getAgentName())){ 130 localBox = true; 131 } 133 134 manager = MantaAgent.getInstance().getSingletonRepository().getNetworkManager(); 135 log=LogFactory.getLog("PostOfficeBox"); 136 137 } 138 139 public PostOfficeBox(RecipientAddress recipient, boolean throttle, 140 int highWatermark, int lowWatermark, 141 long throttleSleep) 142 { 143 this(recipient); 144 this.throttle = throttle; 145 this.highWatermark = highWatermark; 146 this.lowWatermark = lowWatermark; 147 this.throttleSleep = throttleSleep; 148 } 149 150 private synchronized void initSavedMessages() { 151 if(savedMessages != null) 152 return; 153 savedMessages = new PersistentMap("messages_to_"+recipient.getId() ,true ,true); 154 notYetRecovered = false; 155 } 156 157 162 public synchronized void handleMessage(MantaBusMessage msg) { 163 long now = SystemTime.gmtCurrentTimeMillis() ; 164 if( (msg.getValidUntil() < now) ){ 165 if(log.isDebugEnabled()){ 167 log.info("Not sending message due to TTL expiration: Message ID="+msg.getMessageId()+ 168 ", Expiration Time="+msg.getValidUntil()+",Current Time="+now+"."); 169 } 170 msg.addHeader(MantaBusMessageConsts.HEADER_NAME_SENT_FAIL , MantaBusMessageConsts.HEADER_VALUE_TRUE); 171 DeadLetterHandler.HandleDeadMessage(msg); 172 }else{ 173 save(msg); 175 if(recipientOnline){ 176 sendToNet(msg); 177 } 178 } 179 } 180 181 182 183 184 private void sendToNet(MantaBusMessage msg){ 185 if(!pause){ 186 msg.setRecipient(this.recipient); 187 msg.setDeliveryCount(msg.getDeliveryCount()+1); 188 if(localBox){ 189 if(log.isDebugEnabled()){ 190 log.debug("Sending message "+msg.getMessageId()+" to local recipient."); 191 } 192 MantaAgent.getInstance().getSingletonRepository().getIncomingMessageManager().messageArrived(msg); 193 return; 194 }else{ 195 if(!netWorkKeepAliveOn){ 197 manager.addAgentStateListener(msg.getRecipient().getAgentName() , this); 198 netWorkKeepAliveOn = true; 199 } 200 if(log.isDebugEnabled()){ 202 log.debug("About to send message "+msg.getMessageId()+"."); 203 } 204 try { 205 moderator.sendToNetwork(msg); 206 } catch (Exception e) { 207 if(log.isErrorEnabled()){ 208 log.error("An error occured while trying to send message "+msg.getMessageId()+"." ,e); 209 } 210 } } 212 } 213 214 } 216 220 private void save(MantaBusMessage msg){ 221 if ((msg.getRecipient().getAcknowledgeMode() == 223 MantaAgentConstants.NO_ACK) || 224 msg.isRerouted()) { 225 return; 226 } 227 byte delivery = msg.getDeliveryMode(); 229 boolean persistent = (delivery == MantaAgentConstants.PERSISTENT); 230 initSavedMessages(); 231 if(!durable){ 233 persistent = false; 234 } 235 236 if(log.isDebugEnabled()){ 237 log.debug("Ack required. Saving Manta message "+msg.getMessageId()+"."); 238 } 239 240 synchronized(savedMessages){ 241 savedMessages.put(msg.getMessageId() , msg ,persistent ); 242 } 243 if (throttle) { 248 int backlog = savedMessages.size(); 249 if (backlog > this.highWatermark) { 250 this.throttling = true; 251 } 252 if (throttling) { 253 if (backlog < this.lowWatermark) { 254 this.throttling = false; 255 } else { 256 try { 257 Thread.sleep(this.throttleSleep); 258 } catch (InterruptedException e) {} 259 } 260 } 261 } 262 } 263 264 268 public MantaBusMessage gotAck(String messageId){ 269 initSavedMessages(); 270 MantaBusMessage msg =(MantaBusMessage) savedMessages.get(messageId); 271 272 if(msg != null){ 273 synchronized (msg) { 274 if(log.isDebugEnabled()){ 275 log.debug("Got ack for message id " + messageId + " from " + 276 msg.getRecipient() + 277 ". Removing message from saved messages list."); 278 } 279 280 synchronized(savedMessages){ 282 savedMessages.remove(messageId); 283 } 284 } } 287 return msg; 288 289 } 291 public MantaBusMessage gotRejectAck(String messageId) { 292 293 initSavedMessages(); 294 MantaBusMessage msg =(MantaBusMessage) savedMessages.get(messageId); 295 296 if(msg != null){ 297 synchronized (msg) { 298 if(durable){ 299 if(log.isDebugEnabled()){ 300 log.debug("Got reject ack for message id " + messageId + " from " + 301 msg.getRecipient() + 302 ". This is a durable recipient so we keep the message "); 303 } 304 this.gotRejects = true; 305 }else{ 306 if(log.isDebugEnabled()){ 307 log.debug("Got reject ack for message id " + messageId + " from " + 308 msg.getRecipient() + 309 ". Removing message from saved messages list."); 310 } 311 synchronized(savedMessages){ 313 savedMessages.remove(messageId); 314 } 315 } 316 317 318 319 } } 322 return msg; 323 } 324 325 326 private boolean checkMessageMatchConsumer() { 330 synchronized(savedMessages){ 331 Iterator sentIter = savedMessages.values().iterator(); 332 MantaBusMessage orig =(MantaBusMessage) sentIter.next(); 333 ServiceConsumer messageRecipient = (ServiceConsumer)orig.getRecipient(); 334 ServiceConsumer boxRecipient = (ServiceConsumer)this.recipient; 335 if ( !messageRecipient.getServiceName().equals(boxRecipient.getServiceName()) || 337 !checkEqual(messageRecipient.getSelectorStatment(), boxRecipient.getSelectorStatment())) { 338 return false; 339 } 340 } 341 return true; 342 } 343 344 private boolean checkEqual(Object o1, Object o2) { 345 if (o1 == null) { 347 o1 = ""; 348 } 349 if (o2 == null) { 350 o2 = ""; 351 } 352 return o1.equals(o2); 353 } 361 362 365 public synchronized void recoverBox() { 366 initSavedMessages(); 367 if(savedMessages.isEmpty()){ 368 return; 369 } 370 371 ArrayList tempList = new ArrayList (savedMessages.size()); 372 synchronized(savedMessages){ 373 if (durable) { 376 if (!checkMessageMatchConsumer()) { 377 if (log.isInfoEnabled()) { 378 ServiceConsumer boxRecipient = (ServiceConsumer)this.recipient; 379 log.info("Durable subscription '"+boxRecipient.getId()+"' was changed. Deleting old subscription's messages."); 380 } 381 savedMessages.removeAll(); 382 return; 383 } 384 } 385 386 Iterator sentIter = savedMessages.values().iterator(); 389 while(sentIter.hasNext()){ 390 try { 391 MantaBusMessage orig =(MantaBusMessage) sentIter.next(); 392 if(orig.getDeliveryCount()>0){ 393 orig = PostOffice.prepareMessageShallowCopy(orig); 394 } 395 tempList.add(orig); 396 } catch (IOException e) { 397 log.error("Resending Messages: An error occured during message recovering process.",e); 398 } 399 } 400 } 401 402 MantaBusMessageUtil.sortMessagesBySendTime(tempList); 404 PrioritizedList resendTempList = new PrioritizedList(MantaAgentConstants.TOTAL_PRIORITIES); 405 resendTempList.addAll(tempList); 406 407 MantaBusMessage msg; 409 long now; 410 Iterator it = resendTempList.iterator(); 411 while (it.hasNext()) { 412 msg = (MantaBusMessage)it.next(); 413 now = SystemTime.gmtCurrentTimeMillis() ; 414 if (msg.getValidUntil() < now) { 415 if(log.isInfoEnabled()){ 417 log.info("Resending Messages: Not resending message due to TTL expiration: Message ID="+ 418 msg.getMessageId()+", Expiration Time=" +msg.getValidUntil()+", Current Time="+now+"."); 419 } 420 msg.addHeader(MantaBusMessageConsts.HEADER_NAME_SENT_FAIL , MantaBusMessageConsts.HEADER_VALUE_TRUE); 421 synchronized(msg){ 422 msg.notifyAll(); 423 } 424 savedMessages.remove(msg.getMessageId()); 425 DeadLetterHandler.HandleDeadMessage(msg); 426 427 } else{ 429 if(log.isDebugEnabled() && msg.getDeliveryCount() >=1 ){ 433 log.debug("Resending message "+msg.getMessageId()+"."); 434 } 435 437 sendToNet(msg); 441 } 442 this.gotRejects = false; 444 } } 446 447 448 451 public synchronized void agentStateChanged(String agent, int state) { 452 453 if(state == AGENT_STATE_DOWN && agentState != AGENT_STATE_DOWN){ 454 int oldState = agentState; 455 agentState = AGENT_STATE_DOWN; 456 if(oldState == AGENT_STATE_UP && log.isInfoEnabled()){ 458 log.info("Got agent down event from peer "+agent+" (box is "+recipient+")"); 459 } 460 setRecipientOnline(false); 461 }else if(state == AGENT_STATE_UP && agentState != AGENT_STATE_UP){ 462 463 agentState = AGENT_STATE_UP; 465 if(!durable || ServiceActorControlCenter.isConsumerUp(recipient)){ 466 setRecipientOnline(true); 467 } 468 if(log.isInfoEnabled()){ 469 log.info("Got agent up event from peer "+agent+" (box is "+recipient+")"); 470 } 471 } 472 473 } 474 475 478 public NetworkModerator getModerator() { 479 return moderator; 480 } 481 482 485 public void close() { 486 manager.removeAgentStateListener(recipient.getAgentName(), this); 487 netWorkKeepAliveOn = false; 488 489 } 490 491 public boolean isRecipientOnline() { 492 return recipientOnline; 493 } 494 495 synchronized void setRecipientOnline(boolean newStatus) { 496 if(newStatus == true){ 497 if(notYetRecovered == true ||recipientOnline == false || gotRejects == true ){ 499 recoverBox(); 500 } 501 }else{ 502 if(recipientOnline == true ){ 504 moderator.clear(); 505 } 506 if(recipient instanceof DeadEndRecipient){ 507 MantaAgent.getInstance().getSingletonRepository().getPostOffice().handleRecipientDown(recipient); 508 } 509 } 510 511 recipientOnline = newStatus; 512 } 513 514 518 void handleRecipientDown(){ 519 moderator.clear(); 520 if(durable == false){ 521 if(savedMessages != null){ 522 savedMessages.clear(); 523 } 524 } 525 if(netWorkKeepAliveOn && !localBox ){ 526 manager.removeAgentStateListener(recipient.getAgentName() , this); 527 netWorkKeepAliveOn = false; 528 } 529 recipientOnline = false; 530 } 531 532 533 536 synchronized void updateConsumer(ServiceConsumer consumer) { 537 if(consumer.isDurable()){ 538 if(MantaAgent.getInstance().getAgentName().equals(consumer.getAgentName())){ 539 localBox = true; 540 }else{ 541 localBox = false; 542 } 543 this.recipient = consumer; 544 } 545 } 546 547 public synchronized void pause() { 548 pause = true; 549 550 } 551 552 public synchronized void resume() { 553 pause = false; 554 recoverBox(); 555 556 } 557 558 public synchronized void purge() { 559 savedMessages.clear(); 560 561 } 562 563 public HashMap getSavedMessages() { 564 HashMap result; 565 synchronized(savedMessages){ 566 result = new HashMap (savedMessages); 567 } 568 return result; 569 } 570 571 572 } 573 | Popular Tags |