KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > dest > QueueImpl


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2001 - 2007 ScalAgent Distributed Technologies
4  * Copyright (C) 1996 - 2000 Dyade
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): Frederic Maistre (INRIA)
22  * Contributor(s): ScalAgent Distributed Technologies
23  */

24 package org.objectweb.joram.mom.dest;
25
26 import java.io.IOException JavaDoc;
27 import java.io.ObjectInputStream JavaDoc;
28 import java.io.ObjectOutputStream JavaDoc;
29 import java.util.ArrayList JavaDoc;
30 import java.util.Enumeration JavaDoc;
31 import java.util.Hashtable JavaDoc;
32 import java.util.Iterator JavaDoc;
33 import java.util.List JavaDoc;
34 import java.util.Properties JavaDoc;
35 import java.util.Vector JavaDoc;
36
37 import org.objectweb.joram.mom.messages.Message;
38 import org.objectweb.joram.mom.notifications.AbortReceiveRequest;
39 import org.objectweb.joram.mom.notifications.AcknowledgeRequest;
40 import org.objectweb.joram.mom.notifications.AdminReply;
41 import org.objectweb.joram.mom.notifications.BrowseReply;
42 import org.objectweb.joram.mom.notifications.BrowseRequest;
43 import org.objectweb.joram.mom.notifications.ClientMessages;
44 import org.objectweb.joram.mom.notifications.DenyRequest;
45 import org.objectweb.joram.mom.notifications.DestinationAdminRequestNot;
46 import org.objectweb.joram.mom.notifications.ExceptionReply;
47 import org.objectweb.joram.mom.notifications.Monit_GetDMQSettings;
48 import org.objectweb.joram.mom.notifications.Monit_GetDMQSettingsRep;
49 import org.objectweb.joram.mom.notifications.Monit_GetNbMaxMsg;
50 import org.objectweb.joram.mom.notifications.Monit_GetNbMaxMsgRep;
51 import org.objectweb.joram.mom.notifications.Monit_GetNumberRep;
52 import org.objectweb.joram.mom.notifications.Monit_GetPendingMessages;
53 import org.objectweb.joram.mom.notifications.Monit_GetPendingRequests;
54 import org.objectweb.joram.mom.notifications.QueueMsgReply;
55 import org.objectweb.joram.mom.notifications.ReceiveRequest;
56 import org.objectweb.joram.mom.notifications.SetNbMaxMsgRequest;
57 import org.objectweb.joram.mom.notifications.SetRightRequest;
58 import org.objectweb.joram.mom.notifications.SetThreshRequest;
59 import org.objectweb.joram.mom.notifications.WakeUpNot;
60 import org.objectweb.joram.shared.admin.ClearQueue;
61 import org.objectweb.joram.shared.admin.DeleteQueueMessage;
62 import org.objectweb.joram.shared.admin.GetQueueMessage;
63 import org.objectweb.joram.shared.admin.GetQueueMessageIds;
64 import org.objectweb.joram.shared.admin.GetQueueMessageIdsRep;
65 import org.objectweb.joram.shared.admin.GetQueueMessageRep;
66 import org.objectweb.joram.shared.excepts.AccessException;
67 import org.objectweb.joram.shared.excepts.DestinationException;
68 import org.objectweb.joram.shared.selectors.Selector;
69 import org.objectweb.util.monolog.api.BasicLevel;
70 import org.objectweb.util.monolog.api.Logger;
71
72 import fr.dyade.aaa.agent.AgentId;
73 import fr.dyade.aaa.agent.DeleteNot;
74 import fr.dyade.aaa.agent.Notification;
75 import fr.dyade.aaa.agent.UnknownAgent;
76 import fr.dyade.aaa.util.Debug;
77
78 /**
79  * The <code>QueueImpl</code> class implements the MOM queue behaviour,
80  * basically storing messages and delivering them upon clients requests.
81  */

82 public class QueueImpl extends DestinationImpl implements QueueImplMBean {
83   public static Logger logger = Debug.getLogger(QueueImpl.class.getName());
84
85   /** period to run task at regular interval: cleaning, load-balancing, etc. */
86   protected long period = -1;
87
88   /**
89    * Returns the period value of this queue, -1 if not set.
90    *
91    * @return the period value of this queue; -1 if not set.
92    */

93   public long getPeriod() {
94     return period;
95   }
96
97   /**
98    * Sets or unsets the period for this queue.
99    *
100    * @param period The period value to be set or -1 for unsetting previous
101    * value.
102    */

103   public void setPeriod(long period) {
104     if ((this.period == -1L) && (period != -1L)) {
105       // Schedule the CleaningTask.
106
forward(destId, new WakeUpNot());
107     }
108     this.period = period;
109   }
110
111   /**
112    * Threshold above which messages are considered as undeliverable because
113    * constantly denied; 0 stands for no threshold, <code>null</code> for value
114    * not set.
115    */

116   private Integer JavaDoc threshold = null;
117
118   /**
119    * Returns the threshold value of this queue, -1 if not set.
120    *
121    * @return the threshold value of this queue; -1 if not set.
122    */

123   public int getThreshold() {
124     if (threshold == null)
125       return -1;
126     else
127       return threshold.intValue();
128   }
129
130   /**
131    * Sets or unsets the threshold for this queue.
132    *
133    * @param The threshold value to be set (-1 for unsetting previous value).
134    */

135   public void setThreshold(int threshold) {
136     if (threshold < 0)
137       this.threshold = null;
138     else
139       this.threshold = new Integer JavaDoc(threshold);
140   }
141
142   /** <code>true</code> if all the stored messages have the same priority. */
143   private boolean samePriorities;
144   /** Common priority value. */
145   private int priority;
146
147   /** Table keeping the messages' consumers identifiers. */
148   protected Hashtable JavaDoc consumers;
149   /** Table keeping the messages' consumers contexts. */
150   protected Hashtable JavaDoc contexts;
151
152   /** Counter of messages arrivals. */
153   protected long arrivalsCounter = 0;
154
155   /**
156    * Returns the number of messages received since creation time.
157    *
158    * @return The number of received messages.
159    */

160   public int getMessageCounter() {
161     if (messages != null) {
162       return messages.size();
163     }
164     return 0;
165   }
166
167   /** Vector holding the requests before reply or expiry. */
168   protected Vector JavaDoc requests;
169
170   /**
171    * Cleans the waiting request list.
172    * Removes all request that the expiration time is less than the time
173    * given in parameter.
174    *
175    * @param currentTime The current time.
176    */

177   protected void cleanWaitingRequest(long currentTime) {
178     int index = 0;
179     while (index < requests.size()) {
180       if (! ((ReceiveRequest) requests.get(index)).isValid(currentTime)) {
181         // Request expired: removing it
182
requests.remove(index);
183         // It's not really necessary to save its state, in case of failure
184
// a similar work will be done at restart.
185
} else {
186         index++;
187       }
188     }
189   }
190
191   /**
192    * Returns the number of waiting requests in the queue.
193    *
194    * @return The number of waiting requests.
195    */

196   public int getWaitingRequestCount() {
197     if (requests != null) {
198       cleanWaitingRequest(System.currentTimeMillis());
199       return requests.size();
200     }
201     return 0;
202   }
203
204   /** <code>true</code> if the queue is currently receiving messages. */
205   protected transient boolean receiving = false;
206
207   /** Vector holding the messages before delivery. */
208   protected transient Vector JavaDoc messages;
209
210   /**
211    * Cleans the pending messages list.
212    * Removes all messages that the expiration time is less than the time
213    * given in parameter.
214    *
215    * @param currentTime The current time.
216    * @return A vector of all expired messages.
217    */

218   protected ClientMessages cleanPendingMessage(long currentTime) {
219     int index = 0;
220
221     ClientMessages deadMessages = null;
222
223     Message message = null;
224     while (index < messages.size()) {
225       message = (Message) messages.get(index);
226       if (! message.isValid(currentTime)) {
227         messages.remove(index);
228         message.delete();
229         
230         message.msg.expired = true;
231
232         if (deadMessages == null)
233           deadMessages = new ClientMessages();
234         deadMessages.addMessage(message.msg);
235
236         if (logger.isLoggable(BasicLevel.DEBUG))
237           logger.log(BasicLevel.DEBUG,
238                      "Removes expired message " + message.getIdentifier());
239       } else {
240         index++;
241       }
242     }
243     return deadMessages;
244   }
245
246   /**
247    * Returns the number of pending messages in the queue.
248    *
249    * @return The number of pending messages.
250    */

251   public int getPendingMessageCount() {
252     if (messages != null) {
253       return messages.size();
254     }
255     return 0;
256   }
257
258   /** Table holding the delivered messages before acknowledgement. */
259   protected transient Hashtable JavaDoc deliveredMsgs;
260
261   /**
262    * Returns the number of messages delivered and waiting for acknowledge.
263    *
264    * @return The number of messages delivered.
265    */

266   public int getDeliveredMessageCount() {
267     if (deliveredMsgs != null) {
268       return deliveredMsgs.size();
269     }
270     return 0;
271   }
272
273   /** nb Max of Message store in queue (-1 no limit). */
274   protected int nbMaxMsg = -1;
275
276   /**
277    * Returns the maximum number of message for the destination.
278    * If the limit is unset the method returns -1.
279    *
280    * @return the maximum number of message for subscription if set;
281    * -1 otherwise.
282    */

283   public int getNbMaxMsg() {
284     return nbMaxMsg;
285   }
286
287   /**
288    * Sets the maximum number of message for the destination.
289    *
290    * @param nbMaxMsg the maximum number of message (-1 set no limit).
291    */

292   public void setNbMaxMsg(int nbMaxMsg) {
293     // state change, so save.
294
setSave();
295     this.nbMaxMsg = nbMaxMsg;
296   }
297
/**
 * Builds a <code>QueueImpl</code> instance.
 * <p>
 * The period is read from the "period" property when properties are
 * given; a missing or non numeric value leaves the period unset (-1).
 *
 * @param destId Identifier of the agent hosting the queue.
 * @param adminId Identifier of the administrator of the queue.
 * @param prop The initial set of properties, may be <code>null</code>.
 */
public QueueImpl(AgentId destId, AgentId adminId, Properties prop) {
  super(destId, adminId, prop);

  if (prop != null) {
    try {
      // A missing property yields null, which is rejected with a
      // NumberFormatException just like a malformed value.
      period = Long.parseLong(prop.getProperty("period"));
    } catch (NumberFormatException exc) {
      period = -1L;
    }
  }

  requests = new Vector();
  consumers = new Hashtable();
  contexts = new Hashtable();
}
319
320   /**
321    * Returns a string representation of this destination.
322    */

323   public String JavaDoc toString() {
324     return "QueueImpl:" + (destId == null ? "null" : destId.toString());
325   }
326
327   /**
328    * wake up, and cleans the queue.
329    */

330   public void wakeUpNot(WakeUpNot not) {
331     long current = System.currentTimeMillis();
332     cleanWaitingRequest(current);
333      // Cleaning the possible expired messages.
334
ClientMessages deadMessages = cleanPendingMessage(current);
335     // If needed, sending the dead messages to the DMQ:
336
if (deadMessages != null)
337       sendToDMQ(deadMessages, null);
338   }
339
340   /**
341    * Method implementing the reaction to a <code>SetThreshRequest</code>
342    * instance setting the threshold value for this queue.
343    *
344    * @exception AccessException If the requester is not the administrator.
345    */

346   public void setThreshRequest(AgentId from, SetThreshRequest req) throws AccessException {
347     if (! isAdministrator(from))
348       throw new AccessException("ADMIN right not granted");
349
350     // state change, so save.
351
setSave();
352
353     threshold = req.getThreshold();
354     
355     String JavaDoc info = strbuf.append("Request [").append(req.getClass().getName())
356       .append("], sent to Queue [").append(destId)
357       .append("], successful [true]: threshold [")
358       .append(threshold).append("] set").toString();
359     strbuf.setLength(0);
360     forward(from, new AdminReply(req, true, info));
361
362     if (logger.isLoggable(BasicLevel.DEBUG))
363       logger.log(BasicLevel.DEBUG, info);
364   }
365
366   /**
367    * Method implementing the reaction to a <code>SetNbMaxMsgRequest</code>
368    * instance setting the NbMaxMsg value for this queue.
369    *
370    * @exception AccessException If the requester is not the administrator.
371    */

372   public void setNbMaxMsgRequest(AgentId from, SetNbMaxMsgRequest req) throws AccessException {
373     if (! isAdministrator(from))
374       throw new AccessException("ADMIN right not granted");
375
376     nbMaxMsg = req.getNbMaxMsg();
377     
378     String JavaDoc info = strbuf.append("Request [").append(req.getClass().getName())
379       .append("], sent to Queue [").append(destId)
380       .append("], successful [true]: nbMaxMsg [")
381       .append(nbMaxMsg).append("] set").toString();
382     strbuf.setLength(0);
383     forward(from, new AdminReply(req, true, info));
384
385     if (logger.isLoggable(BasicLevel.DEBUG))
386       logger.log(BasicLevel.DEBUG, info);
387   }
388
389   /**
390    * Overrides this <code>DestinationImpl</code> method for sending back
391    * the threshold along with the DMQ id.
392    *
393    * @exception AccessException If the requester is not the administrator.
394    */

395   public void MonitGetDMQSettings(AgentId from, Monit_GetDMQSettings not) throws AccessException {
396     if (! isAdministrator(from))
397       throw new AccessException("ADMIN right not granted");
398
399     String JavaDoc id = null;
400     if (dmqId != null)
401       id = dmqId.toString();
402     forward(from, new Monit_GetDMQSettingsRep(not, id, threshold));
403   }
404
405   /**
406    * Method implementing the reaction to a
407    * <code>Monit_GetPendingMessages</code> notification requesting the
408    * number of pending messages.
409    *
410    * @exception AccessException If the requester is not the administrator.
411    */

412   public void monitGetPendingMessages(AgentId from, Monit_GetPendingMessages not) throws AccessException {
413     if (! isAdministrator(from))
414       throw new AccessException("ADMIN right not granted");
415
416     // Cleaning the possible expired messages.
417
ClientMessages deadMessages = cleanPendingMessage(System.currentTimeMillis());
418     // Sending the dead messages to the DMQ, if needed:
419
if (deadMessages != null)
420       sendToDMQ(deadMessages, null);
421
422     forward(from, new Monit_GetNumberRep(not, messages.size()));
423   }
424
425   /**
426    * Method implementing the reaction to a
427    * <code>Monit_GetPendingRequests</code> notification requesting the
428    * number of pending requests.
429    *
430    * @exception AccessException If the requester is not the administrator.
431    */

432   public void monitGetPendingRequests(AgentId from, Monit_GetPendingRequests not) throws AccessException {
433     if (! isAdministrator(from))
434       throw new AccessException("ADMIN right not granted");
435
436
437     forward(from, new Monit_GetNumberRep(not, getWaitingRequestCount()));
438   }
439
440   /**
441    * Method implementing the reaction to a
442    * <code>Monit_GetNbMaxMsg</code> notification requesting the
443    * number max of messages in this queue.
444    *
445    * @exception AccessException If the requester is not the administrator.
446    */

447   public void monitGetNbMaxMsg(AgentId from, Monit_GetNbMaxMsg not) throws AccessException {
448     if (! isAdministrator(from))
449       throw new AccessException("ADMIN right not granted");
450
451     forward(from, new Monit_GetNbMaxMsgRep(not,nbMaxMsg));
452   }
453
454   /**
455    * Method implementing the reaction to a <code>ReceiveRequest</code>
456    * instance, requesting a message.
457    * <p>
458    * This method stores the request and launches a delivery sequence.
459    *
460    * @exception AccessException If the sender is not a reader.
461    */

462   public void receiveRequest(AgentId from, ReceiveRequest not) throws AccessException {
463     // If client is not a reader, sending an exception.
464
if (! isReader(from))
465       throw new AccessException("READ right not granted");
466
467     String JavaDoc[] toAck = not.getMessageIds();
468     if (toAck != null) {
469       for (int i = 0; i < toAck.length; i++) {
470         acknowledge(toAck[i]);
471       }
472     }
473
474     long current = System.currentTimeMillis();
475     cleanWaitingRequest(current);
476     // Storing the request:
477
not.requester = from;
478     not.setExpiration(current);
479     if (not.isPersistent()) {
480       // state change, so save.
481
setSave();
482     }
483     requests.add(not);
484
485     if (logger.isLoggable(BasicLevel.DEBUG))
486       logger.log(BasicLevel.DEBUG, " -> requests count = " + requests.size());
487
488     // Launching a delivery sequence for this request:
489
int reqIndex = requests.size() - 1;
490     deliverMessages(reqIndex);
491     
492     // If the request has not been answered and if it is an immediate
493
// delivery request, sending a null:
494
if ((requests.size() - 1) == reqIndex && not.getTimeOut() == -1) {
495       requests.remove(reqIndex);
496       QueueMsgReply reply = new QueueMsgReply(not);
497       if (isLocal(from)) {
498         reply.setPersistent(false);
499       }
500       forward(from, reply);
501
502       if (logger.isLoggable(BasicLevel.DEBUG))
503         logger.log(BasicLevel.DEBUG, "Receive answered by a null.");
504     }
505   }
506
507   /**
508    * Method implementing the queue reaction to a <code>BrowseRequest</code>
509    * instance, requesting an enumeration of the messages on the queue.
510    * <p>
511    * The method sends a <code>BrowseReply</code> back to the client. Expired
512    * messages are sent to the DMQ.
513    *
514    * @exception AccessException If the requester is not a reader.
515    */

516   public void browseRequest(AgentId from, BrowseRequest not) throws AccessException {
517     // If client is not a reader, sending an exception.
518
if (! isReader(from))
519       throw new AccessException("READ right not granted");
520
521     // Building the reply:
522
BrowseReply rep = new BrowseReply(not);
523     
524     // Cleaning the possible expired messages.
525
ClientMessages deadMessages = cleanPendingMessage(System.currentTimeMillis());
526     // Adding the deliverable messages to it:
527
int i = 0;
528     Message message;
529     while (i < messages.size()) {
530       message = (Message) messages.get(i);
531       if (Selector.matches(message.msg, not.getSelector())) {
532         // Matching selector: adding the message:
533
rep.addMessage(message.msg);
534       }
535       i++;
536     }
537
538     // Sending the dead messages to the DMQ, if needed:
539
if (deadMessages != null)
540       sendToDMQ(deadMessages, null);
541
542     // Delivering the reply:
543
forward(from, rep);
544
545     if (logger.isLoggable(BasicLevel.DEBUG))
546       logger.log(BasicLevel.DEBUG, "Request answered.");
547   }
548
549   /**
550    * Method implementing the reaction to an <code>AcknowledgeRequest</code>
551    * instance, requesting messages to be acknowledged.
552    */

553   public void acknowledgeRequest(AgentId from, AcknowledgeRequest not) {
554     for (Enumeration JavaDoc ids = not.getIds(); ids.hasMoreElements();) {
555       String JavaDoc msgId = (String JavaDoc) ids.nextElement();
556       acknowledge(msgId);
557     }
558   }
559   
560   private void acknowledge(String JavaDoc msgId) {
561     Message msg = (Message) deliveredMsgs.remove(msgId);
562     if ((msg != null) && msg.getPersistent()) {
563       // state change, so save.
564
setSave();
565     }
566     consumers.remove(msgId);
567     contexts.remove(msgId);
568     if (msg != null) {
569       msg.delete();
570
571       if (logger.isLoggable(BasicLevel.DEBUG)) {
572         logger.log(BasicLevel.DEBUG, "Message " + msgId + " acknowledged.");
573       }
574     } else if (logger.isLoggable(BasicLevel.WARN)) {
575       logger.log(BasicLevel.WARN,
576                  "Message " + msgId + " not found for acknowledgement.");
577     }
578   }
579
580   /**
581    * Method implementing the reaction to a <code>DenyRequest</code>
582    * instance, requesting messages to be denied.
583    * <p>
584    * This method denies the messages and launches a delivery sequence.
585    * Messages considered as undeliverable are sent to the DMQ.
586    */

587   public void denyRequest(AgentId from, DenyRequest not) {
588     if (logger.isLoggable(BasicLevel.DEBUG))
589       logger.log(BasicLevel.DEBUG,
590                  "QueueImpl.DenyRequest(" + from + ',' + not + ')');
591     
592     Enumeration JavaDoc ids = not.getIds();
593
594     String JavaDoc msgId;
595     Message message;
596     AgentId consId;
597     int consCtx;
598     ClientMessages deadMessages = null;
599
600     // If the deny request is empty, the denying is a contextual one: it
601
// requests the denying of all the messages consumed by the denier in
602
// the denying context:
603
if (! ids.hasMoreElements()) {
604       // Browsing the delivered messages:
605
for (Enumeration JavaDoc delIds = deliveredMsgs.keys();
606            delIds.hasMoreElements();) {
607         msgId = (String JavaDoc) delIds.nextElement();
608
609         message = (Message) deliveredMsgs.get(msgId);
610         consId = (AgentId) consumers.get(msgId);
611         consCtx = ((Integer JavaDoc) contexts.get(msgId)).intValue();
612
613         if (logger.isLoggable(BasicLevel.DEBUG))
614           logger.log(BasicLevel.DEBUG,
615                      " -> deny msg " + msgId + "(consId = " + consId + ')');
616
617         // If the current message has been consumed by the denier in the same
618
// context: denying it.
619
if (consId.equals(from) && consCtx == not.getClientContext()) {
620           // state change, so save.
621
setSave();
622           consumers.remove(msgId);
623           contexts.remove(msgId);
624           deliveredMsgs.remove(msgId);
625           message.msg.redelivered = true;
626
627           // If message considered as undeliverable, adding
628
// it to the vector of dead messages:
629
if (isUndeliverable(message)) {
630             message.delete();
631             message.msg.undeliverable = true;
632
633             if (deadMessages == null)
634               deadMessages = new ClientMessages();
635             deadMessages.addMessage(message.msg);
636           } else {
637             // Else, putting the message back into the deliverables vector:
638
storeMessage(message);
639           }
640
641           if (logger.isLoggable(BasicLevel.DEBUG))
642             logger.log(BasicLevel.DEBUG, "Message " + msgId + " denied.");
643         }
644       }
645     }
646
647     // For a non empty request, browsing the denied messages:
648
for (ids = not.getIds(); ids.hasMoreElements();) {
649       msgId = (String JavaDoc) ids.nextElement();
650       message = (Message) deliveredMsgs.remove(msgId);
651
652       // Message may have already been denied. For example, a proxy may deny
653
// a message twice, first when detecting a connection failure - and
654
// in that case it sends a contextual denying -, then when receiving
655
// the message from the queue - and in that case it also sends an
656
// individual denying.
657
if (message == null) {
658         if (logger.isLoggable(BasicLevel.ERROR))
659           logger.log(BasicLevel.ERROR, " -> already denied message " + msgId);
660         break;
661       }
662
663       message.msg.redelivered = true;
664
665
666       if (logger.isLoggable(BasicLevel.DEBUG))
667           logger.log(BasicLevel.DEBUG, " -> deny " + msgId);
668
669       // state change, so save.
670
setSave();
671       consumers.remove(msgId);
672       contexts.remove(msgId);
673
674       // If message considered as undeliverable, adding it
675
// to the vector of dead messages:
676
if (isUndeliverable(message)) {
677         message.delete();
678
679         message.msg.undeliverable = true;
680
681         if (deadMessages == null)
682           deadMessages = new ClientMessages();
683         deadMessages.addMessage(message.msg);
684       } else {
685         // Else, putting the message back into the deliverables vector:
686
storeMessage(message);
687       }
688
689       if (logger.isLoggable(BasicLevel.DEBUG))
690         logger.log(BasicLevel.DEBUG, "Message " + msgId + " denied.");
691     }
692     // Sending the dead messages to the DMQ, if needed:
693
if (deadMessages != null)
694       sendToDMQ(deadMessages, null);
695
696     // Lauching a delivery sequence:
697
deliverMessages(0);
698   }
699
700   public void abortReceiveRequest(AgentId from,
701                          AbortReceiveRequest not) {
702     for (int i = 0; i < requests.size(); i++) {
703       ReceiveRequest request = (ReceiveRequest) requests.get(i);
704       if (request.requester.equals(from) &&
705           request.getClientContext() == not.getClientContext() &&
706           request.getRequestId() == not.getAbortedRequestId()) {
707         if (not.isPersistent()) {
708           // state change, so save.
709
setSave();
710         }
711         requests.remove(i);
712         break;
713       }
714     }
715   }
716
717   public void destinationAdminRequestNot(AgentId from, DestinationAdminRequestNot not) {
718     org.objectweb.joram.shared.admin.AdminRequest adminRequest =
719       not.getRequest();
720     if (adminRequest instanceof GetQueueMessageIds) {
721       getQueueMessageIds((GetQueueMessageIds)adminRequest,
722               not.getReplyTo(),
723               not.getRequestMsgId(),
724               not.getReplyMsgId());
725     } else if (adminRequest instanceof GetQueueMessage) {
726       getQueueMessage((GetQueueMessage)adminRequest,
727               not.getReplyTo(),
728               not.getRequestMsgId(),
729               not.getReplyMsgId());
730     } else if (adminRequest instanceof DeleteQueueMessage) {
731       deleteQueueMessage((DeleteQueueMessage)adminRequest,
732               not.getReplyTo(),
733               not.getRequestMsgId(),
734               not.getReplyMsgId());
735     } else if (adminRequest instanceof ClearQueue) {
736       clearQueue((ClearQueue)adminRequest,
737               not.getReplyTo(),
738               not.getRequestMsgId(),
739               not.getReplyMsgId());
740     }
741   }
742
743   private void getQueueMessageIds(GetQueueMessageIds request,
744                                   AgentId replyTo,
745                                   String JavaDoc requestMsgId,
746                                   String JavaDoc replyMsgId) {
747     String JavaDoc[] res = new String JavaDoc[messages.size()];
748     for (int i = 0; i < messages.size(); i++) {
749       Message msg = (Message)messages.elementAt(i);
750       res[i] = msg.getIdentifier();
751     }
752     GetQueueMessageIdsRep reply =
753       new GetQueueMessageIdsRep(res);
754     replyToTopic(reply, replyTo, requestMsgId, replyMsgId);
755   }
756
757   private void getQueueMessage(GetQueueMessage request,
758                        AgentId replyTo,
759                        String JavaDoc requestMsgId,
760                        String JavaDoc replyMsgId) {
761     Message message = null;
762     for (int i = 0; i < messages.size(); i++) {
763       message = (Message) messages.elementAt(i);
764       if (message.getIdentifier().equals(request.getMessageId()))
765         break;
766     }
767     if (message != null) {
768       replyToTopic(new GetQueueMessageRep(message.msg),
769                    replyTo, requestMsgId, replyMsgId);
770     } else {
771       
772       replyToTopic(
773         new org.objectweb.joram.shared.admin.AdminReply(
774           false, "Message not found: " + message.getIdentifier()),
775         replyTo, requestMsgId, replyMsgId);
776     }
777   }
778
779   private void deleteQueueMessage(DeleteQueueMessage request,
780                        AgentId replyTo,
781                        String JavaDoc requestMsgId,
782                        String JavaDoc replyMsgId) {
783     for (int i = 0; i < messages.size(); i++) {
784       Message message = (Message) messages.elementAt(i);
785       if (message.getIdentifier().equals(request.getMessageId())) {
786         messages.removeElementAt(i);
787         message.delete();
788         ClientMessages deadMessages = new ClientMessages();
789         deadMessages.addMessage(message.msg);
790         sendToDMQ(deadMessages, null);
791         break;
792       }
793     }
794     replyToTopic(new org.objectweb.joram.shared.admin.AdminReply(true, null),
795                  replyTo, requestMsgId, replyMsgId);
796   }
797
798   private void clearQueue(ClearQueue request,
799                        AgentId replyTo,
800                        String JavaDoc requestMsgId,
801                        String JavaDoc replyMsgId) {
802     if (messages.size() > 0) {
803       ClientMessages deadMessages = new ClientMessages();
804       for (int i = 0; i < messages.size(); i++) {
805         Message message = (Message) messages.elementAt(i);
806         message.delete();
807         deadMessages.addMessage(message.msg);
808       }
809       sendToDMQ(deadMessages, null);
810       messages.clear();
811     }
812     replyToTopic(new org.objectweb.joram.shared.admin.AdminReply(true, null),
813                  replyTo, requestMsgId, replyMsgId);
814   }
815   
816   /**
817    * Method specifically processing a <code>SetRightRequest</code> instance.
818    * <p>
819    * When a reader is removed, and receive requests of this reader are still
820    * on the queue, they are replied to by an <code>ExceptionReply</code>.
821    */

822   protected void doRightRequest(SetRightRequest not) {
823     // If the request does not unset a reader, doing nothing.
824
if (not.getRight() != -READ)
825       return;
826
827     SetRightRequest rightRequest = preProcess(not);
828     if (rightRequest != null) {
829       AgentId user = rightRequest.getClient();
830
831       ReceiveRequest request;
832       AccessException exc;
833       ExceptionReply reply;
834
835       // Free reading right has been removed; replying to the non readers
836
// requests.
837
if (user == null) {
838         for (int i = 0; i < requests.size(); i++) {
839           request = (ReceiveRequest) requests.get(i);
840           if (! isReader(request.requester)) {
841             exc = new AccessException("Free READ access removed");
842             reply = new ExceptionReply(request, exc);
843             forward(request.requester, reply);
844             // state change, so save.
845
setSave();
846             requests.remove(i);
847             i--;
848           }
849         }
850       } else {
851         // Reading right of a given user has been removed; replying to its
852
// requests.
853
for (int i = 0; i < requests.size(); i++) {
854           request = (ReceiveRequest) requests.get(i);
855           if (user.equals(request.requester)) {
856             exc = new AccessException("READ right removed");
857             reply = new ExceptionReply(request, exc);
858             forward(request.requester, reply);
859             // state change, so save.
860
setSave();
861             requests.remove(i);
862             i--;
863           }
864         }
865       }
866       postProcess(rightRequest);
867     }
868   }
869   
870   /**
871    * Method specifically processing a <code>ClientMessages</code> instance.
872    * <p>
873    * This method stores the messages and launches a delivery sequence.
874    */

875   protected void doClientMessages(AgentId from, ClientMessages not) {
876     receiving = true;
877     ClientMessages clientMsgs = preProcess(from, not);
878     
879     if (clientMsgs != null) {
880       Message msg;
881       // Storing each received message:
882
for (Enumeration JavaDoc msgs = clientMsgs.getMessages().elements();
883       msgs.hasMoreElements();) {
884
885         msg = new Message((org.objectweb.joram.shared.messages.Message) msgs.nextElement());
886         if (clientMsgs.isPersistent()) {
887           // state change, so save.
888
setSave();
889         }
890         msg.order = arrivalsCounter++;
891         storeMessage(msg);
892       }
893     }
894     
895     // Lauching a delivery sequence:
896
deliverMessages(0);
897
898     if (clientMsgs != null)
899       postProcess(clientMsgs);
900     
901     receiving = false;
902   }
903
904   /**
905    * Method specifically processing an <code>UnknownAgent</code> instance.
906    * <p>
907    * The specific processing is done when a <code>QueueMsgReply</code> was
908    * sent to a requester which does not exist anymore. In that case, the
909    * messages sent to this requester and not yet acknowledged are marked as
910    * "denied" for delivery to an other requester, and a new delivery sequence
911    * is launched. Messages considered as undeliverable are removed and sent to
912    * the DMQ.
913    */

914   protected void doUnknownAgent(UnknownAgent uA) {
915     AgentId client = uA.agent;
916     Notification not = uA.not;
917
918     // If the notification is not a delivery, doing nothing.
919
if (! (not instanceof QueueMsgReply))
920       return;
921
922     String JavaDoc msgId;
923     Message message;
924     AgentId consId;
925     ClientMessages deadMessages = null;
926     for (Enumeration JavaDoc e = deliveredMsgs.keys(); e.hasMoreElements();) {
927       msgId = (String JavaDoc) e.nextElement();
928       message = (Message) deliveredMsgs.get(msgId);
929       consId = (AgentId) consumers.get(msgId);
930       // Delivered message has been delivered to the deleted client:
931
// denying it.
932
if (consId.equals(client)) {
933         deliveredMsgs.remove(msgId);
934         message.msg.redelivered = true;
935
936         // state change, so save.
937
setSave();
938         consumers.remove(msgId);
939         contexts.remove(msgId);
940
941         // If message considered as undeliverable, adding it to the
942
// vector of dead messages:
943
if (isUndeliverable(message)) {
944           message.delete();
945           message.msg.undeliverable = true;
946           if (deadMessages == null)
947             deadMessages = new ClientMessages();
948           deadMessages.addMessage(message.msg);
949         } else {
950           // Else, putting it back into the deliverables vector:
951
storeMessage(message);
952         }
953
954         if (logger.isLoggable(BasicLevel.WARN))
955           logger.log(BasicLevel.WARN,
956                      "Message " + message.getIdentifier() + " denied.");
957       }
958     }
959     // Sending dead messages to the DMQ, if needed:
960
if (deadMessages != null)
961       sendToDMQ(deadMessages, null);
962
963     // Launching a delivery sequence:
964
deliverMessages(0);
965   }
966
967   /**
968    * Method specifically processing a
969    * <code>fr.dyade.aaa.agent.DeleteNot</code> instance.
970    * <p>
971    * <code>ExceptionReply</code> replies are sent to the pending receivers,
972    * and the remaining messages are sent to the DMQ and deleted.
973    */

974   protected void doDeleteNot(DeleteNot not) {
975     // Building the exception to send to the pending receivers:
976
DestinationException exc = new DestinationException("Queue " + destId
977                                                         + " is deleted.");
978     ReceiveRequest rec;
979     ExceptionReply excRep;
980     // Sending it to the pending receivers:
981
cleanWaitingRequest(System.currentTimeMillis());
982     for (int i = 0; i < requests.size(); i++) {
983       rec = (ReceiveRequest) requests.elementAt(i);
984
985       excRep = new ExceptionReply(rec, exc);
986       if (logger.isLoggable(BasicLevel.DEBUG))
987         logger.log(BasicLevel.DEBUG,
988                    "Requester " + rec.requester +
989                    " notified of the queue deletion.");
990       forward(rec.requester, excRep);
991     }
992     // Sending the remaining messages to the DMQ, if needed:
993
if (! messages.isEmpty()) {
994       Message message;
995       ClientMessages deadMessages = new ClientMessages();
996       while (! messages.isEmpty()) {
997         message = (Message) messages.remove(0);
998         message.msg.deletedDest = true;
999         deadMessages.addMessage(message.msg);
1000      }
1001      sendToDMQ(deadMessages, null);
1002    }
1003
1004    // Deleting the messages:
1005
Message.deleteAll(getMsgTxname());
1006  }
1007
1008  transient String JavaDoc msgTxname = null;
1009
1010  protected final String JavaDoc getMsgTxname() {
1011    if (msgTxname == null)
1012      msgTxname = 'M' + getDestinationId() + '_';
1013    return msgTxname;
1014  }
1015
1016  protected final void setMsgTxName(Message msg) {
1017    if (msg.getTxName() == null)
1018      msg.setTxName(getMsgTxname() + msg.order);
1019  }
1020
1021  /**
1022   * Actually stores a message in the deliverables vector.
1023   *
1024   * @param message The message to store.
1025   */

1026  protected final synchronized void storeMessage(Message message) {
1027    addMessage(message);
1028
1029    // Persisting the message.
1030
setMsgTxName(message);
1031    message.save();
1032
1033    if (logger.isLoggable(BasicLevel.DEBUG))
1034      logger.log(BasicLevel.DEBUG,
1035                 "Message " + message.getIdentifier() + " stored.");
1036    
1037  }
1038
1039  protected final synchronized void addMessage(Message message) {
1040    nbMsgsReceiveSinceCreation++;
1041
1042    if (nbMaxMsg > -1 && nbMaxMsg <= messages.size()) {
1043      ClientMessages deadMessages = new ClientMessages();
1044      deadMessages.addMessage(message.msg);
1045      sendToDMQ(deadMessages, null);
1046      return;
1047    }
1048
1049    if (messages.isEmpty()) {
1050      samePriorities = true;
1051      priority = message.getPriority();
1052    } else if (samePriorities && priority != message.getPriority()) {
1053      samePriorities = false;
1054    }
1055
1056    if (samePriorities) {
1057      // Constant priorities: no need to insert the message according to
1058
// its priority.
1059
if (receiving) {
1060        // Message being received: adding it at the end of the queue.
1061
messages.add(message);
1062      } else {
1063        // Denying or recovery: adding the message according to its original
1064
// arrival order.
1065
long currentO;
1066        int i = 0;
1067        for (Enumeration JavaDoc e = messages.elements(); e.hasMoreElements();) {
1068          currentO = ((Message) e.nextElement()).order;
1069          if (currentO > message.order) break;
1070          i++;
1071        }
1072        messages.insertElementAt(message, i);
1073      }
1074    } else {
1075      // Non constant priorities: inserting the message according to its
1076
// priority.
1077
Message currentMsg;
1078      int currentP;
1079      long currentO;
1080      int i = 0;
1081      for (Enumeration JavaDoc e = messages.elements(); e.hasMoreElements();) {
1082        currentMsg = (Message) e.nextElement();
1083        currentP = currentMsg.getPriority();
1084        currentO = currentMsg.order;
1085
1086        if (! receiving && currentP == message.getPriority()) {
1087          // Message denied or recovered, priorities are equal: inserting the
1088
// message according to its original arrival order.
1089
if (currentO > message.order) break;
1090        } else if (currentP < message.getPriority()) {
1091          // Current priority lower than the message to store: inserting it.
1092
break;
1093        }
1094        i++;
1095      }
1096      messages.insertElementAt(message, i);
1097    }
1098  }
1099  
1100  /**
1101   * get a client message contain nb messages.
1102   *
1103   * @param nb number of messages returned in ClientMessage.
1104   * @param selector jms selector
1105   * @param remove delete all messages returned if true
1106   * @return ClientMessages (contains nb Messages)
1107   */

1108  protected ClientMessages getClientMessages(int nb, String JavaDoc selector, boolean remove) {
1109    if (logger.isLoggable(BasicLevel.DEBUG))
1110      logger.log(BasicLevel.DEBUG, "QueueImpl.getClientMessages(" + nb + ',' + selector + ',' + remove + ')');
1111
1112    ClientMessages cm = null ;
1113    List JavaDoc lsMessages = getMessages(nb, selector, remove);
1114    if (lsMessages.size() > 0) {
1115     cm = new ClientMessages();
1116    }
1117    Message message = null;
1118    Iterator JavaDoc itMessages = lsMessages.iterator();
1119    while (itMessages.hasNext()) {
1120      message = (Message) itMessages.next();
1121      cm.addMessage(message.msg);
1122    }
1123    return cm;
1124  }
1125  
1126  protected ClientMessages getClientMessages(List JavaDoc lsMsgId, boolean remove) {
1127    if (logger.isLoggable(BasicLevel.DEBUG))
1128      logger.log(BasicLevel.DEBUG, "QueueImpl.getClientMessages(" + lsMsgId + ',' + remove + ')');
1129
1130    ClientMessages cm = new ClientMessages();
1131    Message message = null;
1132    String JavaDoc msgId = null;
1133    Iterator JavaDoc itMsgId = lsMsgId.iterator();
1134    // Checking the deliverable messages:
1135
while (itMsgId.hasNext()) {
1136      msgId = (String JavaDoc) itMsgId.next();
1137      message = getMessage(msgId, remove);
1138
1139      if (checkDelivery(message)) {
1140        message.msg.deliveryCount++;
1141        nbMsgsDeliverSinceCreation++;
1142        
1143        // use in sub class see ClusterQueueImpl
1144
messageDelivered(message.getIdentifier());
1145
1146        if (logger.isLoggable(BasicLevel.DEBUG))
1147          logger.log(BasicLevel.DEBUG, "Message " + msgId);
1148
1149        cm.addMessage(message.msg);
1150      }
1151    }
1152    return cm;
1153  }
1154
1155  /**
1156   * List of message to be removed.
1157   *
1158   * @param msgIds List of message id.
1159   */

1160  protected void removeMessages(List JavaDoc msgIds) {
1161    String JavaDoc id = null;
1162    Iterator JavaDoc itMessages = msgIds.iterator();
1163    while (itMessages.hasNext()) {
1164      id = (String JavaDoc) itMessages.next();
1165      int i = 0;
1166      Message message = null;
1167      while (i < messages.size()) {
1168        message = (Message) messages.get(i);
1169        if (id.equals(message.getIdentifier())) {
1170          messages.remove(i);
1171          message.delete();
1172          break;
1173        }
1174      }
1175    }
1176  }
1177  
1178  /**
1179   * get nb messages, if it's possible.
1180   *
1181   * @param nb -1 return all messages.
1182   * @param selector jms selector.
1183   * @return List of mom messages.
1184   */

1185  private List JavaDoc getMessages(int nb, String JavaDoc selector, boolean remove) {
1186    if (logger.isLoggable(BasicLevel.DEBUG))
1187      logger.log(BasicLevel.DEBUG, "QueueImpl.getMessage(" + nb + ',' + selector + ',' + remove +')');
1188
1189    List JavaDoc lsMessages = new ArrayList JavaDoc();
1190    Message message;
1191    int j = 0;
1192    // Checking the deliverable messages:
1193
while ((lsMessages.size() < nb || nb == -1) && j < messages.size()) {
1194      message = (Message) messages.get(j);
1195
1196      // If selector matches, sending the message:
1197
if (Selector.matches(message.msg, selector) &&
1198          checkDelivery(message)) {
1199        message.msg.deliveryCount++;
1200        nbMsgsDeliverSinceCreation++;
1201        
1202        // use in sub class see ClusterQueueImpl
1203
messageDelivered(message.getIdentifier());
1204
1205        if (logger.isLoggable(BasicLevel.DEBUG))
1206          logger.log(BasicLevel.DEBUG, "Message " + message.msg.id);
1207
1208        lsMessages.add(message);
1209        
1210        if (remove) {
1211          messages.remove(message);
1212          message.delete();
1213        }
1214        
1215      } else {
1216        // If message delivered or selector does not match: going on
1217
j++;
1218      }
1219    }
1220    return lsMessages;
1221  }
1222  
1223  private Message getMomMessage(String JavaDoc msgId) {
1224    Message msg = null;
1225    for (Enumeration JavaDoc e = messages.elements(); e.hasMoreElements(); ) {
1226      msg = (Message) e.nextElement();
1227      if (msgId.equals(msg.getIdentifier()))
1228        return msg;
1229    }
1230    return msg;
1231  }
1232  
1233  /**
1234   * get mom message, delete if remove = true.
1235   *
1236   * @param msgId message identification
1237   * @param remove if true delete message
1238   * @return mom message
1239   */

1240  protected Message getMessage(String JavaDoc msgId, boolean remove) {
1241    if (logger.isLoggable(BasicLevel.DEBUG))
1242      logger.log(BasicLevel.DEBUG, "QueueImpl.getMessage(" + msgId + ',' + remove + ')');
1243
1244    Message message = getMomMessage(msgId);
1245      if (checkDelivery(message)) {
1246        message.msg.deliveryCount++;
1247        nbMsgsDeliverSinceCreation++;
1248        
1249        // use in sub class see ClusterQueueImpl
1250
messageDelivered(message.getIdentifier());
1251
1252        if (logger.isLoggable(BasicLevel.DEBUG))
1253          logger.log(BasicLevel.DEBUG, "Message " + msgId);
1254        
1255        if (remove) {
1256          messages.remove(message);
1257          message.delete();
1258        }
1259      }
1260    return message;
1261  }
1262  
1263  /**
1264   * Actually tries to answer the pending "receive" requests.
1265   * <p>
1266   * The method may send <code>QueueMsgReply</code> replies to clients.
1267   *
1268   * @param index Index where starting to "browse" the requests.
1269   */

1270  protected void deliverMessages(int index) {
1271    if (logger.isLoggable(BasicLevel.DEBUG))
1272      logger.log(BasicLevel.DEBUG, "QueueImpl.deliverMessages(" + index + ')');
1273
1274    ReceiveRequest notRec = null;
1275    Message message;
1276    QueueMsgReply notMsg;
1277    ClientMessages deadMessages = null;
1278    List JavaDoc lsMessages = null;
1279
1280    if (logger.isLoggable(BasicLevel.DEBUG))
1281      logger.log(BasicLevel.DEBUG, " -> requests = " + requests + ')');
1282
1283    long current = System.currentTimeMillis();
1284    cleanWaitingRequest(current);
1285    // Cleaning the possible expired messages.
1286
deadMessages = cleanPendingMessage(current);
1287   
1288    // Processing each request as long as there are deliverable messages:
1289
while (! messages.isEmpty() && index < requests.size()) {
1290      notRec = (ReceiveRequest) requests.get(index);
1291      notMsg = new QueueMsgReply(notRec);
1292
1293      lsMessages = getMessages(notRec.getMessageCount(), notRec.getSelector(), notRec.getAutoAck());
1294
1295      if (!notRec.getAutoAck()) {
1296        Iterator JavaDoc itMessages = lsMessages.iterator();
1297        while (itMessages.hasNext()) {
1298          message = (Message) itMessages.next();
1299          notMsg.addMessage(message.msg);
1300          // putting the message in the delivered messages table:
1301
consumers.put(message.getIdentifier(), notRec.requester);
1302          contexts.put(message.getIdentifier(),
1303              new Integer JavaDoc(notRec.getClientContext()));
1304          deliveredMsgs.put(message.getIdentifier(), message);
1305          messages.remove(message);
1306          
1307          if (logger.isLoggable(BasicLevel.DEBUG))
1308            logger.log(BasicLevel.DEBUG,
1309                "Message " + message.msg.id + " to " + notRec.requester +
1310                " as reply to " + notRec.getRequestId());
1311        }
1312      }
1313
1314      if (isLocal(notRec.requester)) {
1315        notMsg.setPersistent(false);
1316      }
1317
1318      if (notMsg.isPersistent() && !notRec.getAutoAck()) {
1319        // state change, so save.
1320
setSave();
1321      }
1322
1323      // Next request:
1324
if (notMsg.getSize() > 0) {
1325        requests.remove(index);
1326        forward(notRec.requester, notMsg);
1327      } else {
1328        index++;
1329      }
1330    }
1331    // If needed, sending the dead messages to the DMQ:
1332
if (deadMessages != null)
1333      sendToDMQ(deadMessages, null);
1334  }
1335  
1336// /**
1337
// * Actually tries to answer the pending "receive" requests.
1338
// * <p>
1339
// * The method may send <code>QueueMsgReply</code> replies to clients.
1340
// *
1341
// * @param index Index where starting to "browse" the requests.
1342
// */
1343
// protected void deliverMessages(int index) {
1344
// if (logger.isLoggable(BasicLevel.DEBUG))
1345
// logger.log(BasicLevel.DEBUG, "QueueImpl.deliverMessages(" + index + ')');
1346
//
1347
// ReceiveRequest notRec = null;
1348
// boolean replied;
1349
// int j = 0;
1350
// Message message;
1351
// QueueMsgReply notMsg;
1352
// ClientMessages deadMessages = null;
1353
//
1354
// if (logger.isLoggable(BasicLevel.DEBUG))
1355
// logger.log(BasicLevel.DEBUG, " -> requests = " + requests + ')');
1356
//
1357
// long current = System.currentTimeMillis();
1358
// cleanWaitingRequest(current);
1359
// // Cleaning the possible expired messages.
1360
// deadMessages = cleanPendingMessage(current);
1361
//
1362
// // Processing each request as long as there are deliverable messages:
1363
// while (! messages.isEmpty() && index < requests.size()) {
1364
// notRec = (ReceiveRequest) requests.get(index);
1365
// replied = false;
1366
// notMsg = new QueueMsgReply(notRec);
1367
//
1368
// // Checking the deliverable messages:
1369
// while (j < messages.size()) {
1370
// message = (Message) messages.get(j);
1371
//
1372
// // If selector matches, sending the message:
1373
// if (Selector.matches(message.msg, notRec.getSelector()) &&
1374
// checkDelivery(message)) {
1375
// messages.remove(j);
1376
// message.msg.deliveryCount++;
1377
// notMsg.addMessage(message.msg);
1378
//
1379
// if (isLocal(notRec.requester))
1380
// notMsg.setPersistent(false);
1381
//
1382
// nbMsgsDeliverSinceCreation++;
1383
//
1384
// // use in sub class see ClusterQueueImpl
1385
// messageDelivered(message.getIdentifier());
1386
//
1387
// if (logger.isLoggable(BasicLevel.DEBUG))
1388
// logger.log(BasicLevel.DEBUG,
1389
// "Message " + message.msg.id + " to " + notRec.requester +
1390
// " as reply to " + notRec.getRequestId());
1391
//
1392
// if (notRec.getAutoAck()) {
1393
// // Removing the message if request in auto ack mode:
1394
// message.delete();
1395
// } else {
1396
// // Else, putting the message in the delivered messages table:
1397
// if (notMsg.isPersistent()) {
1398
// // state change, so save.
1399
// setSave();
1400
// }
1401
// consumers.put(message.getIdentifier(), notRec.requester);
1402
// contexts.put(message.getIdentifier(),
1403
// new Integer(notRec.getClientContext()));
1404
// deliveredMsgs.put(message.getIdentifier(), message);
1405
// }
1406
//
1407
// if (notMsg.getSize() == notRec.getMessageCount()) {
1408
// break;
1409
// }
1410
// } else {
1411
// // If message delivered or selector does not match: going on
1412
// j++;
1413
// }
1414
// }
1415
//
1416
// // Next request:
1417
// if (notMsg.getSize() > 0) {
1418
// requests.remove(index);
1419
// forward(notRec.requester, notMsg);
1420
// } else {
1421
// index++;
1422
// }
1423
//
1424
// j = 0;
1425
// }
1426
// // If needed, sending the dead messages to the DMQ:
1427
// if (deadMessages != null)
1428
// sendToDMQ(deadMessages, null);
1429
// }
1430

1431  protected boolean checkDelivery(Message msg) {
1432    return true;
1433  }
1434
1435  /**
1436   * call in deliverMessages just after forward(msg),
1437   * overload this methode to process a specific treatment.
1438   */

1439  protected void messageDelivered(String JavaDoc msgId) {}
1440
1441  /**
1442   * call in deliverMessages just after a remove message (invalid),
1443   * overload this methode to process a specific treatment.
1444   */

1445  protected void messageRemoved(String JavaDoc msgId) {}
1446  
1447  /**
1448   * Returns <code>true</code> if a given message is considered as
1449   * undeliverable, because its delivery count matches the queue's
1450   * threshold, if any, or the server's default threshold value (if any).
1451   */

1452  protected boolean isUndeliverable(Message message) {
1453    if (threshold != null)
1454      return message.msg.deliveryCount == threshold.intValue();
1455    else if (DeadMQueueImpl.threshold != null)
1456      return message.msg.deliveryCount == DeadMQueueImpl.threshold.intValue();
1457    return false;
1458  }
1459
1460  /** Deserializes a <code>QueueImpl</code> instance. */
1461  private void readObject(java.io.ObjectInputStream JavaDoc in)
1462               throws IOException JavaDoc, ClassNotFoundException JavaDoc {
1463    if (logger.isLoggable(BasicLevel.DEBUG))
1464      logger.log(BasicLevel.DEBUG, "QueueImpl.readObject()");
1465    in.defaultReadObject();
1466
1467    cleanWaitingRequest(System.currentTimeMillis());
1468
1469    receiving = false;
1470    messages = new Vector JavaDoc();
1471    deliveredMsgs = new Hashtable JavaDoc();
1472
1473    // Retrieving the persisted messages, if any.
1474
Vector JavaDoc persistedMsgs = null;
1475    persistedMsgs = Message.loadAll(getMsgTxname());
1476
1477    if (persistedMsgs != null) {
1478      Message persistedMsg;
1479      AgentId consId;
1480      while (! persistedMsgs.isEmpty()) {
1481        persistedMsg = (Message) persistedMsgs.remove(0);
1482        consId = (AgentId) consumers.get(persistedMsg.getIdentifier());
1483        if (consId == null) {
1484          addMessage(persistedMsg);
1485        } else if (isLocal(consId)) {
1486          if (logger.isLoggable(BasicLevel.DEBUG))
1487            logger.log(BasicLevel.DEBUG,
1488                       " -> deny " + persistedMsg.getIdentifier());
1489          consumers.remove(persistedMsg.getIdentifier());
1490          contexts.remove(persistedMsg.getIdentifier());
1491          addMessage(persistedMsg);
1492        } else {
1493          deliveredMsgs.put(persistedMsg.getIdentifier(), persistedMsg);
1494        }
1495      }
1496    }
1497  }
1498
1499  public void readBag(ObjectInputStream JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
1500    receiving = in.readBoolean();
1501    messages = (Vector JavaDoc) in.readObject();
1502    deliveredMsgs = (Hashtable JavaDoc) in.readObject();
1503
1504    for (int i = 0; i < messages.size(); i++) {
1505      Message message = (Message)messages.elementAt(i);
1506      // Persisting the message.
1507
setMsgTxName(message);
1508      message.save();
1509    }
1510  }
1511
1512  public void writeBag(ObjectOutputStream JavaDoc out) throws IOException JavaDoc {
1513    out.writeBoolean(receiving);
1514    out.writeObject(messages);
1515    out.writeObject(deliveredMsgs);
1516  }
1517}
1518
Popular Tags