KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > kernel > delivery > PostOfficeBox


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Amir Shevat.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 /*
47  * Created on 31/05/2004
48  *
49  * Manta LTD
50  */

51 package org.mr.kernel.delivery;
52
53
54 import java.io.IOException JavaDoc;
55 import java.util.ArrayList JavaDoc;
56 import java.util.HashMap JavaDoc;
57 import java.util.Iterator JavaDoc;
58
59 import org.apache.commons.logging.Log;
60 import org.apache.commons.logging.LogFactory;
61 import org.mr.MantaAgent;
62 import org.mr.MantaAgentConstants;
63 import org.mr.core.net.AgentStateListener;
64 import org.mr.core.net.NetworkManager;
65 import org.mr.core.persistent.PersistentMap;
66 import org.mr.core.protocol.DeadEndRecipient;
67 import org.mr.core.protocol.MantaBusMessage;
68 import org.mr.core.protocol.MantaBusMessageConsts;
69 import org.mr.core.protocol.MantaBusMessageUtil;
70 import org.mr.core.protocol.RecipientAddress;
71 import org.mr.core.util.PrioritizedList;
72 import org.mr.core.util.SystemTime;
73 import org.mr.kernel.services.DeadLetterHandler;
74 import org.mr.kernel.services.ServiceActorControlCenter;
75 import org.mr.kernel.services.ServiceConsumer;
76
77
78 /**
79  * PostOfficeBox is a hold the outgoing message to a recipient agent
80  * and know how to send/resend/dump/save the messages
81  * @author Amir Shevat
82  *
83  */

84 public class PostOfficeBox implements AgentStateListener {
85     NetworkManager manager;
86     // the recipient agent name
87
RecipientAddress recipient;
88     // see NetworkManager
89
int agentState = AGENT_STATE_NOT_MONITORING;
90     // recipient status
91
private boolean recipientOnline = true;
92
93     // the old message mapper but per recipient
94
PersistentMap savedMessages;
95     // if true the agentName == this agent name and no network is needed
96
private boolean localBox = false;
97     // if the recipient is durable
98
public boolean durable = false;
99     // if true the box is monitoring the state of the recipient agent
100
private boolean netWorkKeepAliveOn = false;
101     // moderator for this recipient
102
NetworkModerator moderator;
103     // logger
104
private Log log;
105
106
107     private static int DefaultNumberOfMessageInNetBuffer = 100;
108     private boolean notYetRecovered = true;
109
110     private boolean pause = false;
111     private boolean gotRejects = false;
112     private boolean throttle = false;
113     private boolean throttling = false;
114     private int highWatermark;
115     private int lowWatermark;
116     private long throttleSleep;
117
118     /**
119      * @param agentName the name of the recipient agent
120      */

121     public PostOfficeBox(RecipientAddress recipient) {
122
123         if(recipient instanceof ServiceConsumer && ((ServiceConsumer)recipient).isDurable()){
124             durable = true;
125             recipientOnline = ServiceActorControlCenter.isConsumerUp(recipient);
126         }
127         this.recipient = recipient;
128         moderator = new NetworkModerator(recipient.getId() , DefaultNumberOfMessageInNetBuffer);
129         if(MantaAgent.getInstance().getAgentName().equals(recipient.getAgentName())){
130             localBox = true;
131             //agentState = AGENT_STATE_UP;
132
}
133
134         manager = MantaAgent.getInstance().getSingletonRepository().getNetworkManager();
135         log=LogFactory.getLog("PostOfficeBox");
136
137     }
138
139     public PostOfficeBox(RecipientAddress recipient, boolean throttle,
140                          int highWatermark, int lowWatermark,
141                          long throttleSleep)
142     {
143         this(recipient);
144         this.throttle = throttle;
145         this.highWatermark = highWatermark;
146         this.lowWatermark = lowWatermark;
147         this.throttleSleep = throttleSleep;
148     }
149
150     private synchronized void initSavedMessages() {
151         if(savedMessages != null)
152             return;
153             savedMessages = new PersistentMap("messages_to_"+recipient.getId() ,true ,true);
154             notYetRecovered = false;
155     }
156
157     /**
158      * called by the logic or the resending reactor this method sends/saves the message
159      * if needed
160      * @param msg the message to be sent
161      */

162     public synchronized void handleMessage(MantaBusMessage msg) {
163         long now = SystemTime.gmtCurrentTimeMillis() ;
164         if( (msg.getValidUntil() < now) ){
165             // message is old and going to dead letter queue
166
if(log.isDebugEnabled()){
167                 log.info("Not sending message due to TTL expiration: Message ID="+msg.getMessageId()+
168                          ", Expiration Time="+msg.getValidUntil()+",Current Time="+now+".");
169             }
170             msg.addHeader(MantaBusMessageConsts.HEADER_NAME_SENT_FAIL , MantaBusMessageConsts.HEADER_VALUE_TRUE);
171             DeadLetterHandler.HandleDeadMessage(msg);
172         }else{
173             // all is good here
174
save(msg);
175             if(recipientOnline){
176                 sendToNet(msg);
177             }
178         }
179     }
180
181
182
183
184     private void sendToNet(MantaBusMessage msg){
185         if(!pause){
186             msg.setRecipient(this.recipient);
187             msg.setDeliveryCount(msg.getDeliveryCount()+1);
188             if(localBox){
189                 if(log.isDebugEnabled()){
190                     log.debug("Sending message "+msg.getMessageId()+" to local recipient.");
191                 }
192                 MantaAgent.getInstance().getSingletonRepository().getIncomingMessageManager().messageArrived(msg);
193                 return;
194             }else{
195 // if we are not minotoring the recipient we better start
196
if(!netWorkKeepAliveOn){
197                     manager.addAgentStateListener(msg.getRecipient().getAgentName() , this);
198                     netWorkKeepAliveOn = true;
199                 }
200 // send to network
201
if(log.isDebugEnabled()){
202                     log.debug("About to send message "+msg.getMessageId()+".");
203                 }
204                 try {
205                     moderator.sendToNetwork(msg);
206                 } catch (Exception JavaDoc e) {
207                     if(log.isErrorEnabled()){
208                         log.error("An error occured while trying to send message "+msg.getMessageId()+"." ,e);
209                     }
210                 }//try
211
}
212         }
213
214     }//sendToNet
215

216     /**
217      * saves the message in the map and on disk if needed
218      * @param msg the message to be saved
219      */

220     private void save(MantaBusMessage msg){
221         //if no ack can be returned by the recipient no need to save
222
if ((msg.getRecipient().getAcknowledgeMode() ==
223              MantaAgentConstants.NO_ACK) ||
224             msg.isRerouted()) {
225             return;
226         }
227         //if needed save message
228
byte delivery = msg.getDeliveryMode();
229         boolean persistent = (delivery == MantaAgentConstants.PERSISTENT);
230         initSavedMessages();
231         // we do not keep messages to non durable recipients
232
if(!durable){
233             persistent = false;
234         }
235
236         if(log.isDebugEnabled()){
237             log.debug("Ack required. Saving Manta message "+msg.getMessageId()+".");
238         }
239
240         synchronized(savedMessages){
241             savedMessages.put(msg.getMessageId() , msg ,persistent );
242         }
243         // if the throttle feature is activated, delay the action of
244
// this box if the number of backed up messages have exceeded
245
// the high watermark, and have not gone below the low
246
// watermark since.
247
if (throttle) {
248             int backlog = savedMessages.size();
249             if (backlog > this.highWatermark) {
250                 this.throttling = true;
251             }
252             if (throttling) {
253                 if (backlog < this.lowWatermark) {
254                     this.throttling = false;
255                 } else {
256                     try {
257                         Thread.sleep(this.throttleSleep);
258                     } catch (InterruptedException JavaDoc e) {}
259                 }
260             }
261         }
262     }
263
264     /**
265      * got ack no need to keep in box anymore if all recipients acked it
266      * @param messageId the message that was aked
267      */

268     public MantaBusMessage gotAck(String JavaDoc messageId){
269         initSavedMessages();
270         MantaBusMessage msg =(MantaBusMessage) savedMessages.get(messageId);
271
272         if(msg != null){
273             synchronized (msg) {
274                 if(log.isDebugEnabled()){
275                     log.debug("Got ack for message id " + messageId + " from " +
276                                  msg.getRecipient() +
277                                  ". Removing message from saved messages list.");
278                 }
279
280                     // we do not need this message any more it has been sent
281
synchronized(savedMessages){
282                         savedMessages.remove(messageId);
283                     }
284             }//synchronized
285
}// if
286

287         return msg;
288
289     }//gotAck
290

291     public MantaBusMessage gotRejectAck(String JavaDoc messageId) {
292
293         initSavedMessages();
294         MantaBusMessage msg =(MantaBusMessage) savedMessages.get(messageId);
295
296         if(msg != null){
297             synchronized (msg) {
298                 if(durable){
299                     if(log.isDebugEnabled()){
300                         log.debug("Got reject ack for message id " + messageId + " from " +
301                                      msg.getRecipient() +
302                                      ". This is a durable recipient so we keep the message ");
303                     }
304                     this.gotRejects = true;
305                 }else{
306                     if(log.isDebugEnabled()){
307                         log.debug("Got reject ack for message id " + messageId + " from " +
308                                      msg.getRecipient() +
309                                      ". Removing message from saved messages list.");
310                     }
311 // we do not need this message any more it has been sent- this is not a durable subscriber
312
synchronized(savedMessages){
313                         savedMessages.remove(messageId);
314                     }
315                 }
316
317
318
319             }//synchronized
320
}// if
321

322         return msg;
323     }
324
325
326     // this method helps to detect a change in the durable subscription.
327
// if the recovered messages were sent to a durable subscriber with
328
// different settings, the messages will be deleted.
329
private boolean checkMessageMatchConsumer() {
330         synchronized(savedMessages){
331             Iterator JavaDoc sentIter = savedMessages.values().iterator();
332             MantaBusMessage orig =(MantaBusMessage) sentIter.next();
333             ServiceConsumer messageRecipient = (ServiceConsumer)orig.getRecipient();
334             ServiceConsumer boxRecipient = (ServiceConsumer)this.recipient;
335             if (//bug:421 messageRecipient.getNoLocal() != boxRecipient.getNoLocal() ||
336
!messageRecipient.getServiceName().equals(boxRecipient.getServiceName()) ||
337                 !checkEqual(messageRecipient.getSelectorStatment(), boxRecipient.getSelectorStatment())) {
338                 return false;
339             }
340         }
341         return true;
342     }
343
344     private boolean checkEqual(Object JavaDoc o1, Object JavaDoc o2) {
345         //Aviad changed this method - null selector String is like an empty(i.e "") String
346
if (o1 == null) {
347             o1 = "";
348         }
349         if (o2 == null) {
350             o2 = "";
351         }
352         return o1.equals(o2);
353 // if (o1 == null) {
354
// return o2 == null;
355
// }
356
// if (o2 != null) {
357
// return o1.equals(o2);
358
// }
359
// return false;
360
}
361
362     /**
363      * checks if we need to send or resend messages
364      */

365     public synchronized void recoverBox() {
366         initSavedMessages();
367         if(savedMessages.isEmpty()){
368             return;
369         }
370
371         ArrayList JavaDoc tempList = new ArrayList JavaDoc(savedMessages.size());
372         synchronized(savedMessages){
373             // chek the the durable subscription didn't change.
374
// if it changed delete all messages.
375
if (durable) {
376                 if (!checkMessageMatchConsumer()) {
377                     if (log.isInfoEnabled()) {
378                         ServiceConsumer boxRecipient = (ServiceConsumer)this.recipient;
379                         log.info("Durable subscription '"+boxRecipient.getId()+"' was changed. Deleting old subscription's messages.");
380                     }
381                     savedMessages.removeAll();
382                     return;
383                 }
384             }
385
386             // because the state of the sent messages is not clear,
387
// we need to make a copy of them
388
Iterator JavaDoc sentIter = savedMessages.values().iterator();
389             while(sentIter.hasNext()){
390                 try {
391                     MantaBusMessage orig =(MantaBusMessage) sentIter.next();
392                     if(orig.getDeliveryCount()>0){
393                         orig = PostOffice.prepareMessageShallowCopy(orig);
394                     }
395                     tempList.add(orig);
396                 } catch (IOException JavaDoc e) {
397                     log.error("Resending Messages: An error occured during message recovering process.",e);
398                 }
399             }
400         }
401
402         // here we order by two orders one time of send and the other is priority
403
MantaBusMessageUtil.sortMessagesBySendTime(tempList);
404         PrioritizedList resendTempList = new PrioritizedList(MantaAgentConstants.TOTAL_PRIORITIES);
405         resendTempList.addAll(tempList);
406
407         // now the list is sorted lets send the messages
408
MantaBusMessage msg;
409         long now;
410         Iterator JavaDoc it = resendTempList.iterator();
411         while (it.hasNext()) {
412             msg = (MantaBusMessage)it.next();
413             now = SystemTime.gmtCurrentTimeMillis() ;
414             if (msg.getValidUntil() < now) {
415                 // message is old and going to dead letter queue
416
if(log.isInfoEnabled()){
417                     log.info("Resending Messages: Not resending message due to TTL expiration: Message ID="+
418                              msg.getMessageId()+", Expiration Time=" +msg.getValidUntil()+", Current Time="+now+".");
419                 }
420                 msg.addHeader(MantaBusMessageConsts.HEADER_NAME_SENT_FAIL , MantaBusMessageConsts.HEADER_VALUE_TRUE);
421                 synchronized(msg){
422                     msg.notifyAll();
423                 }
424                 savedMessages.remove(msg.getMessageId());
425                 DeadLetterHandler.HandleDeadMessage(msg);
426
427             }// end of dead old messge
428
else{
429 // check if message needs to be sent/resent
430
// if the remove agent is down there is nothing for me to do
431
// and it is time to send/resend message
432
if(log.isDebugEnabled() && msg.getDeliveryCount() >=1 ){
433                     log.debug("Resending message "+msg.getMessageId()+".");
434                 }
435                 //put current recipient
436

437                 // send the message
438
// delivery count is incremented in sendToNet();
439
//msg.setDeliveryCount(msg.getDeliveryCount()+1);
440
sendToNet(msg);
441             }
442             // we have recoved all rejects
443
this.gotRejects = false;
444         }//for
445
}
446
447
448     /* (non-Javadoc)
449      * @see org.mr.core.net.AgentStateListener#agentStateChanged(java.lang.String, int)
450      */

451     public synchronized void agentStateChanged(String JavaDoc agent, int state) {
452
453         if(state == AGENT_STATE_DOWN && agentState != AGENT_STATE_DOWN){
454             int oldState = agentState;
455             agentState = AGENT_STATE_DOWN;
456             // log this only if the agent previously was up
457
if(oldState == AGENT_STATE_UP && log.isInfoEnabled()){
458                 log.info("Got agent down event from peer "+agent+" (box is "+recipient+")");
459             }
460             setRecipientOnline(false);
461         }else if(state == AGENT_STATE_UP && agentState != AGENT_STATE_UP){
462
463             // then allow new messages
464
agentState = AGENT_STATE_UP;
465             if(!durable || ServiceActorControlCenter.isConsumerUp(recipient)){
466                 setRecipientOnline(true);
467             }
468             if(log.isInfoEnabled()){
469                 log.info("Got agent up event from peer "+agent+" (box is "+recipient+")");
470             }
471         }
472
473     }
474
475     /**
476      * @return Returns the moderator.
477      */

478     public NetworkModerator getModerator() {
479         return moderator;
480     }
481
482     /**
483      * closes the PO box this is done only when an agent is removed from the world map
484      */

485     public void close() {
486         manager.removeAgentStateListener(recipient.getAgentName(), this);
487         netWorkKeepAliveOn = false;
488
489     }
490
491     public boolean isRecipientOnline() {
492         return recipientOnline;
493     }
494
495     synchronized void setRecipientOnline(boolean newStatus) {
496         if(newStatus == true){
497             // recipient online
498
if(notYetRecovered == true ||recipientOnline == false || gotRejects == true ){
499                 recoverBox();
500             }
501         }else{
502             // recipient offline
503
if(recipientOnline == true ){
504                 moderator.clear();
505             }
506             if(recipient instanceof DeadEndRecipient){
507                 MantaAgent.getInstance().getSingletonRepository().getPostOffice().handleRecipientDown(recipient);
508             }
509         }
510
511         recipientOnline = newStatus;
512     }
513
514     /**
515      * called when the recipient VM went down or when recipient is recalled
516      *
517      */

518     void handleRecipientDown(){
519         moderator.clear();
520         if(durable == false){
521             if(savedMessages != null){
522                 savedMessages.clear();
523             }
524         }
525         if(netWorkKeepAliveOn && !localBox ){
526             manager.removeAgentStateListener(recipient.getAgentName() , this);
527             netWorkKeepAliveOn = false;
528         }
529         recipientOnline = false;
530     }
531
532
533     /**
534      * updates the mataray layer of the consumer
535      */

536     synchronized void updateConsumer(ServiceConsumer consumer) {
537         if(consumer.isDurable()){
538             if(MantaAgent.getInstance().getAgentName().equals(consumer.getAgentName())){
539                 localBox = true;
540             }else{
541                 localBox = false;
542             }
543             this.recipient = consumer;
544         }
545     }
546
547     public synchronized void pause() {
548         pause = true;
549
550     }
551
552     public synchronized void resume() {
553         pause = false;
554         recoverBox();
555
556     }
557
558     public synchronized void purge() {
559         savedMessages.clear();
560
561     }
562
563     public HashMap JavaDoc getSavedMessages() {
564         HashMap JavaDoc result;
565         synchronized(savedMessages){
566              result = new HashMap JavaDoc(savedMessages);
567         }
568         return result;
569     }
570
571
572 }
573
Popular Tags