KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > dest > ClusterQueueImpl


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2004 - 2007 ScalAgent Distributed Technologies
4  * Copyright (C) 2004 France Telecom R&D
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): ScalAgent Distributed Technologies
22  * Contributor(s):
23  */

24 package org.objectweb.joram.mom.dest;
25
26 import java.io.IOException JavaDoc;
27 import java.util.ArrayList JavaDoc;
28 import java.util.Enumeration JavaDoc;
29 import java.util.Hashtable JavaDoc;
30 import java.util.List JavaDoc;
31 import java.util.Properties JavaDoc;
32 import java.util.Vector JavaDoc;
33
34 import org.objectweb.joram.mom.messages.Message;
35 import org.objectweb.joram.mom.notifications.AckJoinQueueCluster;
36 import org.objectweb.joram.mom.notifications.ClientMessages;
37 import org.objectweb.joram.mom.notifications.JoinQueueCluster;
38 import org.objectweb.joram.mom.notifications.LBCycleLife;
39 import org.objectweb.joram.mom.notifications.LBMessageGive;
40 import org.objectweb.joram.mom.notifications.LBMessageHope;
41 import org.objectweb.joram.mom.notifications.LeaveQueueCluster;
42 import org.objectweb.joram.mom.notifications.QueueClusterNot;
43 import org.objectweb.joram.mom.notifications.ReceiveRequest;
44 import org.objectweb.joram.mom.notifications.SetRightQueueCluster;
45 import org.objectweb.joram.mom.notifications.SetRightRequest;
46 import org.objectweb.joram.mom.notifications.SpecialAdminRequest;
47 import org.objectweb.joram.mom.notifications.WakeUpNot;
48 import org.objectweb.joram.shared.JoramTracing;
49 import org.objectweb.joram.shared.admin.AddQueueCluster;
50 import org.objectweb.joram.shared.admin.ListClusterQueue;
51 import org.objectweb.joram.shared.admin.RemoveQueueCluster;
52 import org.objectweb.joram.shared.admin.SpecialAdmin;
53 import org.objectweb.joram.shared.excepts.RequestException;
54 import org.objectweb.util.monolog.api.BasicLevel;
55
56 import fr.dyade.aaa.agent.AgentId;
57 import fr.dyade.aaa.agent.UnknownNotificationException;
58
59 /**
60  * The <code>ClusterQueueImpl</code> class implements the MOM queue behaviour
61  * for a queue that is a member of a cluster: it stores messages and delivers
62  * them upon client requests, or hands them over to another queue of the cluster.
63  */

64 public class ClusterQueueImpl extends QueueImpl {
65   /**
66    * key = agentId of ClusterQueue
67    * value = rateOfFlow (Float)
68    */

69   protected Hashtable JavaDoc clusters;
70
71   /** Used to compute the loading factor, the overloaded state, ... */
72   protected LoadingFactor loadingFactor;
73
74   /**
75    * key = msgId
76    * value = date
77    */

78   private Hashtable JavaDoc timeTable;
79
80   /** key = msgId
81    * value = Vector (alreadyVisit)
82    */

83   private Hashtable JavaDoc visitTable;
84
85   /** Number of messages sent to the cluster. */
86   private long clusterDeliveryCount;
87
88   /** waiting after a cluster request */
89   private long waitAfterClusterReq = -1;
90
91   /**
92    * Constructs a <code>ClusterQueueImpl</code> instance.
93    *
94    * @param destId Identifier of the agent hosting the queue.
95    * @param adminId Identifier of the administrator of the queue.
96    */

97   public ClusterQueueImpl(AgentId destId, AgentId adminId, Properties JavaDoc prop) {
98     super(destId, adminId, prop);
99
100     /** producer threshold */
101     int producThreshold = -1;
102     /** consumer threshold */
103     int consumThreshold = -1;
104     /** automatic eval threshold */
105     boolean autoEvalThreshold = false;
106
107     if (prop != null) {
108       try {
109         waitAfterClusterReq =
110           Long.valueOf(prop.getProperty("waitAfterClusterReq")).longValue();
111       } catch (NumberFormatException JavaDoc exc) {
112         waitAfterClusterReq = 60000;
113       }
114       try {
115         producThreshold =
116           Integer.valueOf(prop.getProperty("producThreshold")).intValue();
117       } catch (NumberFormatException JavaDoc exc) {
118         producThreshold = 10000;
119       }
120       try {
121         consumThreshold =
122           Integer.valueOf(prop.getProperty("consumThreshold")).intValue();
123       } catch (NumberFormatException JavaDoc exc) {
124         consumThreshold = 10000;
125       }
126       autoEvalThreshold =
127         Boolean.valueOf(prop.getProperty("autoEvalThreshold")).booleanValue();
128     }
129
130     clusters = new Hashtable JavaDoc();
131     clusters.put(destId, new Float JavaDoc(1));
132
133     loadingFactor = new LoadingFactor(this,
134                                       producThreshold,
135                                       consumThreshold,
136                                       autoEvalThreshold,
137                                       waitAfterClusterReq);
138     timeTable = new Hashtable JavaDoc();
139     visitTable = new Hashtable JavaDoc();
140     clusterDeliveryCount = 0;
141
142   }
143
144   public String JavaDoc toString() {
145     return "ClusterQueueImpl:" + destId.toString();
146   }
147   
148   /**
149    * propagate right to all cluster.
150    *
151    * @param not
152    */

153   public void postProcess(SetRightRequest not) {
154     sendToCluster(
155       new SetRightQueueCluster(
156         loadingFactor.getRateOfFlow(),
157         not,
158         clients));
159     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
160       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
161                                     "--- " + this +
162                                     " ClusterQueueImpl.postProcess(" + not + ")" +
163                                     "\nclients=" + clients);
164   }
165
166   /**
167    * use to add or remove ClusterQueue to cluster.
168    *
169    * @param not
170    */

171   public Object JavaDoc specialAdminProcess(SpecialAdminRequest not)
172     throws RequestException {
173
174     Object JavaDoc ret = null;
175     try {
176       SpecialAdmin req = not.getRequest();
177       
178       if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
179         JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
180                                       "--- " + this +
181                                       " specialAdminProcess : " +
182                                       req);
183
184       if (req instanceof AddQueueCluster) {
185         addQueueCluster(((AddQueueCluster) req).joiningQueue,
186                         loadingFactor.getRateOfFlow());
187       } else if (req instanceof RemoveQueueCluster) {
188         broadcastLeave(((RemoveQueueCluster) req).removeQueue);
189         removeQueueCluster(((RemoveQueueCluster) req).removeQueue);
190       } else if(req instanceof ListClusterQueue) {
191         ret = doList((ListClusterQueue) req);
192       }
193     } catch (Exception JavaDoc exc) {
194       if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN))
195         JoramTracing.dbgDestination.log(BasicLevel.WARN,
196                                       "--- " + this +
197                                       " specialAdminProcess",
198                                       exc);
199       throw new RequestException(exc.getMessage());
200     }
201     return ret;
202   }
203   
204   /**
205    * return the cluster list (vector).
206    *
207    * @param req
208    * @return the cluster list (vector).
209    */

210   protected Object JavaDoc doList(ListClusterQueue req) {
211     Vector JavaDoc vect = new Vector JavaDoc();
212     for (Enumeration JavaDoc e = clusters.keys(); e.hasMoreElements(); )
213       vect.add(e.nextElement().toString());
214     return vect;
215   }
216  
217   /**
218    * send to joiningQueue a JoinQueueCluster not.
219    *
220    * @param joiningQueue
221    * @param rateOfFlow
222    */

223   protected void addQueueCluster(String JavaDoc joiningQueue, float rateOfFlow) {
224     AgentId id = AgentId.fromString(joiningQueue);
225     if (clusters.containsKey(id)) return;
226
227 // clusters.put(id,new Float(rateOfFlow));
228

229     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
230       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
231                                     "--- " + this +
232                                     " ClusterQueueImpl.addQueueCluster in " + destId +
233                                     "\njoiningQueue=" + joiningQueue +
234                                     "\nclusters=" + clusters);
235
236     forward(id,
237             new JoinQueueCluster(loadingFactor.getRateOfFlow(),
238                                  clusters,
239                                  clients,
240                                  freeReading,
241                                  freeWriting));
242   }
243   
244   /**
245    * broadcast to cluster the removeQueue.
246    *
247    * @param removeQueue
248    */

249   protected void broadcastLeave(String JavaDoc removeQueue) {
250     sendToCluster(new LeaveQueueCluster(removeQueue));
251   }
252
253   /**
254    * removeQueue leave the cluster.
255    *
256    * @param removeQueue
257    */

258   public void removeQueueCluster(String JavaDoc removeQueue) {
259     AgentId id = AgentId.fromString(removeQueue);
260     if (destId.equals(id)) {
261       clusters.clear();
262     } else
263       clusters.remove(id);
264
265     for (Enumeration JavaDoc e = visitTable.elements(); e.hasMoreElements(); ) {
266       Vector JavaDoc visit = (Vector JavaDoc) e.nextElement();
267       if (visit.contains(id))
268         visit.remove(id);
269     }
270
271     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
272       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
273                                     "--- " + this +
274                                     " ClusterQueueImpl.removeQueueCluster in " + destId +
275                                     "\nremoveQueue=" + removeQueue +
276                                     "\nclusters=" + clusters);
277   }
278   
279   /**
280    * overload preProcess(AgentId, ClientMessages)
281    * store all msgId in timeTable and visitTable.
282    *
283    * @param from
284    * @param not
285    */

286   public ClientMessages preProcess(AgentId from, ClientMessages not) {
287     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
288       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
289                                     "--- " + this +
290                                     " " + not);
291     receiving = true;
292     long date = System.currentTimeMillis();
293     
294     Message msg;
295     // Storing each received message:
296
for (Enumeration JavaDoc msgs = not.getMessages().elements();
297          msgs.hasMoreElements();) {
298       msg = new Message((org.objectweb.joram.shared.messages.Message) msgs.nextElement());
299       msg.order = arrivalsCounter++;
300       storeMsgIdInTimeTable(msg.getIdentifier(),
301                             new Long JavaDoc(date));
302       //storeMsgIdInVisitTable(msg.getIdentifier(), destId);
303
}
304     return not;
305   }
306   
307   /**
308    * call factorCheck to evaluate the loading factor,
309    * activity, ... and send message to cluster if need.
310    *
311    * @param not
312    */

313   public void postProcess(ClientMessages not) {
314     if (getMessageCounter() > loadingFactor.producThreshold)
315       loadingFactor.factorCheck(clusters,
316                                 getMessageCounter(),
317                                 getWaitingRequestCount());
318     else
319       loadingFactor.evalRateOfFlow(getMessageCounter(),
320                                    getWaitingRequestCount());
321     receiving = false;
322   }
323
324  
325
326   /**
327    * set the same right to all cluster
328    *
329    * @param not
330    */

331   public void setRightQueueCluster(SetRightQueueCluster not) {
332     try {
333       AgentId user = not.setRightRequest.getClient();
334       int right = not.setRightRequest.getRight();
335       super.processSetRight(user,right);
336     } catch (RequestException exc) {}
337     super.doRightRequest(not.setRightRequest);
338     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
339       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
340                                     "--- " + this +
341                                     " ClusterQueueImpl.setRightQueueCluster(" + not + ")" +
342                                     "\nclients=" + clients);
343   }
344
345   /**
346    * wake up, and call factorCheck to evaluate the loading factor...
347    * if msg stay more a periode time in timeTable send to an other
348    * (no visited) queue in cluster.
349    *
350    * @param not
351    */

352   public void wakeUpNot(WakeUpNot not) {
353     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
354       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
355                                     "--- " + this +
356                                     " ClusterQueueImpl.wakeUpNot(" + not + ")");
357     super.wakeUpNot(not);
358     
359     if (clusters.size() > 1)
360       loadingFactor.factorCheck(clusters,
361                                 getMessageCounter(),
362                                 getWaitingRequestCount());
363
364     // check if message arrived befor "period".
365
// if is true send message to the next (no visited) clusterQueue.
366
List JavaDoc toGive = new ArrayList JavaDoc();
367     long oldTime = System.currentTimeMillis() - period;
368     for (Enumeration JavaDoc e = timeTable.keys(); e.hasMoreElements(); ) {
369       String JavaDoc msgId = (String JavaDoc) e.nextElement();
370       if (((Long JavaDoc) timeTable.get(msgId)).longValue() < oldTime) {
371         toGive.add(msgId);
372         storeMsgIdInVisitTable(msgId,destId);
373       }
374     }
375
376     if (toGive.isEmpty()) return;
377
378     Hashtable JavaDoc table = new Hashtable JavaDoc();
379     for (int i = 0; i < toGive.size(); i++) {
380       String JavaDoc msgId = (String JavaDoc) toGive.get(i);
381       Vector JavaDoc visit = (Vector JavaDoc) visitTable.get(msgId);
382       for (Enumeration JavaDoc e = clusters.keys(); e.hasMoreElements(); ) {
383         AgentId id = (AgentId) e.nextElement();
384         if (! visit.contains(id)) {
385           Message message = getMessage(msgId, true);
386           if (message != null) {
387             LBCycleLife cycle = (LBCycleLife) table.get(id);
388             if (cycle == null) {
389               cycle = new LBCycleLife(loadingFactor.getRateOfFlow());
390               cycle.setClientMessages(new ClientMessages());
391             }
392             ClientMessages cm = cycle.getClientMessages();
393             cm.addMessage(message.msg);
394             cycle.putInVisitTable(msgId,visit);
395             table.put(id,cycle);
396             break;
397           }
398         }
399       }
400     }
401
402     for (Enumeration JavaDoc e = table.keys(); e.hasMoreElements(); ) {
403       AgentId id = (AgentId) e.nextElement();
404       forward(id,(LBCycleLife) table.get(id));
405     }
406   }
407
408   /**
409    * The messages are not consumed by an other cluster's queue
410    * in a periode time, try to consume in this queue.
411    * update visitTable, and process clientMessages.
412    *
413    * @param from
414    * @param not
415    */

416   public void lBCycleLife(AgentId from, LBCycleLife not) {
417
418     clusters.put(from,new Float JavaDoc(not.getRateOfFlow()));
419
420     Hashtable JavaDoc vT = not.getVisitTable();
421     for (Enumeration JavaDoc e = vT.keys(); e.hasMoreElements(); ) {
422       String JavaDoc msgId = (String JavaDoc) e.nextElement();
423       visitTable.put(msgId,vT.get(msgId));
424     }
425
426     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
427       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
428                                     "--- " + this +
429                                     " ClusterQueueImpl.lBCycleLife(" + not + ")" +
430                                     "\nvisitTable=" + clusters);
431     ClientMessages cm = not.getClientMessages();
432     if (cm != null)
433       doClientMessages(from, cm);
434   }
435
436   /**
437    * new queue come in cluster, update clusters.
438    * and spread to clusters the AckjoiningQueue.
439    *
440    * @param not JoinQueueCluster
441    */

442   public void joinQueueCluster(JoinQueueCluster not) {
443     for (Enumeration JavaDoc e = not.clusters.keys(); e.hasMoreElements(); ) {
444       AgentId id = (AgentId) e.nextElement();
445       if (! clusters.containsKey(id))
446         clusters.put(id,not.clusters.get(id));
447     }
448     for (Enumeration JavaDoc e = not.clients.keys(); e.hasMoreElements(); ) {
449       AgentId user = (AgentId) e.nextElement();
450       if (clients.containsKey(user)) {
451         Integer JavaDoc right = (Integer JavaDoc) not.clients.get(user);
452         if (right.compareTo((Integer JavaDoc) clients.get(user)) > 0)
453           clients.put(user,right);
454       } else
455         clients.put(user,not.clients.get(user));
456     }
457
458     freeReading = freeReading | not.freeReading;
459     freeWriting = freeWriting | not.freeWriting;
460
461     sendToCluster(
462       new AckJoinQueueCluster(loadingFactor.getRateOfFlow(),
463                               clusters,
464                               clients,
465                               freeReading,
466                               freeWriting));
467   
468     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
469       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
470                                     "--- " + this +
471                                     " ClusterQueueImpl.joinQueueCluster(" + not + ")" +
472                                     "\nclusters=" + clusters +
473                                     "\nclients=" + clients);
474   }
475
476   /**
477    *
478    * @param not AckJoinQueueCluster
479    */

480   public void ackJoinQueueCluster(AckJoinQueueCluster not) {
481     for (Enumeration JavaDoc e = not.clusters.keys(); e.hasMoreElements(); ) {
482       AgentId id = (AgentId) e.nextElement();
483       if (! clusters.containsKey(id))
484         clusters.put(id,not.clusters.get(id));
485     }
486     for (Enumeration JavaDoc e = not.clients.keys(); e.hasMoreElements(); ) {
487       AgentId user = (AgentId) e.nextElement();
488       if (clients.containsKey(user)) {
489         Integer JavaDoc right = (Integer JavaDoc) not.clients.get(user);
490         if (right.compareTo((Integer JavaDoc) clients.get(user)) > 0)
491           clients.put(user,right);
492       } else
493         clients.put(user,not.clients.get(user));
494     }
495
496     freeReading = freeReading | not.freeReading;
497     freeWriting = freeWriting | not.freeWriting;
498   
499     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
500       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
501                                     "--- " + this +
502                                     " ClusterQueueImpl.ackJoinQueueCluster(" + not + ")" +
503                                     "\nclusters=" + clusters +
504                                     "\nclients=" + clients);
505   }
506
507   /**
508    *
509    * @param not ReceiveRequest
510    */

511   public void receiveRequest(ReceiveRequest not) {
512     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
513       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
514                                     "--- " + this +
515                                     " ClusterQueueImpl.receiveRequest(" + not + ")");
516
517     //loadingFactor.setWait();
518

519     if (getWaitingRequestCount() > loadingFactor.consumThreshold)
520       loadingFactor.factorCheck(clusters,
521                                 getMessageCounter(),
522                                 getWaitingRequestCount());
523   }
524
525   /**
526    * load balancing message give by an other cluster queue.
527    * process ClientMessages, no need to check if sender is writer.
528    *
529    * @param from AgentId
530    * @param not LBMessageGive
531    * @throws UnknownNotificationException
532    */

533   public void lBMessageGive(AgentId from, LBMessageGive not)
534     throws UnknownNotificationException {
535     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
536       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
537                                     "--- " + this +
538                                     " ClusterQueueImpl.lBMessageGive(" + from + "," + not + ")");
539
540     clusters.put(from,new Float JavaDoc(not.getRateOfFlow()));
541
542     ClientMessages cm = not.getClientMessages();
543     if (cm != null)
544       doClientMessages(from, cm);
545   }
546
547   /**
548    * load balancing message hope by the "from" queue.
549    *
550    * @param from
551    * @param not LBMessageHope
552    */

553   public void lBMessageHope(AgentId from, LBMessageHope not) {
554     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
555       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
556                                     "--- " + this +
557                                     " ClusterQueueImpl.lBMessageHope(" + from + "," + not + ")");
558     
559     clusters.put(from,new Float JavaDoc(not.getRateOfFlow()));
560
561     int hope = not.getNbMsg();
562
563     long current = System.currentTimeMillis();
564     // Cleaning the possible expired messages.
565
ClientMessages deadMessages = cleanPendingMessage(current);
566     // If needed, sending the dead messages to the DMQ:
567
if (deadMessages != null)
568       sendToDMQ(deadMessages, null);
569     
570     if (loadingFactor.getRateOfFlow() < 1) {
571       int possibleGive = getMessageCounter() - getWaitingRequestCount();
572       LBMessageGive msgGive =
573         new LBMessageGive(waitAfterClusterReq,loadingFactor.getRateOfFlow());
574       
575       // get client messages, hope or possible give.
576
ClientMessages cm = null;
577       if (possibleGive > hope) {
578         cm = getClientMessages(hope, null, true);
579       } else {
580         cm = getClientMessages(possibleGive, null, true);
581       }
582
583       msgGive.setClientMessages(cm);
584       msgGive.setRateOfFlow(
585         loadingFactor.evalRateOfFlow(getMessageCounter(), getWaitingRequestCount()));
586       
587       // send notification contains ClientMessages.
588
forward(from, msgGive);
589       
590       if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
591         JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
592                                       "--- " + this +
593                                       " ClusterQueueImpl.lBMessageHope LBMessageHope : nbMsgSend = " +
594                                       cm.getMessages().size());
595     }
596   }
597
598   /**
599    * get a client message contain nb messages.
600    * add cluster monitoring value.
601    *
602    * @param nb number of messages returned in ClientMessage.
603    * @param selector jms selector
604    * @param remove delete all messages returned if true
605    * @return ClientMessages (contains nb Messages)
606    */

607   protected ClientMessages getClientMessages(int nb, String JavaDoc selector, boolean remove) {
608     ClientMessages cm = super.getClientMessages(nb, selector, remove);
609     // set information in cluster
610
for (Enumeration JavaDoc e = cm.getMessages().elements(); e.hasMoreElements(); ) {
611       org.objectweb.joram.shared.messages.Message message =
612         (org.objectweb.joram.shared.messages.Message) e.nextElement();
613       monitoringMsgSendToCluster(message.id);
614     }
615     return cm;
616   }
617   
618   /**
619    * get mom message, delete if remove = true.
620    * add cluster monitoring value.
621    *
622    * @param msgId message identification
623    * @param remove if true delete message
624    * @return mom message
625    */

626   protected Message getMessage(String JavaDoc msgId, boolean remove) {
627     Message msg = super.getMessage(msgId, remove);
628     if (msg != null) {
629       monitoringMsgSendToCluster(msg.getIdentifier());
630     }
631     return msg;
632   }
633   
634   /**
635    * send to all queue in cluster.
636    *
637    * @param not
638    */

639   protected void sendToCluster(QueueClusterNot not) {
640     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
641       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
642                                     "--- " + this +
643                                     " ClusterQueueImpl.sendToCluster(" + not + ")");
644
645     if (clusters.size() < 2) return;
646
647     for (Enumeration JavaDoc e = clusters.keys(); e.hasMoreElements(); ) {
648       AgentId id = (AgentId) e.nextElement();
649       if (! id.equals(destId))
650         forward(id,not);
651     }
652   }
653
654   /**
655    * return the number of Message send to cluster.
656    */

657   public long getClusterDeliveryCount() {
658     return clusterDeliveryCount;
659   }
660
661   /**
662    *
663    * @param msgId
664    * @param date
665    */

666   private void storeMsgIdInTimeTable(String JavaDoc msgId, Long JavaDoc date) {
667     try {
668       timeTable.put(msgId,date);
669     } catch (NullPointerException JavaDoc exc) {}
670   }
671
672   /**
673    *
674    * @param msgId
675    * @param destId
676    */

677   private void storeMsgIdInVisitTable(String JavaDoc msgId, AgentId destId) {
678     Vector JavaDoc alreadyVisit = (Vector JavaDoc) visitTable.get(msgId);
679     if (alreadyVisit == null) alreadyVisit = new Vector JavaDoc();
680     alreadyVisit.add(destId);
681     visitTable.put(msgId,alreadyVisit);
682   }
683
684   /**
685    *
686    * @param msgId
687    */

688   protected void messageDelivered(String JavaDoc msgId) {
689     timeTable.remove(msgId);
690     visitTable.remove(msgId);
691   }
692
693   /**
694    *
695    * @param msgId
696    */

697   protected void monitoringMsgSendToCluster(String JavaDoc msgId) {
698     timeTable.remove(msgId);
699     visitTable.remove(msgId);
700     clusterDeliveryCount++;
701   }
702   
703   /**
704    *
705    * @param waitAfterClusterReq
706    */

707   public void setWaitAfterClusterReq(long waitAfterClusterReq) {
708     this.waitAfterClusterReq = waitAfterClusterReq;
709     loadingFactor.validityPeriod = waitAfterClusterReq;
710   }
711   
712   /**
713    *
714    * @param producThreshold
715    */

716   public void setProducThreshold(int producThreshold) {
717     loadingFactor.producThreshold = producThreshold;
718   }
719   
720   /**
721    *
722    * @param consumThreshold
723    */

724   public void setConsumThreshold(int consumThreshold) {
725     loadingFactor.consumThreshold = consumThreshold;
726   }
727   
728   /**
729    *
730    * @param autoEvalThreshold
731    */

732   public void setAutoEvalThreshold(boolean autoEvalThreshold) {
733     loadingFactor.autoEvalThreshold = autoEvalThreshold;
734   }
735
736   /**
737    *
738    * @param in
739    * @throws IOException
740    * @throws ClassNotFoundException
741    */

742   private void readObject(java.io.ObjectInputStream JavaDoc in)
743     throws IOException JavaDoc, ClassNotFoundException JavaDoc {
744
745     in.defaultReadObject();
746
747     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
748       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
749                                     "--- " + this +
750                                     " ClusterQueueImpl.readObject" +
751                                     " loadingFactor = " + loadingFactor);
752   }
753 }
754
Popular Tags