KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > proxies > ClientSubscription


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2003 - 2006 ScalAgent Distributed Technologies
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA.
19  *
20  * Initial developer(s): Frederic Maistre (INRIA)
21  * Contributor(s): ScalAgent Distributed Technologies
22  */

23 package org.objectweb.joram.mom.proxies;
24
25 import java.util.Enumeration JavaDoc;
26 import java.util.Hashtable JavaDoc;
27 import java.util.Vector JavaDoc;
28 import java.io.ObjectInputStream JavaDoc;
29 import java.io.ObjectOutputStream JavaDoc;
30 import java.io.IOException JavaDoc;
31
32 import org.objectweb.joram.mom.dest.DeadMQueueImpl;
33 import org.objectweb.joram.mom.notifications.ClientMessages;
34 import org.objectweb.joram.mom.messages.Message;
35 import org.objectweb.joram.shared.client.ConsumerMessages;
36 import org.objectweb.joram.shared.selectors.Selector;
37
38 import fr.dyade.aaa.agent.AgentId;
39 import fr.dyade.aaa.agent.Channel;
40
41 import org.objectweb.joram.shared.JoramTracing;
42 import org.objectweb.util.monolog.api.BasicLevel;
43
44 /**
45  * The <code>ClientSubscription</code> class holds the data of a client
46  * subscription, and the methods managing the delivery and acknowledgement
47  * of the messages.
48  */

49 class ClientSubscription implements java.io.Serializable JavaDoc {
50   /** The proxy's agent identifier. */
51   private AgentId proxyId;
52   /** <code>true</code> if the subscription is durable. */
53   private boolean durable;
54   /** The topic identifier. */
55   private AgentId topicId;
56   /** The subscription name. */
57   private String JavaDoc name;
58   /** The selector for filtering messages. */
59   private String JavaDoc selector;
60   /**
61    * Identifier of the subscriber's dead message queue, <code>null</code> for
62    * DMQ not set.
63    */

64   private AgentId dmqId;
65   /**
66    * Threshold value, 0 or negative for no threshold, <code>null</code> for
67    * value not set.
68    */

69   private Integer JavaDoc threshold;
70
71   /** nb Max of Message store in queue (-1 no limit). */
72   protected int nbMaxMsg = -1;
73
74   /** Vector of identifiers of the messages to deliver. */
75   private Vector JavaDoc messageIds;
76   /** Table of delivered messages identifiers. */
77   private Hashtable JavaDoc deliveredIds;
78   /** Table keeping the denied messages identifiers. */
79   private Hashtable JavaDoc deniedMsgs;
80
81   /** Identifier of the subscription context. */
82   private transient int contextId;
83   /** Identifier of the subscription request. */
84   private transient int subRequestId;
85   /**
86    * <code>true</code> if the subscriber does not wish to consume
87    * messages published in the same context.
88    */

89   private transient boolean noLocal;
90   /**
91    * <code>true</code> if the subscription does not filter messages
92    * in any way.
93    */

94   private transient boolean noFiltering;
95
96   /** <code>true</code> if the subscription is active. */
97   private transient boolean active;
98   /**
99    * Identifier of the request requesting messages, either the listener's
100    * request, or a "receive" request.
101    */

102   private transient int requestId;
103   /** <code>true</code> if the messages are destinated to a listener. */
104   private transient boolean toListener;
105   /** Expiration time of the "receive" request, if any. */
106   private transient long requestExpTime;
107
108   /**
109    * Proxy messages table.
110    * AF: Currently this table is shared between all subscription.
111    */

112   private transient Hashtable JavaDoc messagesTable;
113
114   /** string proxy agent id */
115   private transient String JavaDoc proxyStringId;
116   
117   private transient ProxyAgentItf proxy;
118
119   /**
120    * Constructs a <code>ClientSubscription</code> instance.
121    *
122    * @param proxyId Proxy's identifier.
123    * @param contextId Context identifier.
124    * @param reqId Request identifier.
125    * @param durable <code>true</code> for a durable subscription.
126    * @param topicId Topic identifier.
127    * @param name Subscription's name.
128    * @param selector Selector for filtering messages.
129    * @param noLocal <code>true</code> for not consuming messages published
130    * within the same proxy's context.
131    * @param dmqId Identifier of the proxy's dead message queue, if any.
132    * @param threshold Proxy's threshold value, if any.
133    * @param messagesTable Proxy's messages table.
134    */

135   ClientSubscription(AgentId proxyId,
136                      int contextId,
137                      int reqId,
138                      boolean durable,
139                      AgentId topicId,
140                      String JavaDoc name,
141                      String JavaDoc selector,
142                      boolean noLocal,
143                      AgentId dmqId,
144                      Integer JavaDoc threshold,
145                      Hashtable JavaDoc messagesTable)
146   {
147     this.proxyId = proxyId;
148     this.contextId = contextId;
149     this.subRequestId = reqId;
150     this.durable = durable;
151     this.topicId = topicId;
152     this.name = name;
153     this.selector = selector;
154     this.noLocal = noLocal;
155     this.dmqId = dmqId;
156     this.threshold = threshold;
157     this.messagesTable = messagesTable;
158
159     messageIds = new Vector JavaDoc();
160     deliveredIds = new Hashtable JavaDoc();
161     deniedMsgs = new Hashtable JavaDoc();
162
163     noFiltering = (! noLocal) && (selector == null || selector.equals(""));
164
165     active = true;
166     requestId = -1;
167     toListener = false;
168
169     proxyStringId = proxyId.toString();
170
171     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
172       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
173                               this + ": created.");
174   }
175
// public String dump() {
//   StringBuffer buff = new StringBuffer();
//   buff.append("ClientSubscription (proxyId=");
//   buff.append(proxyId);
//   buff.append(",topicId=");
//   buff.append(topicId);
//   buff.append(",messageIds=");
//   buff.append(messageIds);
//   buff.append(",contextId=");
//   buff.append(contextId);
//   buff.append(",subRequestId=");
//   buff.append(subRequestId);
//   buff.append(",noLocal=");
//   buff.append(noLocal);
//   buff.append(",active=");
//   buff.append(active);
//   buff.append(",requestId=");
//   buff.append(requestId);
//   buff.append(",toListener=");
//   buff.append(toListener);
//   buff.append(",messagesTable=");
//   buff.append(messagesTable);
//   buff.append(")");
//   return buff.toString();
// }
201

202   public String JavaDoc toString()
203   {
204     return "ClientSubscription" + proxyId + name;
205   }
206
207
208   /** Returns the subscription's context identifier. */
209   int getContextId()
210   {
211     return contextId;
212   }
213
214   /** Returns the identifier of the subscribing request. */
215   int getSubRequestId()
216   {
217     return subRequestId;
218   }
219
220   /** Returns the name of the subscription. */
221   String JavaDoc getName()
222   {
223     return name;
224   }
225
226   /** Returns the identifier of the subscription topic. */
227   AgentId getTopicId()
228   {
229     return topicId;
230   }
231
232   /** Returns the selector. */
233   String JavaDoc getSelector()
234   {
235     return selector;
236   }
237
238   /** Returns <code>true</code> if the subscription is durable. */
239   boolean getDurable()
240   {
241     return durable;
242   }
243
244   /** Returns <code>true</code> if the subscription is active. */
245   boolean getActive()
246   {
247     return active;
248   }
249
250   /**
251    * Returns the maximum number of message for the subscription.
252    * If the limit is unset the method returns -1.
253    *
254    * @return the maximum number of message for subscription if set;
255    * -1 otherwise.
256    */

257   public int getNbMaxMsg() {
258     return nbMaxMsg;
259   }
260
261   /**
262    * Sets the maximum number of message for the subscription.
263    *
264    * @param nbMaxMsg the maximum number of message for subscription (-1 set
265    * no limit).
266    */

267   public void setNbMaxMsg(int nbMaxMsg) {
268     this.nbMaxMsg = nbMaxMsg;
269   }
270
271   /**
272    * Returns the number of pending messages for the subscription.
273    *
274    * @return The number of pending message for the subscription.
275    */

276   int getMessageCount() {
277     return messageIds.size();
278   }
279
280   /**
281    * Returns the list of message's identifiers for the subscription.
282    *
283    * @return the list of message's identifiers for the subscription.
284    */

285   String JavaDoc[] getMessageIds() {
286     String JavaDoc[] res = new String JavaDoc[messageIds.size()];
287     messageIds.copyInto(res);
288     return res;
289   }
290   
291   void setProxyAgent(ProxyAgentItf px) {
292     proxy = px;
293   }
294   
295   /**
296    * Re-initializes the client subscription.
297    *
298    * @param proxyStringId string proxy id.
299    * @param messagesTable Proxy's table where storing the messages.
300    * @param persistedMessages Proxy's persisted messages.
301    * @param denyDeliveredMessages Denies already delivered messages.
302    */

303   void reinitialize(String JavaDoc proxyStringId,
304                     Hashtable JavaDoc messagesTable,
305                     Vector JavaDoc persistedMessages,
306                     boolean denyDeliveredMessages)
307   {
308     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
309       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
310                               "ClientSubscription[" + this +
311                               "].reinitialize()");
312     
313     this.proxyStringId = proxyStringId;
314     this.messagesTable = messagesTable;
315
316     // Browsing the persisted messages.
317
Message message;
318     String JavaDoc msgId;
319     for (Enumeration JavaDoc e = persistedMessages.elements(); e.hasMoreElements();) {
320       message = (Message) e.nextElement();
321       msgId = message.getIdentifier();
322
323       if (messageIds.contains(msgId) || deliveredIds.contains(msgId)) {
324         if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
325           JoramTracing.dbgProxy.log(
326             BasicLevel.DEBUG,
327             " -> contains message " + msgId);
328         message.acksCounter++;
329         message.durableAcksCounter++;
330         
331         if (message.acksCounter == 1) {
332           if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
333             JoramTracing.dbgProxy.log(
334               BasicLevel.DEBUG,
335               " -> messagesTable.put(" + msgId + ')');
336           messagesTable.put(msgId, message);
337         }
338 // if (message.durableAcksCounter == 1) {
339
// if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
340
// JoramTracing.dbgProxy.log(
341
// BasicLevel.DEBUG,
342
// " -> save message " + message);
343
// it's alredy save.
344
// message.save(proxyStringId);
345
// }
346
}
347     }
348
349     if (denyDeliveredMessages) {
350       // Denying all previously delivered messages:
351
deny(deliveredIds.keys());
352       deliveredIds.clear();
353     }
354   }
355
356   /**
357    * Reactivates the subscription.
358    *
359    * @param context Re-activation context.
360    * @param reqId Re-activation request identifier.
361    * @param topicId Topic identifier.
362    * @param selector Selector for filtering messages.
363    * @param noLocal <code>true</code> for not consuming messages published
364    * within the same proxy's context.
365    */

366   void reactivate(int contextId,
367                   int reqId,
368                   AgentId topicId,
369                   String JavaDoc selector,
370                   boolean noLocal)
371   {
372     this.contextId = contextId;
373     this.subRequestId = reqId;
374     this.topicId = topicId;
375     this.selector = selector;
376     this.noLocal = noLocal;
377
378     noFiltering = (! noLocal) && (selector == null || selector.equals(""));
379
380     active = true;
381     requestId = -1;
382     toListener = false;
383     
384     // Some updated attributes are persistent
385
save();
386
387     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
388       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
389                               this + ": reactivated.");
390   }
391
392   /** De-activates the subscription, denies the non acknowledgded messages. */
393   void deactivate() {
394     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
395       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
396                               "ClientSubscription.deactivate()");
397
398     unsetListener();
399     unsetReceiver();
400     active = false;
401    
402     // Denying all delivered messages:
403
deny(deliveredIds.keys());
404     deliveredIds.clear();
405     
406     // deliveredIds is persistent
407
save();
408
409     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
410       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
411                               this + ": deactivated.");
412   }
413
414   void setActive(boolean active) {
415     this.active = active;
416   }
417
418   /**
419    * Sets a listener.
420    *
421    * @param requestId Identifier of the listener request.
422    */

423   void setListener(int requestId)
424   {
425     this.requestId = requestId;
426     toListener = true;
427
428     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
429       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
430                               this + ": listener set.");
431   }
432
433   /** Unsets the listener. */
434   void unsetListener()
435   {
436     requestId = -1;
437     toListener = false;
438
439     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
440       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
441                               this + ": listener unset.");
442   }
443
444   /**
445    * Sets a receiver request.
446    *
447    * @param requestId Identifier of the "receive" request.
448    * @param timeToLive Request's time to live value.
449    */

450   void setReceiver(int requestId, long timeToLive) {
451     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
452       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
453                               this + ".setReceiver(" + requestId +
454                               "," + timeToLive + ")");
455
456     this.requestId = requestId;
457     toListener = false;
458
459     if (timeToLive > 0)
460       requestExpTime = System.currentTimeMillis() + timeToLive;
461     else
462       requestExpTime = 0;
463   }
464
465   /** Unsets a receiver request. */
466   void unsetReceiver() {
467     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
468       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
469                               this + ".unsetReceiver()");
470     requestId = -1;
471     requestExpTime = 0;
472   }
473
474   /** Sets the subscription's dead message queue identifier. */
475   void setDMQId(AgentId dmqId)
476   {
477     this.dmqId = dmqId;
478     save();
479   }
480
481   /** Sets the subscription's threshold value. */
482   void setThreshold(Integer JavaDoc threshold)
483   {
484     this.threshold = threshold;
485     save();
486   }
487
488   
489   /**
490    * Browses messages and keeps those which will have to be delivered
491    * to the subscriber.
492    */

493   // AF: TODO we should parse each message for each subscription
494
// see ProxyImpl.doFwd
495
void browseNewMessages(Vector JavaDoc newMessages) {
496     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
497       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
498                               this + ".browseNewMessages(" +
499                               newMessages + ')');
500     // Browsing the messages one by one.
501
Message message;
502     String JavaDoc msgId;
503     for (Enumeration JavaDoc e = newMessages.elements(); e.hasMoreElements();) {
504       message = (Message) e.nextElement();
505       msgId = message.getIdentifier();
506
507       // test nbMaxMsg
508
if (nbMaxMsg > -1 && nbMaxMsg <= messageIds.size()) {
509         ClientMessages deadMessages = new ClientMessages();
510         deadMessages.addMessage(message.msg);
511         sendToDMQ(deadMessages);
512         continue;
513       }
514
515       // Keeping the message if filtering is successful.
516
if (noFiltering ||
517           (Selector.matches(message.msg, selector) &&
518            (! noLocal || ! msgId.startsWith(proxyId.toString().substring(1) + "c" + contextId + "m", 3)))) {
519
520         // It's the first delivery, adds the message to the proxy's table
521
if (message.acksCounter == 0)
522
523           messagesTable.put(msgId, message);
524         
525         message.acksCounter++;
526         if (durable)
527           message.durableAcksCounter++;
528
529         messageIds.add(msgId);
530         save();
531
532         if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
533           JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
534                                   this + ": added msg " + msgId
535                                   + " for delivery.");
536       }
537     }
538   }
539
540   /**
541    * Launches a delivery sequence, either for a listener, or for a receiver.
542    */

543   ConsumerMessages deliver() {
544     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
545       JoramTracing.dbgProxy.log(
546         BasicLevel.DEBUG,
547         "ClientSubscription[" + proxyId + ',' +
548         topicId + ',' + name + "].deliver()");
549
550     // Returning null if no request exists:
551
if (requestId == -1)
552       return null;
553
554      // Returning null if a "receive" request has expired:
555
if (! toListener
556         && requestExpTime > 0
557         && System.currentTimeMillis() >= requestExpTime) {
558       if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
559         JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
560                                       this + ": receive request " + requestId
561                                       + " expired.");
562       requestId = -1;
563       requestExpTime = 0;
564       return null;
565     }
566
567     String JavaDoc id;
568     Message message;
569     Integer JavaDoc deliveryAttempts = null;
570     int lastPrior = -1;
571     int insertionIndex = -1;
572     int prior;
573     Vector JavaDoc deliverables = new Vector JavaDoc();
574     ClientMessages deadMessages = null;
575
576     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
577       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
578                                 " -> messageIds.size() = " + messageIds.size());
579     
580     // Delivering to a listener.
581
if (toListener) {
582       // Browsing the identifiers of the messages to deliver.
583
while (! messageIds.isEmpty()) {
584         id = (String JavaDoc) messageIds.remove(0);
585         save();
586         message = (Message) messagesTable.get(id);
587
588         // Message still exists.
589
if (message != null) {
590           // Delivering it if valid.
591
if (message.isValid(System.currentTimeMillis())) {
592             deliveredIds.put(id, id);
593
594             // Setting the message's deliveryCount and denied fields.
595
deliveryAttempts = (Integer JavaDoc) deniedMsgs.get(id);
596             if (deliveryAttempts == null)
597               message.msg.deliveryCount = 1;
598             else {
599               message.msg.deliveryCount = deliveryAttempts.intValue() + 1;
600               message.msg.redelivered = true;
601             }
602
603             // Inserting it according to its priority.
604
if (lastPrior == -1 || message.getPriority() == lastPrior)
605               insertionIndex++;
606             else {
607               insertionIndex = 0;
608               while (insertionIndex < deliverables.size()) {
609                 prior =
610                   ((Message) deliverables.get(insertionIndex)).getPriority();
611                 if (prior >= message.getPriority())
612                   insertionIndex++;
613                 else
614                   break;
615               }
616             }
617             lastPrior = message.getPriority();
618             deliverables.insertElementAt(message.msg.clone(), insertionIndex);
619
620             if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
621               JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
622                                       this + ": message " + id
623                                       + " added for delivery.");
624           } else {
625             // Invalid message: removing and adding it to the vector of dead
626
// messages.
627
messagesTable.remove(id);
628             // Deleting the message, if needed.
629
if (durable)
630               message.delete();
631
632             // Setting the message's deliveryCount, denied and expired fields.
633
deliveryAttempts = (Integer JavaDoc) deniedMsgs.remove(id);
634             if (deliveryAttempts != null) {
635               message.msg.deliveryCount = deliveryAttempts.intValue();
636               message.msg.redelivered = true;
637             }
638             message.msg.expired = true;
639             if (deadMessages == null)
640               deadMessages = new ClientMessages();
641             deadMessages.addMessage(message.msg);
642           }
643         } else {
644           // Message has already been deleted.
645
deniedMsgs.remove(id);
646         }
647       }
648     } else {
649       // Delivering to a receiver: getting the highest priority message.
650
int highestP = -1;
651       Message keptMsg = null;
652       // Browsing the non delivered messages.
653
int i = 0;
654       while (i < messageIds.size()) {
655         id = (String JavaDoc) messageIds.elementAt(i);
656         message = (Message) messagesTable.get(id);
657         if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
658           JoramTracing.dbgProxy.log(BasicLevel.DEBUG, " -> message = " + message);
659         
660         // Message still exists.
661
if (message != null) {
662           // Checking valid message.
663
if (message.isValid(System.currentTimeMillis())) {
664             if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
665               JoramTracing.dbgProxy.log(BasicLevel.DEBUG, " -> valid message");
666             // Higher priority: keeping the message.
667
if (message.getPriority() > highestP) {
668               highestP = message.getPriority();
669               keptMsg = message;
670             }
671
672             // get next message
673
i++;
674           } else {
675             // Invalid message: removing and adding it to the vector of dead
676
// messages.
677
if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
678               JoramTracing.dbgProxy.log(BasicLevel.DEBUG, " -> invalid message");
679             messageIds.remove(id);
680             save();
681             messagesTable.remove(id);
682             // Deleting the message, if needed.
683
if (durable)
684               message.delete();
685
686             // Setting the message's deliveryCount, denied and expired fields.
687
deliveryAttempts = (Integer JavaDoc) deniedMsgs.remove(id);
688             if (deliveryAttempts != null) {
689               message.msg.deliveryCount = deliveryAttempts.intValue();
690               message.msg.redelivered = true;
691             }
692             message.msg.expired = true;
693             deadMessages = new ClientMessages();
694             deadMessages.addMessage(message.msg);
695           }
696         } else {
697           // Message has already been deleted.
698
if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
699             JoramTracing.dbgProxy.log(BasicLevel.DEBUG, " -> deleted message");
700
701           messageIds.remove(id);
702           deniedMsgs.remove(id);
703           save();
704         }
705       }
706
707       // Putting the kept message in the vector.
708
if (keptMsg != null) {
709         messageIds.remove(keptMsg.getIdentifier());
710         deliveredIds.put(keptMsg.getIdentifier(), keptMsg.getIdentifier());
711         save();
712
713         // Setting the message's deliveryCount and denied fields.
714
deliveryAttempts = (Integer JavaDoc) deniedMsgs.get(keptMsg.getIdentifier());
715         if (deliveryAttempts == null)
716           keptMsg.msg.deliveryCount = 1;
717         else {
718           keptMsg.msg.deliveryCount = deliveryAttempts.intValue() + 1;
719           keptMsg.msg.redelivered = true;
720         }
721         deliverables.add(keptMsg.msg.clone());
722
723         if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
724           JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
725                                   this + ": message " + keptMsg.getIdentifier()
726                                   + " added for delivery.");
727       } else {
728         i++;
729       }
730     }
731    
732     // Sending the dead messages to the DMQ, if any:
733
if (deadMessages != null)
734       sendToDMQ(deadMessages);
735
736     // Finally, returning the reply or null:
737
if (! deliverables.isEmpty()) {
738       ConsumerMessages consM = new ConsumerMessages(requestId,
739                                                     deliverables,
740                                                     name,
741                                                     false);
742       if (! toListener) requestId = -1;
743
744       return consM;
745     }
746     return null;
747   }
748
749   /**
750    * Acknowledges messages.
751    */

752   void acknowledge(Enumeration JavaDoc acks) {
753     while (acks.hasMoreElements()) {
754       String JavaDoc id = (String JavaDoc) acks.nextElement();
755       acknowledge(id);
756     }
757   }
758
759   void acknowledge(String JavaDoc id) {
760     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
761       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
762                               this + ": acknowledges message: " + id);
763     
764     deliveredIds.remove(id);
765     deniedMsgs.remove(id);
766     save();
767     Message msg = (Message) messagesTable.get(id);
768     
769     // Message may be null if it is not valid anymore
770
if (msg != null) {
771       msg.acksCounter--;
772       if (msg.acksCounter == 0)
773         messagesTable.remove(id);
774       if (durable) {
775         msg.durableAcksCounter--;
776         
777         if (msg.durableAcksCounter == 0)
778           msg.delete();
779       }
780     }
781   }
782
783   /**
784    * Denies messages.
785    */

786   void deny(Enumeration JavaDoc denies) {
787     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
788       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
789                               this + ".deny(" + denies + ')');
790     String JavaDoc id;
791     Message message;
792     ClientMessages deadMessages = null;
793     int deliveryAttempts = 1;
794     int i;
795     String JavaDoc currentId;
796     long currentO;
797
798     denyLoop:
799     while (denies.hasMoreElements()) {
800       id = (String JavaDoc) denies.nextElement();
801
802       String JavaDoc deliveredMsgId = (String JavaDoc)deliveredIds.remove(id);
803       if (deliveredMsgId == null) {
804         if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
805           JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
806                                   this + ": cannot denies message: " + id);
807
808         continue denyLoop;
809       }
810       save();
811       
812       if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
813         JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
814                                 this + ": denies message: " + id);
815       
816       message = (Message) messagesTable.get(id);
817       
818       // Message may be null if it is not valid anymore
819
if (message == null) continue denyLoop;
820       
821       Integer JavaDoc value = (Integer JavaDoc) deniedMsgs.get(id);
822       if (value != null)
823         deliveryAttempts = value.intValue() + 1;
824       
825       // If maximum delivery attempts reached, the message is no more
826
// deliverable to this sbscriber.
827
if (isUndeliverable(deliveryAttempts)) {
828         deniedMsgs.remove(id);
829         message.msg.deliveryCount = deliveryAttempts;
830         message.msg.undeliverable = true;
831         if (deadMessages == null)
832           deadMessages = new ClientMessages();
833         deadMessages.addMessage(message.msg);
834         
835         message.acksCounter--;
836         if (message.acksCounter == 0)
837           messagesTable.remove(id);
838         
839         if (durable) {
840           message.durableAcksCounter--;
841           if (message.durableAcksCounter == 0)
842             message.delete();
843         }
844       } else {
845         // Else, putting it back to the deliverables vector according to its
846
// original delivery order, and adding a new entry for it in the
847
// denied messages table.
848
if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
849           JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
850                                     " -> put back to the messages to deliver");
851         
852         i = 0;
853         insertLoop:
854         while (i < messageIds.size()) {
855           currentId = (String JavaDoc) messageIds.elementAt(i);
856           Message currentMessage = (Message) messagesTable.get(currentId);
857             
858           // Message may be null if it is not valid anymore
859
if (currentMessage != null) {
860             currentO = currentMessage.order;
861             if (currentO > message.order) {
862               break insertLoop;
863             } else {
864               i++;
865             }
866           } else {
867             // Remove the invalid message
868
messageIds.removeElementAt(i);
869           }
870         }
871         
872         messageIds.insertElementAt(id, i);
873         deniedMsgs.put(id, new Integer JavaDoc(deliveryAttempts));
874       }
875     }
876
877     // Sending dead messages to the DMQ, if needed:
878
if (deadMessages != null)
879       sendToDMQ(deadMessages);
880
881   }
882
883   /**
884    * Decreases the subscription's messages acknowledgement expectations,
885    * deletes those not to be consumed anymore.
886    */

887   void delete() {
888     for (Enumeration JavaDoc e = deliveredIds.keys(); e.hasMoreElements();)
889       messageIds.add(e.nextElement());
890
891     for (Enumeration JavaDoc allMessageIds = messageIds.elements();
892          allMessageIds.hasMoreElements();) {
893       removeMessage((String JavaDoc) allMessageIds.nextElement());
894     }
895   }
896
897   
898   /**
899    * Returns <code>true</code> if a given value matches the threshold value
900    * for this user.
901    */

902   private boolean isUndeliverable(int deliveryAttempts) {
903     if (threshold != null)
904       return deliveryAttempts == threshold.intValue();
905     else if (DeadMQueueImpl.getDefaultThreshold() != null)
906       return deliveryAttempts == DeadMQueueImpl.getDefaultThreshold().intValue();
907     return false;
908   }
909
910   /**
911    * Method used for sending messages to the appropriate dead message queue.
912    */

913   private void sendToDMQ(ClientMessages messages)
914   {
915     if (dmqId != null)
916       Channel.sendTo(dmqId, messages);
917     else if (DeadMQueueImpl.getId() != null)
918       Channel.sendTo(DeadMQueueImpl.getId(), messages);
919   }
920
921   Message getMessage(String JavaDoc msgId) {
922     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
923       JoramTracing.dbgProxy.log(
924         BasicLevel.DEBUG,
925         "ClientSubscription.getMessage(" + msgId + ')');
926     int index = messageIds.indexOf(msgId);
927     if (index < 0) {
928       // The message has been delivered
929
if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
930         JoramTracing.dbgProxy.log(
931           BasicLevel.DEBUG, " -> message not found");
932       return null;
933     } else {
934       return (Message) messagesTable.get(msgId);
935     }
936   }
937
938   void deleteMessage(String JavaDoc msgId) {
939     messageIds.remove(msgId);
940     Message message = removeMessage(msgId);
941     save();
942     if (message != null) {
943       ClientMessages deadMessages = new ClientMessages();
944       deadMessages.addMessage(message.msg);
945       sendToDMQ(deadMessages);
946     }
947   }
948
949   void clear() {
950     ClientMessages deadMessages = null;
951     for (int i = 0; i < messageIds.size(); i++) {
952       String JavaDoc msgId = (String JavaDoc)messageIds.elementAt(i);
953       Message message = removeMessage(msgId);
954       if (message != null) {
955         if (deadMessages == null)
956           deadMessages = new ClientMessages();
957         deadMessages.addMessage(message.msg);
958       }
959     }
960     if (deadMessages != null)
961       sendToDMQ(deadMessages);
962     messageIds.clear();
963     save();
964   }
965
966   /**
967    * Removes a particular pending message in the subscription.
968    * The message is pointed out through its unique identifier.
969    *
970    * @param msgId The unique message's identifier.
971    */

972   Message removeMessage(String JavaDoc msgId) {
973     Message message = (Message) messagesTable.get(msgId);
974     if (message != null) {
975       message.acksCounter--;
976       if (message.acksCounter == 0)
977         messagesTable.remove(msgId);
978       if (durable) {
979         message.durableAcksCounter--;
980         if (message.durableAcksCounter == 0)
981           message.delete();
982       }
983     }
984     return message;
985   }
986   
987   private void save() {
988     if (durable) proxy.setSave();
989   }
990
991   public void readBag(ObjectInputStream JavaDoc in)
992     throws IOException JavaDoc, ClassNotFoundException JavaDoc {
993     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
994       JoramTracing.dbgProxy.log(
995         BasicLevel.DEBUG,
996         "ClientSubscription[" +
997         proxyId +
998         "].readbag()");
999
1000    contextId = in.readInt();
1001    subRequestId = in.readInt();
1002    noLocal = in.readBoolean();
1003    noFiltering = in.readBoolean();
1004    active = in.readBoolean();
1005    requestId = in.readInt();
1006    toListener = in.readBoolean();
1007    requestExpTime = in.readLong();
1008  }
1009
1010  public void writeBag(ObjectOutputStream JavaDoc out)
1011    throws IOException JavaDoc {
1012    if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
1013      JoramTracing.dbgProxy.log(
1014        BasicLevel.DEBUG,
1015        "ClientSubscription[" +
1016        proxyId +
1017        "].writeBag()");
1018
1019    out.writeInt(contextId);
1020    out.writeInt(subRequestId);
1021    out.writeBoolean(noLocal);
1022    out.writeBoolean(noFiltering);
1023    out.writeBoolean(active);
1024    out.writeInt(requestId);
1025    out.writeBoolean(toListener);
1026    out.writeLong(requestExpTime);
1027  }
1028}
1029
Popular Tags