KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > dest > TopicImpl


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2001 - 2007 ScalAgent Distributed Technologies
4  * Copyright (C) 2003 - 2004 Bull SA
5  * Copyright (C) 1996 - 2000 Dyade
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the Free Software
19  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20  * USA.
21  *
22  * Initial developer(s): Frederic Maistre (INRIA)
23  * Contributor(s): ScalAgent Distributed Technologies
24  */

25 package org.objectweb.joram.mom.dest;
26
27 import java.util.Enumeration JavaDoc;
28 import java.util.Hashtable JavaDoc;
29 import java.util.Properties JavaDoc;
30 import java.util.Vector JavaDoc;
31
32 import org.objectweb.joram.mom.notifications.AdminReply;
33 import org.objectweb.joram.mom.notifications.ClientMessages;
34 import org.objectweb.joram.mom.notifications.ClusterRequest;
35 import org.objectweb.joram.mom.notifications.DestinationAdminRequestNot;
36 import org.objectweb.joram.mom.notifications.ExceptionReply;
37 import org.objectweb.joram.mom.notifications.Monit_GetCluster;
38 import org.objectweb.joram.mom.notifications.Monit_GetClusterRep;
39 import org.objectweb.joram.mom.notifications.Monit_GetFather;
40 import org.objectweb.joram.mom.notifications.Monit_GetFatherRep;
41 import org.objectweb.joram.mom.notifications.Monit_GetNumberRep;
42 import org.objectweb.joram.mom.notifications.Monit_GetSubscriptions;
43 import org.objectweb.joram.mom.notifications.SetFatherRequest;
44 import org.objectweb.joram.mom.notifications.SetRightRequest;
45 import org.objectweb.joram.mom.notifications.SubscribeReply;
46 import org.objectweb.joram.mom.notifications.SubscribeRequest;
47 import org.objectweb.joram.mom.notifications.TopicMsgsReply;
48 import org.objectweb.joram.mom.notifications.UnclusterRequest;
49 import org.objectweb.joram.mom.notifications.UnsetFatherRequest;
50 import org.objectweb.joram.mom.notifications.UnsubscribeRequest;
51 import org.objectweb.joram.shared.JoramTracing;
52 import org.objectweb.joram.shared.admin.GetSubscriberIds;
53 import org.objectweb.joram.shared.admin.GetSubscriberIdsRep;
54 import org.objectweb.joram.shared.excepts.AccessException;
55 import org.objectweb.joram.shared.excepts.MomException;
56 import org.objectweb.joram.shared.messages.Message;
57 import org.objectweb.joram.shared.selectors.Selector;
58 import org.objectweb.util.monolog.api.BasicLevel;
59
60 import fr.dyade.aaa.agent.AgentId;
61 import fr.dyade.aaa.agent.AgentServer;
62 import fr.dyade.aaa.agent.DeleteNot;
63 import fr.dyade.aaa.agent.Notification;
64 import fr.dyade.aaa.agent.UnknownAgent;
65
66 /**
67  * The <code>TopicImpl</code> class implements the MOM topic behaviour,
68  * basically distributing the received messages to subscribers.
69  * <p>
70  * A Topic might be part of a hierarchy; if it is the case, and if the topic
71  * is not on top of that hierarchy, it will have a father to forward messages
72  * to.
73  * <p>
74  * A topic might also be part of a cluster; if it is the case, it will have
75  * friends to forward messages to.
76  * <p>
77  * A topic can't be part of a hierarchy and of a cluster at the same time.
78  */

79 public class TopicImpl extends DestinationImpl implements TopicImplMBean {
80   /**
   * Identifier of this topic's hierarchical father; <code>null</code> when
   * the topic has no father.
   */
81   protected AgentId fatherId = null;
82   /**
   * Identifiers (<code>AgentId</code>) of the other topics of the cluster.
   * <code>null</code> or empty when the topic is not part of a cluster.
   */
83   protected Vector JavaDoc friends = null;
84   
85   /** Identifiers (<code>AgentId</code>) of the current subscribers. */
86   protected Vector JavaDoc subscribers;
87   /**
   * Selector expressions keyed by subscriber identifier; a subscriber
   * without an entry is sent every message.
   */
88   protected Hashtable JavaDoc selectors;
89
90   /**
   * Flag telling that the received message objects have already been handed
   * to an agent of the local server; once set,
   * <code>processMessages</code> sends clones to further local subscribers.
   */
91   protected transient boolean alreadySentLocally;
92
/**
 * Builds a <code>TopicImpl</code> with empty subscriber and selector tables.
 *
 * @param destId  Identifier of the agent hosting the topic.
 * @param adminId  Identifier of the administrator of the topic.
 * @param prop  The initial set of properties.
 */
public TopicImpl(AgentId destId, AgentId adminId, Properties prop) {
  super(destId, adminId, prop);
  this.selectors = new Hashtable();
  this.subscribers = new Vector();
}
105
106   /**
107    * Returns a string representation of this destination.
108    */

109   public String JavaDoc toString() {
110     return "TopicImpl:" + destId.toString();
111   }
112
113   /**
114    * Method implementing the reaction to a <code>ClusterRequest</code>
115    * instance requesting to add a topic to the cluster, or to set a
116    * cluster with a given topic.
117    *
118    * @exception AccessException If the requester is not an administrator.
119    */

120   public void clusterRequest(AgentId from, ClusterRequest req) throws AccessException
121   {
122     if (! isAdministrator(from))
123       throw new AccessException("ADMIN right not granted");
124
125     String JavaDoc info = null;
126     if (fatherId != null) {
127       info = strbuf.append("Request [").append(req.getClass().getName())
128         .append("], sent to Topic [").append(destId)
129         .append("], successful [false]: topic part of a hierarchy").toString();
130       strbuf.setLength(0);
131       forward(from, new AdminReply(req, false, info));
132       return;
133     }
134
135     AgentId newFriendId = req.getTopicId();
136
137     if (friends == null) {
138       // state change, so save.
139
setSave();
140       friends = new Vector JavaDoc();
141     }
142
143     if (friends.contains(newFriendId) || destId.equals(newFriendId)) {
144       info = strbuf.append("Request [").append(req.getClass().getName())
145         .append("], sent to Topic [").append(destId)
146         .append("], successful [false]: joining topic already")
147         .append(" part of cluster").toString();
148       strbuf.setLength(0);
149       forward(from, new AdminReply(req, false, info));
150       return;
151     }
152
153     ClusterTest not = new ClusterTest(req, from);
154     forward(newFriendId, not);
155   }
156
157   /**
158    * Method implementing the reaction to a <code>ClusterTest</code>
159    * notification sent by a fellow topic for testing if this topic might be
160    * part of a cluster.
161    */

162   public void clusterTest(AgentId from, ClusterTest not) {
163     String JavaDoc info = null;
164     // The topic is already part of a cluster: can't join an other cluster.
165
if (friends != null && ! friends.isEmpty()) {
166       info = strbuf.append("Topic [").append(destId)
167         .append("] can't join cluster of topic [").append(from)
168         .append("] as it is already part of a cluster").toString();
169       strbuf.setLength(0);
170       forward(from, new ClusterAck(not, false, info));
171     // The topic is already part of a hierarchy: can't join a cluster.
172
} else if (fatherId != null) {
173       info = strbuf.append("Topic [").append(destId)
174         .append("] can't join cluster of topic [").append(from)
175         .append("] as it is already part of a hierarchy").toString();
176       strbuf.setLength(0);
177       forward(from, new ClusterAck(not, false, info));
178     // The topic is free: joining the cluster.
179
} else {
180       // state change, so save.
181
setSave();
182       friends = new Vector JavaDoc();
183       friends.add(from);
184       info = strbuf.append("Topic [").append(destId)
185         .append("] ok for joining cluster of topic [").append(from)
186         .append(']').toString();
187       strbuf.setLength(0);
188       forward(from, new ClusterAck(not, true, info));
189
190       if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
191         JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "Topic "
192                                       + destId.toString() + " joins cluster"
193                                       + "cluster of topic " + from.toString());
194     }
195   }
196
197   /**
198    * Method implementing the reaction to a <code>ClusterAck</code>
199    * notification sent by a topic requested to join the cluster.
200    */

201   public void clusterAck(AgentId from, ClusterAck ack){
202     // The topic does not accept to join the cluster: doing nothing.
203
if (! ack.ok) {
204       forward(ack.requester, new AdminReply(ack.request, false, ack.info));
205       return;
206     }
207   
208     AgentId fellowId;
209     ClusterNot fellowNot;
210     ClusterNot newFriendNot = new ClusterNot(from);
211     for (int i = 0; i < friends.size(); i++) {
212       fellowId = (AgentId) friends.get(i);
213       fellowNot = new ClusterNot(fellowId);
214       // Notifying the joining topic of the current fellow.
215
forward(from, fellowNot);
216       // Notifying the current fellow of the joining topic.
217
forward(fellowId, newFriendNot);
218     }
219     // state change, so save.
220
setSave();
221     friends.add(from);
222
223     String JavaDoc info = strbuf.append("Request [")
224       .append(ack.request.getClass().getName())
225       .append("], sent to Topic [").append(destId)
226       .append("], successful [true]: topic [")
227       .append(from).append("] joined cluster").toString();
228     strbuf.setLength(0);
229     forward(ack.requester, new AdminReply(ack.request, true, info));
230
231     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
232       JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info);
233   }
234
235   /**
236    * Method implementing the reaction to a <code>ClusterNot</code>
237    * notification sent by a fellow topic for notifying this topic
238    * of a new cluster fellow.
239    */

240   public void clusterNot(AgentId from, ClusterNot not) {
241     // state change, so save.
242
setSave();
243     friends.add(not.topicId);
244       
245     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
246       JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "Topic "
247                                     + not.topicId.toString()
248                                     + " set as a fellow.");
249   }
250  
251   /**
252    * Method implementing the reaction to an <code>UnclusterRequest</code>
253    * instance requesting this topic to leave the cluster it is part of.
254    *
255    * @exception AccessException If the requester is not an administrator.
256    */

257   public void unclusterRequest(AgentId from, UnclusterRequest request) throws MomException {
258     if (! isAdministrator(from))
259       throw new AccessException("ADMIN right not granted");
260
261     if (friends == null || friends.isEmpty()) {
262       String JavaDoc info = strbuf.append("Request [")
263         .append(request.getClass().getName())
264         .append("], sent to Topic [").append(destId)
265         .append("], successful [false]: topic not part of a cluster")
266         .toString();
267       strbuf.setLength(0);
268       forward(from, new AdminReply(request, false, info));
269       return;
270     }
271
272     UnclusterNot not = new UnclusterNot();
273     AgentId fellowId;
274     // Notifying each fellow of the leave.
275
while (! friends.isEmpty()) {
276       // state change, so save.
277
setSave();
278       fellowId = (AgentId) friends.remove(0);
279       forward(fellowId, not);
280     }
281     friends = null;
282
283     String JavaDoc info = strbuf.append("Request [")
284       .append(request.getClass().getName())
285       .append("], sent to Topic [").append(destId)
286       .append("], successful [true]: topic left the cluster").toString();
287     strbuf.setLength(0);
288     forward(from, new AdminReply(request, true, info));
289
290     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
291       JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info);
292   }
293  
294   /**
295    * Method implementing the reaction to an <code>UnclusterNot</code>
296    * notification sent by a topic leaving the cluster.
297    */

298   public void unclusterNot(AgentId from, UnclusterNot not) {
299     // state change, so save.
300
setSave();
301     friends.remove(from);
302
303     if (friends.isEmpty()) friends = null;
304
305     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
306       JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "Topic "
307                                     + from.toString() + " removed from"
308                                     + " cluster.");
309   }
310
311   /**
312    * Method implementing the reaction to a <code>SetFatherRequest</code>
313    * instance notifying this topic it is part of a hierarchy as a son.
314    *
315    * @exception AccessException If the requester is not an administrator.
316    */

317   public void setFatherRequest(AgentId from, SetFatherRequest request) throws MomException {
318     if (! isAdministrator(from))
319       throw new AccessException("ADMIN right not granted");
320
321     if (fatherId != null) {
322       strbuf.append("Request [").append(request.getClass().getName())
323         .append("], sent to Topic [").append(destId)
324         .append("], successful [false]: topic already part of a hierarchy");
325       forward(from, new AdminReply(request, false, strbuf.toString()));
326       strbuf.setLength(0);
327       return;
328     }
329
330     if (friends != null) {
331       strbuf.append("Request [").append(request.getClass().getName())
332         .append("], sent to Topic [").append(destId)
333         .append("], successful [false]: topic already part of a cluster");
334       forward(from, new AdminReply(request, false, strbuf.toString()));
335       strbuf.setLength(0);
336       return;
337     }
338
339     forward(request.getFatherId(), new FatherTest(request, from));
340   }
341
342   /**
343    * Method reacting to a <code>FatherTest</code> notification checking if it
344    * can be a father to a topic.
345    */

346   public void fatherTest(AgentId from, FatherTest not) {
347     if (friends != null && ! friends.isEmpty()) {
348       strbuf.append("Topic [").append(destId)
349         .append("] can't accept topic [").append(from)
350         .append("] as a son as it is part of a cluster");
351       forward(from, new FatherAck(not, false, strbuf.toString()));
352       strbuf.setLength(0);
353     } else {
354       strbuf.append("Topic [").append(destId)
355         .append("] accepts topic [").append(from).append("] as a son");
356       forward(from, new FatherAck(not, true, strbuf.toString()));
357       strbuf.setLength(0);
358     }
359   }
360
361   /**
362    * Method reacting to a <code>FatherAck</code> notification coming from
363    * the topic this topic requested as a father.
364    */

365   public void fatherAck(AgentId from, FatherAck not) {
366     // The topic does not accept to join the hierarchy: doing nothing.
367
if (! not.ok) {
368       forward(not.requester, new AdminReply(not.request, false, not.info));
369       return;
370     }
371   
372     // state change, so save.
373
setSave();
374     // The topic accepts to be a father: setting it.
375
fatherId = from;
376   
377     String JavaDoc info = strbuf.append("Request [")
378       .append(not.request.getClass().getName())
379       .append("], sent to Topic [").append(destId)
380       .append("], successful [true]: topic [")
381       .append(from).append("] set as father").toString();
382     strbuf.setLength(0);
383     forward(not.requester, new AdminReply(not.request, true, info));
384   
385     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
386       JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info);
387   }
388
389   /**
390    * Method implementing the reaction to an <code>UnsetFatherRequest</code>
391    * instance notifying this topic to leave its father.
392    *
393    * @exception AccessException If the requester is not an administrator.
394    */

395   public void unsetFatherRequest(AgentId from, UnsetFatherRequest request) throws MomException {
396     if (! isAdministrator(from))
397       throw new AccessException("ADMIN right not granted");
398
399     String JavaDoc info = null;
400     if (fatherId == null) {
401       info = strbuf.append("Request [").append(request.getClass().getName())
402         .append("], sent to Topic [").append(destId)
403         .append("], successful [false]: topic is not a son").toString();
404       strbuf.setLength(0);
405       forward(from, new AdminReply(request, false, info));
406       return;
407     }
408
409     // state change, so save.
410
setSave();
411     fatherId = null;
412
413     info = strbuf.append("Request [").append(request.getClass().getName())
414       .append("], sent to Topic [").append(destId)
415       .append("], successful [true]: father unset").toString();
416     strbuf.setLength(0);
417     forward(from, new AdminReply(request, true, info));
418
419     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
420       JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info);
421   }
422
423   /**
424    * Method implementing the reaction to a
425    * <code>Monit_GetSubscriptions</code> notification requesting the
426    * number of subscriptions.
427    *
428    * @exception AccessException If the requester is not the administrator.
429    */

430   public void monitGetSubscriptions(AgentId from, Monit_GetSubscriptions not) throws AccessException {
431     if (! isAdministrator(from))
432       throw new AccessException("ADMIN right not granted");
433
434     forward(from, new Monit_GetNumberRep(not, subscribers.size()));
435   }
436
437   /**
438    * Method implementing the reaction to a <code>Monit_GetFather</code>
439    * notification requesting the identifier of the hierarchical father.
440    *
441    * @exception AccessException If the requester is not the administrator.
442    */

443   public void monitGetFather(AgentId from, Monit_GetFather not) throws AccessException {
444     if (! isAdministrator(from))
445       throw new AccessException("ADMIN right not granted");
446
447     String JavaDoc id = null;
448     if (fatherId != null)
449       id = fatherId.toString();
450
451     forward(from, new Monit_GetFatherRep(not, id));
452   }
453
454   /**
455    * Method implementing the reaction to a <code>Monit_GetCluster</code>
456    * notification requesting the identifiers of the cluster's topics.
457    *
458    * @exception AccessException If the requester is not the administrator.
459    */

460   public void monitGetCluster(AgentId from, Monit_GetCluster not) throws AccessException {
461     if (! isAdministrator(from))
462       throw new AccessException("ADMIN right not granted");
463
464     Vector JavaDoc cluster = null;
465     if (friends != null) {
466       cluster = new Vector JavaDoc();
467       for (int i = 0; i < friends.size(); i++)
468         cluster.add(friends.get(i).toString());
469       cluster.add(destId.toString());
470     }
471
472     forward(from, new Monit_GetClusterRep(not, cluster));
473   }
474
475   public void preSubscribe(SubscribeRequest not) {
476     // do nothing
477
}
478   
479   public void postSubscribe(SubscribeRequest not) {
480     // do nothing
481
}
482   
483   /**
484    * Method implementing the reaction to a <code>SubscribeRequest</code>
485    * instance.
486    *
487    * @exception AccessException If the sender is not a READER.
488    */

489   public void subscribeRequest(AgentId from, SubscribeRequest not) throws AccessException {
490     if (! isReader(from))
491       throw new AccessException("READ right not granted");
492
493     preSubscribe(not);
494     
495     // Adding new subscriber.
496
if (! subscribers.contains(from)) {
497       // state change, so save.
498
setSave();
499       subscribers.add(from);
500     }
501
502     // state change, so save.
503
setSave();
504
505     // The requester might either be a new subscriber, or an existing one;
506
// setting the selector, possibly by removing or modifying an already set
507
// expression.
508
if (not.getSelector() != null && ! not.getSelector().equals(""))
509       selectors.put(from, not.getSelector());
510     else
511       selectors.remove(from);
512
513     forward(from, new SubscribeReply(not));
514
515     postSubscribe(not);
516     
517     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
518       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
519                                       "Client " + from
520                                       + " set as a subscriber with selector "
521                                       + not.getSelector());
522   }
523
524   public void preUnsubscribe(UnsubscribeRequest not) {
525     // do nothing
526
}
527   
528   public void postUnsubscribe(UnsubscribeRequest not) {
529     // do nothing
530
}
531   
532   /**
533    * Method implementing the reaction to an <code>UnsubscribeRequest</code>
534    * instance, requesting to remove a subscriber.
535    */

536   public void unsubscribeRequest(AgentId from, UnsubscribeRequest not) {
537     
538     preUnsubscribe(not);
539     
540     // state change, so save.
541
setSave();
542     subscribers.remove(from);
543     selectors.remove(from);
544
545     postUnsubscribe(not);
546     
547     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
548       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
549                                     "Client " + from
550                                     + " removed from the subscribers.");
551   }
552
553   /**
554    * Method implementing the reaction to a <code>TopicForwardNot</code>
555    * instance, carrying messages forwarded by a cluster fellow or a
556    * hierarchical son.
557    */

558   public void topicForwardNot(AgentId from, TopicForwardNot not) {
559     // If the forward comes from a son, forwarding it to the father, if any.
560
if (not.toFather && fatherId != null) {
561       forward(fatherId, not);
562       alreadySentLocally = fatherId.getTo() == AgentServer.getServerId();
563     }
564     
565     // Processing the received messages.
566
processMessages(not.messages);
567   }
568
569   public void destinationAdminRequestNot(AgentId from, DestinationAdminRequestNot not) {
570     org.objectweb.joram.shared.admin.AdminRequest adminRequest =
571       not.getRequest();
572     if (adminRequest instanceof GetSubscriberIds) {
573       getSubscriberIds((GetSubscriberIds)adminRequest,
574               not.getReplyTo(),
575               not.getRequestMsgId(),
576               not.getReplyMsgId());
577     }
578   }
579
580   private void getSubscriberIds(GetSubscriberIds request,
581                                 AgentId replyTo,
582                                 String JavaDoc requestMsgId,
583                                 String JavaDoc replyMsgId) {
584     GetSubscriberIdsRep reply =
585       new GetSubscriberIdsRep(getSubscriberIds());
586     replyToTopic(reply, replyTo, requestMsgId, replyMsgId);
587   }
588
589   /**
590    * Returns the list of unique identifiers of all subscribers. Each user
591    * appears once even if there is multiples subscriptions, the different
592    * subscriptions can be enumerate through the proxy MBean.
593    *
594    * @return the list of unique identifiers of all subscribers.
595    */

596   public String JavaDoc[] getSubscriberIds() {
597     String JavaDoc[] res = new String JavaDoc[subscribers.size()];
598     for (int i = 0; i < res.length; i++) {
599       AgentId aid = (AgentId)subscribers.elementAt(i);
600       res[i] = aid.toString();
601     }
602     return res;
603   }
604   
605   /**
606    * Method specifically processing a <code>SetRightRequest</code> instance.
607    * <p>
608    * When a reader is removed, deleting this reader's subscription if any,
609    * and sending an <code>ExceptionReply</code> notification to the client.
610    */

611   protected void doRightRequest(SetRightRequest not) {
612     // If the request does not unset a reader, doing nothing.
613
if (not.getRight() != -READ)
614       return;
615
616     SetRightRequest rightRequest = preProcess(not);
617     
618     if (rightRequest != null) {
619       AgentId user = rightRequest.getClient();
620       AccessException exc = new AccessException("READ right removed.");
621
622       // Identified user: removing it.
623
if (user != null) {
624         // state change, so save.
625
setSave();
626         subscribers.remove(user);
627         selectors.remove(user);
628         forward(user, new ExceptionReply(exc));
629       }
630       // Free reading right removed: removing all non readers.
631
else {
632         for (Enumeration JavaDoc subs = subscribers.elements();
633         subs.hasMoreElements();) {
634           user = (AgentId) subs.nextElement();
635           if (! isReader(user)) {
636             // state change, so save.
637
setSave();
638             subscribers.remove(user);
639             selectors.remove(user);
640             forward(user, new ExceptionReply(exc));
641           }
642         }
643       }
644
645       postProcess(rightRequest);
646     }
647   }
648
649   /**
650    * Method specifically processing a <code>ClientMessages</code> instance.
651    * <p>
652    * This method may forward the messages to the topic father if any, or
653    * to the cluster fellows if any.It may finally send
654    * <code>TopicMsgsReply</code> instances to the valid subscribers.
655    */

656   protected void doClientMessages(AgentId from, ClientMessages not) {
657     ClientMessages clientMsgs = preProcess(from, not);
658
659     if (clientMsgs != null) {
660       // Forwarding the messages to the father or the cluster fellows, if any:
661
forwardMessages(clientMsgs);
662       // Processing the messages:
663
processMessages(clientMsgs);
664       
665       postProcess(clientMsgs);
666     }
667   }
668
669   /**
670    * Method specifically processing an <code>UnknownAgent</code> instance.
671    * <p>
672    * This method notifies the administrator of the failing cluster or
673    * hierarchy building request, if needed, or removes the subscriptions of
674    * the deleted client, if any, or sets the father identifier to null if it
675    * comes from a deleted father.
676    */

677   protected void doUnknownAgent(UnknownAgent uA) {
678     AgentId agId = uA.agent;
679     Notification not = uA.not;
680
681     // Deleted topic was requested to join the cluster: notifying the
682
// requester:
683
String JavaDoc info = null;
684     if (not instanceof ClusterTest) {
685       ClusterTest cT = (ClusterTest) not;
686       info = strbuf.append("Topic [").append(agId)
687         .append("] can't join cluster as it does not exist").toString();
688       strbuf.setLength(0);
689       forward(cT.requester, new AdminReply(cT.request, false, info));
690     } else if (not instanceof FatherTest) {
691     // Deleted topic was requested as a father: notifying the requester:
692
FatherTest fT = (FatherTest) not;
693       info = strbuf.append("Topic [").append(agId)
694         .append("] can't join hierarchy as it does not exist").toString();
695       strbuf.setLength(0);
696       forward(fT.requester, new AdminReply(fT.request, false, info));
697     } else {
698       // state change, so save.
699
setSave();
700       // Removing the deleted client's subscriptions, if any.
701
subscribers.remove(agId);
702       selectors.remove(agId);
703
704       // Removing the father identifier, if needed.
705
if (fatherId != null && agId.equals(fatherId)) {
706         // state change, so save.
707
setSave();
708         fatherId = null;
709       }
710     }
711   }
712
713   /**
714    * Method specifically processing a
715    * <code>fr.dyade.aaa.agent.DeleteNot</code> instance.
716    * <p>
717    * <code>UnknownAgent</code> notifications are sent to each subscriber
718    * and <code>UnclusterNot</code> notifications to the cluster
719    * fellows.
720    */

721   protected void doDeleteNot(DeleteNot not) {
722     AgentId clientId;
723     Vector JavaDoc subs;
724     SubscribeRequest sub;
725
726     // For each subscriber...
727
for (int i = 0; i < subscribers.size(); i++) {
728       clientId = (AgentId) subscribers.get(i);
729       forward(clientId, new UnknownAgent(destId, null));
730     }
731
732     // For each cluster fellow if any...
733
if (friends != null) {
734       AgentId topicId;
735       while (! friends.isEmpty()) {
736         // state change, so save.
737
setSave();
738         topicId = (AgentId) friends.remove(0);
739         forward(topicId, new UnclusterNot());
740       }
741     }
742   }
743
744   /**
745    * Actually forwards a vector of messages to the father or the cluster
746    * fellows, if any.
747    */

748   protected void forwardMessages(ClientMessages messages)
749   {
750     if (friends != null && ! friends.isEmpty()) {
751       AgentId topicId;
752       for (int i = 0; i < friends.size(); i++) {
753         topicId = (AgentId) friends.get(i);
754         forward(topicId, new TopicForwardNot(messages, false));
755
756         if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
757           JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "Messages "
758                                         + "forwarded to fellow "
759                                         + topicId.toString());
760       }
761     } else if (fatherId != null) {
762       forward(fatherId, new TopicForwardNot(messages, true));
763
764       if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
765         JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "Messages "
766                                       + "forwarded to father "
767                                       + fatherId.toString());
768     }
769   }
770
771   /**
772    * Actually processes the distribution of the received messages to the
773    * valid subscriptions by sending a <code>TopicMsgsReply</code> notification
774    * to the valid subscribers.
775    */

776   protected void processMessages(ClientMessages not) {
777     Vector JavaDoc messages = not.getMessages();
778     AgentId subscriber;
779     boolean local;
780     String JavaDoc selector;
781     Vector JavaDoc deliverables;
782     Message message;
783
784     nbMsgsReceiveSinceCreation = nbMsgsReceiveSinceCreation + messages.size();
785
786     setNoSave();
787     boolean persistent = false;
788
789     for (Enumeration JavaDoc subs = subscribers.elements(); subs.hasMoreElements();) {
790       // Browsing the subscribers.
791
subscriber = (AgentId) subs.nextElement();
792       local = (subscriber.getTo() == AgentServer.getServerId());
793       selector = (String JavaDoc) selectors.get(subscriber);
794
795       if (selector == null || selector.equals("")) {
796         // Current subscriber does not filter messages: all messages
797
// will be sent.
798
if (! local) {
799           // Subscriber not local, or no other sending occured locally:
800
// directly sending the messages.
801
deliverables = messages;
802           persistent = true;
803         } else if (! alreadySentLocally) {
804           deliverables = messages;
805           alreadySentLocally = true;
806         }
807         // A local sending already occured: cloning the messages.
808
else {
809           deliverables = new Vector JavaDoc();
810           for (Enumeration JavaDoc msgs = messages.elements(); msgs.hasMoreElements();)
811             deliverables.add(((Message) msgs.nextElement()).clone());
812         }
813       } else {
814         // Current subscriber filters messages; sending the matching messages.
815
deliverables = new Vector JavaDoc();
816         for (int i = 0; i < messages.size(); i++) {
817           message = (Message) messages.get(i);
818         
819           if (Selector.matches(message, selector)) {
820
821             // Subscriber not local, or no other sending occured locally:
822
// directly sending the message.
823
if (! local) {
824               deliverables.add(message);
825               persistent = true;
826             } else if (! alreadySentLocally) {
827               deliverables.add(message);
828               alreadySentLocally = true;
829             }
830             // A local sending already occured: cloning the message.
831
else
832               deliverables.add(message.clone());
833           }
834         }
835       }
836       // There are messages to send.
837
if (! deliverables.isEmpty()) {
838         TopicMsgsReply topicMsgsReply = new TopicMsgsReply(deliverables);
839         topicMsgsReply.setPersistent(persistent);
840         forward(subscriber, topicMsgsReply);
841         nbMsgsDeliverSinceCreation = nbMsgsDeliverSinceCreation + deliverables.size();
842       }
843     }
844   }
845
846   public void setAlreadySentLocally(boolean alreadySentLocally) {
847     this.alreadySentLocally = alreadySentLocally;
848   }
849 }
850
Popular Tags