KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > kernel > delivery > PostOffice


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Amir Shevat.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34  
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 /*
47  * Created on 31/05/2004
48  *
49  * Manta LTD
50  */

51 package org.mr.kernel.delivery;
52
53
54 import java.io.IOException JavaDoc;
55 import java.nio.ByteBuffer JavaDoc;
56 import java.security.MessageDigest JavaDoc;
57
58 import java.util.HashMap JavaDoc;
59 import java.util.Iterator JavaDoc;
60
61
62 import org.mr.MantaAgent;
63 import org.apache.commons.logging.Log;
64 import org.apache.commons.logging.LogFactory;
65 import org.mr.core.net.MantaAddress;
66 import org.mr.core.protocol.MantaBusMessage;
67 import org.mr.core.protocol.MessageTransformer;
68 import org.mr.core.protocol.PayloadContainer;
69 import org.mr.core.protocol.RecipientAddress;
70 import org.mr.core.util.byteable.Byteable;
71 import org.mr.core.util.byteable.IncomingByteBufferPool;
72 import org.mr.core.configuration.ConfigManager;
73 import org.mr.kernel.services.ServiceActor;
74 import org.mr.kernel.services.ServiceActorControlCenter;
75 import org.mr.kernel.services.ServiceActorStatusListener;
76 import org.mr.kernel.services.ServiceConsumer;
77 import org.mr.kernel.services.MantaService;
78 import org.mr.kernel.services.queues.QueueMaster;
79 import org.mr.kernel.world.WorldModeler;
80
81
82 /**
83  *
84  * PostOffice holds the post office box to all the agents we are in communication with
85  * @author Amir Shevat
86  */

87 public class PostOffice implements ServiceActorStatusListener {
88     // holds the post office boxes for every destination
89
private static final byte THROTTLE_NONE = 0;
90     private static final byte THROTTLE_QUEUES = 1;
91     private static final byte THROTTLE_TOPICS = 2;
92     private static final byte THROTTLE_ALL = 3;
93
94     private HashMap JavaDoc agentToPOB = new HashMap JavaDoc();
95     private Log log;
96     private byte throttleDomain;
97     private int throttleHighWaterMark;
98     private int throttleLowWatermark;
99     private long throttleSleep;
100
101     /**
102      * inits the delivery module and starts the resending reactor
103      * @param modeler the world modeler holds the map of the world as we know it- we added this abject as a listener to updates
104      */

105     public PostOffice(WorldModeler modeler ){
106         log=LogFactory.getLog("PostOffice");
107         ServiceActorControlCenter.addConsumerStatusListeners(this);
108
109         ConfigManager config = MantaAgent.getInstance().
110             getSingletonRepository().getConfigManager();
111         this.throttleDomain =
112             (byte) config.getIntProperty("delivery.throttle.domain", 2);
113         this.throttleHighWaterMark =
114             config.getIntProperty("delivery.throttle.high-watermark", 100000);
115         this.throttleLowWatermark =
116             config.getIntProperty("delivery.throttle.low-watermark", 90000);
117         this.throttleSleep =
118             (long) config.getIntProperty("delivery.throttle.sleep", 5) * 1000;
119     }
120     /**
121      * returns the PostOfficeBox of a given recipiant
122      * @param agentName the recipiant that this PostOfficeBox maps to
123      * @return an object that deals with messages sent to this recipiant
124      */

125     public PostOfficeBox getPostOfficeBox(String JavaDoc recipientID){
126         return (PostOfficeBox) agentToPOB.get(recipientID);
127     }
128     
129     
130     /**
131      * tells the PO box of the source that the source has gotten this message and there
132      * is not need to resend
133      * @param ackedMessageId the id of the message that was acked
134      * @param source the address of the agent that got the message
135      * @return
136      */

137     public MantaBusMessage gotAck(String JavaDoc ackedMessageId, MantaAddress source){
138         String JavaDoc id = ((RecipientAddress)source).getId();
139         PostOfficeBox box = getPostOfficeBox(id);
140         if(box != null)
141             return box.gotAck(ackedMessageId );
142         return null;
143         
144     }
145     
146     public MantaBusMessage gotAckReject(String JavaDoc ackedMessageId, MantaAddress source) {
147         String JavaDoc id = ((RecipientAddress)source).getId();
148         PostOfficeBox box = getPostOfficeBox(id);
149         if(box != null)
150             return box.gotRejectAck(ackedMessageId );
151         return null;
152     }
153
154     /**
155      * duplicates the message (same payload) according to recipient (if needed) and route it to the boxes
156      * @param msg the message to be sent, must have at least 1 recipiant
157      */

158     public final void SendMessage(MantaBusMessage msg ){
159         if(log.isDebugEnabled()){
160             log.debug("Message arrived to post office: "+msg.getMessageId()+".");
161         }
162         
163         if(msg.getRecipient() ==null)
164             throw new IllegalArgumentException JavaDoc("No recipient was set for this message: "+msg+".");
165         
166     
167             PostOfficeBox box= getPostOfficeBox(msg.getRecipient().getId());
168             if(box == null){
169                 box = handleRecipientAdded(msg.getRecipient(), false, 0, 0, 0);
170             }
171             if(box != null){
172                 box.handleMessage(msg);
173             }else{
174                 if(log.isInfoEnabled()){
175                     log.info("Did not find recipient "+msg.getRecipient().getId()+". Message was not sent: "+msg.getMessageId()+".");
176                 }
177             }
178     }
179     
180     
181     
182     
183     /**
184      * when a message arrives from the network layer this method is called
185      * @param partialMD5
186      * @param msg the arriving message
187      */

188     public final void messageArrived(ByteBuffer JavaDoc buff, byte[] messageMD5, MessageDigest JavaDoc partialMD5) throws IOException JavaDoc{
189         try{
190             if(log.isDebugEnabled()){
191                 log.debug("Got buffer size "+buff.remaining()+".");
192             }
193             // transform to a message
194
MantaBusMessage msg = MessageTransformer.fromBuffer(buff);
195             msg.setMessageMD5(messageMD5);
196             msg.setPartialMD5(partialMD5);
197             if(buff!= null && !MantaBusMessage.isLazyParsing()){
198                 IncomingByteBufferPool.getInstance().release(buff);
199             }
200             
201             if(log.isDebugEnabled()){
202                 log.debug("Manta message arrived sending to logic layer. msg = "+msg+".");
203             }
204             // send to ProtocolManager
205
MantaAgent.getInstance().getSingletonRepository().getIncomingMessageManager().messageArrived(msg);
206         } catch (IOException JavaDoc e) {
207             if(log.isErrorEnabled()){
208                 log.error("Error in getting message from stream. ",e);
209             }
210             throw e;
211         }
212     
213         
214     }
215     
216     
217     
218     public static final MantaBusMessage prepareMessageShallowCopy(MantaBusMessage orig) throws IOException JavaDoc{
219         MantaBusMessage msg = MantaBusMessage.getInstance();
220         msg.setMessageType(orig.getMessageType());
221         msg.getElements().putAll(orig.getElements());
222         msg.setMessagesId(orig.getMessageIdAsLong());
223         msg.setDeliveryMode(orig.getDeliveryMode());
224         msg.setMessageType(orig.getMessageType());
225         msg.setPriority(orig.getPriority());
226         msg.setValidUntil(orig.getValidUntil());
227         msg.setDeliveryCount(orig.getDeliveryCount());
228         msg.setSource(orig.getSource());
229         Byteable payload = orig.getPayloadContainer().getPayloadObject();
230         msg.setPayloadContainer(new PayloadContainer(payload));
231         
232         msg.setRecipient(orig.getRecipient());
233         return msg;
234
235     }
236     
237     
238     
239     /* --------------------------- interface WorldModelerNetListener
240     /* (non-Javadoc)
241      * @see org.mr.kernel.world.WorldModelerNetListener#handleAgentUp(java.lang.String)
242      */

243     public synchronized PostOfficeBox
244         handleRecipientAdded(RecipientAddress consumer, boolean throttle,
245                              int highWatermark, int lowWatermark,
246                              long throttleSleep)
247     {
248         PostOfficeBox pob = getPostOfficeBox(consumer.getId());
249         if(pob == null){
250             pob = new PostOfficeBox(consumer, throttle, highWatermark,
251                                     lowWatermark, throttleSleep);
252             agentToPOB.put(consumer.getId() ,pob );
253         }
254         return pob;
255         
256         
257     }
258
259     
260     /**
261      * Indicates that ServiceConsumer has just advertise itself
262      */

263     public void handleConsumerUp(ServiceConsumer consumer) {
264         PostOfficeBox pob = getPostOfficeBox(consumer.getId());
265         if(pob == null){
266             boolean throttle = false;
267             byte serviceType = consumer.getServiceType();
268             if ((this.throttleDomain == THROTTLE_ALL) ||
269                 (this.throttleDomain == THROTTLE_TOPICS &&
270                  serviceType == MantaService.SERVICE_TYPE_TOPIC) ||
271                 (this.throttleDomain == THROTTLE_QUEUES &&
272                  serviceType == MantaService.SERVICE_TYPE_QUEUE)) {
273                 throttle = true;
274             }
275             pob = handleRecipientAdded(consumer, throttle,
276                                        this.throttleHighWaterMark,
277                                        this.throttleLowWatermark,
278                                        this.throttleSleep);
279         }else{
280             // we need to check that the conuser is still on the same layer
281
pob.updateConsumer(consumer);
282         }
283         pob.setRecipientOnline(true);
284         
285     }
286     
287     public void handleConsumerDown(ServiceConsumer consumer) {
288         handleRecipientDown(consumer);
289         
290     }
291     
292     public void handleCoordinatorDown(QueueMaster oldMaster,
293                                       QueueMaster newMaster)
294     {
295         if (newMaster != null) {
296             PostOfficeBox pob = getPostOfficeBox(oldMaster.getId());
297             if (pob != null) {
298                 HashMap JavaDoc messages = pob.getSavedMessages();
299                 Iterator JavaDoc i = messages.values().iterator();
300                 while (i.hasNext()) {
301                     MantaBusMessage message = (MantaBusMessage) i.next();
302                     message.setRecipient(newMaster);
303                     SendMessage(message);
304                 }
305             }
306         }
307         handleRecipientDown(oldMaster);
308     }
309     
310     public void handleRecipientDown(RecipientAddress recipient){
311         PostOfficeBox pob = getPostOfficeBox(recipient.getId());
312         if(pob != null){
313             //pob.setRecipientOnline(false);
314
pob.handleRecipientDown();
315             if(pob.durable == false)
316                 agentToPOB.remove(recipient.getId());
317         }
318     }
319     
320     /**
321      * removes durable subscriber POB and temp queue coordinator
322      * @param consumer the durable subscriber
323      */

324     public void closeBox(ServiceActor actor) {
325         RecipientAddress recipient = (RecipientAddress)actor;
326         PostOfficeBox pob = getPostOfficeBox(recipient.getId());
327         if(pob != null){
328             pob.handleRecipientDown();
329             agentToPOB.remove(recipient.getId());
330             pob.close();
331         }
332     }
333     
334     
335     
336     
337
338     
339     
340 }
341
342 /**
343  * copies all the headers from the original and copies the pointer to the payload
344  * @param orig the original message to be copied
345  * @return a list of copies or the original message in an list (in case no copy is needed)
346  
347 public static final List prepareMessageShallowCopies(MantaBusMessage orig, List recipients){
348     ArrayList result = new ArrayList();
349     
350     int size = recipients.size();
351     // if there is only one recipient no need to make copy
352     if(size == 1){
353         orig.setRecipient((RecipientAddress) recipients.get(0));
354         result.add(orig);
355         return result;
356     }
357     // we have multipul recipient
358     orig.getPayloadContainer().setReferenceCount(size);
359     Iterator dests = recipients.iterator();
360     while(dests.hasNext()){
361         MantaBusMessage msg =prepareMessageShallowCopy(orig);
362         msg.setRecipient((RecipientAddress) dests.next());
363         result.add(msg);
364     }
365     
366     return result;
367 }
368 */

369
Popular Tags