KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > proxies > ProxyImpl


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies
4  * Copyright (C) 2004 France Telecom R&D
5  * Copyright (C) 2003 - 2004 Bull SA
6  * Copyright (C) 1996 - 2000 Dyade
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License as published by the Free Software Foundation; either
11  * version 2.1 of the License, or any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21  * USA.
22  *
23  * Initial developer(s): Frederic Maistre (INRIA)
24  * Contributor(s): ScalAgent Distributed Technologies
25  */

26 package org.objectweb.joram.mom.proxies;
27
28 import java.io.IOException JavaDoc;
29 import java.io.ObjectInputStream JavaDoc;
30 import java.io.ObjectOutputStream JavaDoc;
31
32 import java.util.Enumeration JavaDoc;
33 import java.util.Hashtable JavaDoc;
34 import java.util.Vector JavaDoc;
35
36 import fr.dyade.aaa.agent.AgentId;
37 import fr.dyade.aaa.agent.DeleteNot;
38 import fr.dyade.aaa.agent.Notification;
39 import fr.dyade.aaa.agent.UnknownAgent;
40 import fr.dyade.aaa.agent.UnknownNotificationException;
41 import fr.dyade.aaa.agent.Channel;
42
43 import org.objectweb.joram.mom.dest.*;
44 import org.objectweb.joram.mom.notifications.*;
45 import org.objectweb.joram.mom.messages.Message;
46
47 import org.objectweb.joram.shared.admin.DeleteUser;
48 import org.objectweb.joram.shared.admin.UpdateUser;
49
50 import org.objectweb.joram.shared.admin.GetSubscriptions;
51 import org.objectweb.joram.shared.admin.GetSubscriptionsRep;
52 import org.objectweb.joram.shared.admin.GetSubscriptionMessageIds;
53 import org.objectweb.joram.shared.admin.GetSubscriptionMessageIdsRep;
54 import org.objectweb.joram.shared.admin.GetSubscriptionMessage;
55 import org.objectweb.joram.shared.admin.GetSubscriptionMessageRep;
56 import org.objectweb.joram.shared.admin.DeleteSubscriptionMessage;
57 import org.objectweb.joram.shared.admin.GetSubscription;
58 import org.objectweb.joram.shared.admin.GetSubscriptionRep;
59 import org.objectweb.joram.shared.admin.ClearSubscription;
60
61 import org.objectweb.joram.shared.client.*;
62 import org.objectweb.joram.shared.excepts.*;
63
64 import javax.management.openmbean.CompositeDataSupport JavaDoc;
65
66 import fr.dyade.aaa.util.Debug;
67 import org.objectweb.util.monolog.api.BasicLevel;
68 import org.objectweb.util.monolog.api.Logger;
69
70 /**
71  * The <code>ProxyImpl</code> class implements the MOM proxy behaviour,
72  * basically forwarding client requests to MOM destinations and MOM
73  * destinations replies to clients.
74  */

75 public class ProxyImpl implements java.io.Serializable JavaDoc, ProxyImplMBean {
76   public static Logger logger = Debug.getLogger(ProxyImpl.class.getName());
77
78   /** period to run the cleaning task, by default 60s. */
79   protected long period = 60000L;
80
81   /**
82    * Returns the period value of this queue, -1 if not set.
83    *
84    * @return the period value of this queue; -1 if not set.
85    */

86   public long getPeriod() {
87     return period;
88   }
89
90   /**
91    * Sets or unsets the period for this queue.
92    *
93    * @param period The period value to be set or -1 for unsetting previous
94    * value.
95    */

96   public void setPeriod(long period) {
97     if ((this.period == -1L) && (period != -1L)) {
98       // Schedule the CleaningTask.
99
Channel.sendTo(proxyAgent.getId(), new WakeUpNot());
100     }
101     this.period = period;
102   }
103
104   /**
105    * Identifier of this proxy dead message queue, <code>null</code> for DMQ
106    * not set.
107    */

108   private AgentId dmqId = null;
109   /**
110    * Threshold value, 0 or negative for no threshold, <code>null</code> for
111    * value not set.
112    */

113   private Integer JavaDoc threshold = null;
114
115   /**
116    * Table of the proxy's <code>ClientContext</code> instances.
117    * <p>
118    * <b>Key:</b> context identifier<br>
119    * <b>Value:</b> context
120    */

121   private Hashtable JavaDoc contexts;
122   /**
123    * Table holding the <code>ClientSubscription</code> instances.
124    * <p>
125    * <b>Key:</b> subsription name<br>
126    * <b>Value:</b> client subscription
127    */

128   private Hashtable JavaDoc subsTable;
129   /**
130    * Table holding the recovered transactions branches.
131    * <p>
132    * <b>Key:</b> transaction identifier<br>
133    * <b>Value:</b> <code>XACnxPrepare</code> instance
134    */

135   private Hashtable JavaDoc recoveredTransactions;
136
137   /** Counter of message arrivals from topics. */
138   private long arrivalsCounter = 0;
139
140   /** The reference of the agent hosting the proxy. */
141   private ProxyAgentItf proxyAgent;
142   /**
143    * Table holding the <code>TopicSubscription</code> instances.
144    * <p>
145    * <b>Key:</b> topic identifier<br>
146    * <b>Value:</b> topic subscription
147    */

148   private transient Hashtable JavaDoc topicsTable;
149   /**
150    * Table holding the subsriptions' messages.
151    * <p>
152    * <b>Key:</b> message identifier<br>
153    * <b>Value:</b> message
154    */

155   private transient Hashtable JavaDoc messagesTable;
156
157   /**
158    * Identifier of the active context.
159    * Value -1 means that there's no active
160    * context.
161    */

162   private transient int activeCtxId;
163   /** Reference to the active <code>ClientContext</code> instance. */
164   private transient ClientContext activeCtx;
165
/**
 * Constructs a <code>ProxyImpl</code> instance with empty contexts and
 * subscriptions tables.
 *
 * @param proxyAgent The agent hosting this proxy.
 */
public ProxyImpl(ProxyAgentItf proxyAgent) {
  this.proxyAgent = proxyAgent;
  this.contexts = new Hashtable();
  this.subsTable = new Hashtable();

  if (logger.isLoggable(BasicLevel.DEBUG)) {
    logger.log(BasicLevel.DEBUG, this + ": created.");
  }
}
176
177   /**
178    * Returns a string representation of this user's proxy.
179    */

180   public String JavaDoc toString() {
181     if (proxyAgent == null)
182       return "ProxyImpl:";
183     else
184       return "ProxyImpl:" + proxyAgent.getId();
185   }
186
187
188   /**
189    * (Re)initializes the proxy.
190    *
191    * @exception Exception If the proxy state could not be fully retrieved,
192    * leading to an inconsistent state.
193    */

194   public void initialize(boolean firstTime) throws Exception JavaDoc {
195     if (logger.isLoggable(BasicLevel.DEBUG))
196       logger.log(BasicLevel.DEBUG, "--- " + this + " (re)initializing...");
197  
198     topicsTable = new Hashtable JavaDoc();
199     messagesTable = new Hashtable JavaDoc();
200
201     setActiveCtxId(-1);
202     
203     // Re-initializing after a crash or a server stop.
204

205     // Browsing the pre-crash contexts:
206
ClientContext activeCtx;
207     AgentId destId;
208     for (Enumeration JavaDoc ctxIds = contexts.keys(); ctxIds.hasMoreElements();) {
209       activeCtx = (ClientContext) contexts.remove(ctxIds.nextElement());
210
211       // Denying the non acknowledged messages:
212
for (Enumeration JavaDoc queueIds = activeCtx.getDeliveringQueues();
213            queueIds.hasMoreElements();) {
214         destId = (AgentId) queueIds.nextElement();
215         proxyAgent.sendNot(destId, new DenyRequest(activeCtx.getId()));
216
217         if (logger.isLoggable(BasicLevel.DEBUG))
218           logger.log(BasicLevel.DEBUG,
219                      "Denies messages on queue " + destId.toString());
220       }
221
222       // Saving the prepared transactions.
223
Enumeration JavaDoc xids = activeCtx.getTxIds();
224       Xid xid;
225       XACnxPrepare recoveredPrepare;
226       XACnxPrepare prepare;
227       while (xids.hasMoreElements()) {
228         if (recoveredTransactions == null)
229           recoveredTransactions = new Hashtable JavaDoc();
230
231         xid = (Xid) xids.nextElement();
232
233         recoveredPrepare = (XACnxPrepare) recoveredTransactions.get(xid);
234         prepare = activeCtx.getTxPrepare(xid);
235
236         if (recoveredPrepare == null)
237           recoveredTransactions.put(xid, prepare);
238         else {
239           recoveredPrepare.getSendings().addAll(prepare.getSendings());
240           recoveredPrepare.getAcks().addAll(prepare.getAcks());
241         }
242       }
243
244       // Deleting the temporary destinations:
245
for (Enumeration JavaDoc tempDests = activeCtx.getTempDestinations();
246            tempDests.hasMoreElements();) {
247         destId = (AgentId) tempDests.nextElement();
248         deleteTemporaryDestination(destId);
249   
250         if (logger.isLoggable(BasicLevel.DEBUG))
251           logger.log(BasicLevel.DEBUG,
252                      "Deletes temporary destination " + destId.toString());
253       }
254     }
255
256     // Retrieving the subscriptions' messages.
257
Vector JavaDoc messages = Message.loadAll(getMsgTxname());
258
259     if (subsTable.isEmpty()) {
260       // it is possible because we always save MessageSoftRef
261
// so we must delete all message.
262
Message.deleteAll(getMsgTxname());
263     }
264     
265     // Browsing the pre-crash subscriptions:
266
String JavaDoc subName;
267     ClientSubscription cSub;
268     Vector JavaDoc topics = new Vector JavaDoc();
269     TopicSubscription tSub;
270     for (Enumeration JavaDoc subNames = subsTable.keys();
271          subNames.hasMoreElements();) {
272       subName = (String JavaDoc) subNames.nextElement();
273       cSub = (ClientSubscription) subsTable.get(subName);
274       destId = cSub.getTopicId();
275       if (! topics.contains(destId))
276         topics.add(destId);
277       // Deleting the non durable subscriptions.
278
if (! cSub.getDurable())
279         subsTable.remove(subName);
280       // Reinitializing the durable ones.
281
else {
282         cSub.setProxyAgent(proxyAgent);
283         cSub.reinitialize(getStringId(),
284                           messagesTable,
285                           messages,
286                           true);
287         tSub = (TopicSubscription) topicsTable.get(destId);
288         if (tSub == null) {
289           tSub = new TopicSubscription();
290           topicsTable.put(destId, tSub);
291         }
292         tSub.putSubscription(subName, cSub.getSelector());
293       }
294     }
295     // Browsing the topics and updating their subscriptions.
296
for (Enumeration JavaDoc topicIds = topics.elements();
297          topicIds.hasMoreElements();)
298       updateSubscriptionToTopic((AgentId) topicIds.nextElement(), -1, -1);
299   }
300
301   private void setActiveCtxId(int activeCtxId) {
302     if (logger.isLoggable(BasicLevel.DEBUG))
303       logger.log(BasicLevel.DEBUG,
304                  "ProxyImpl.setActiveCtxId(" + activeCtxId + ')');
305     this.activeCtxId = activeCtxId;
306   }
307
308   /**
309    * Method processing clients requests.
310    * <p>
311    * Some of the client requests are directly forwarded, some others are
312    * sent to the proxy so that their processing occurs in a transaction.
313    * <p>
314    * A <code>MomExceptionReply</code> wrapping a <tt>DestinationException</tt>
315    * might be sent back if a target destination can't be identified.
316    */

317   public void reactToClientRequest(int key, AbstractJmsRequest request)
318   {
319     try {
320       if (logger.isLoggable(BasicLevel.DEBUG))
321         logger.log(BasicLevel.DEBUG,
322                    "--- " + this + " got " + request.getClass().getName() +
323                    " with id: " + request.getRequestId() +
324                    " through activeCtx: " + key);
325
326       if (request instanceof ProducerMessages)
327         reactToClientRequest(key, (ProducerMessages) request);
328       else if (request instanceof ConsumerReceiveRequest)
329         reactToClientRequest(key, (ConsumerReceiveRequest) request);
330       else if (request instanceof ConsumerSetListRequest)
331         reactToClientRequest(key, (ConsumerSetListRequest) request);
332       else if (request instanceof QBrowseRequest)
333         reactToClientRequest(key, (QBrowseRequest) request);
334       else if (request instanceof JmsRequestGroup)
335         reactToClientRequest(key, (JmsRequestGroup) request);
336       else {
337         doReact(key, request);
338       }
339     }
340     // Catching an exception due to an invalid agent identifier to
341
// forward the request to:
342
catch (IllegalArgumentException JavaDoc iE) {
343       DestinationException dE =
344         new DestinationException("Proxy could not forward the request to"
345                                  + " incorrectly identified destination: "
346                                  + iE);
347
348       doReply(key, new MomExceptionReply(request.getRequestId(), dE));
349     }
350   }
351
352   /**
353    * Forwards the messages sent by the client in a
354    * <code>ProducerMessages</code> request as a <code>ClientMessages</code>
355    * MOM request directly to a destination, and acknowledges them by sending
356    * a <code>ServerReply</code> back.
357    */

358   private void reactToClientRequest(int key, ProducerMessages req) {
359     if (logger.isLoggable(BasicLevel.DEBUG))
360       logger.log(BasicLevel.DEBUG,
361                  "ProxyImpl.reactToClientRequest(" + key + ',' + req + ')');
362
363     AgentId destId = AgentId.fromString(req.getTarget());
364     ClientMessages not = new ClientMessages(
365       key,
366       req.getRequestId(),
367       req.getMessages());
368
369     setDmq(not);
370     
371     
372     if (destId.getTo() == proxyAgent.getId().getTo()) {
373       if (logger.isLoggable(BasicLevel.DEBUG))
374         logger.log(BasicLevel.DEBUG, " -> local sending");
375       not.setPersistent(false);
376       if (req.getAsyncSend()) {
377         not.setAsyncSend(true);
378       }
379     } else {
380       if (logger.isLoggable(BasicLevel.DEBUG))
381         logger.log(BasicLevel.DEBUG, " -> remote sending");
382       if (!req.getAsyncSend()) {
383         proxyAgent.sendNot(proxyAgent.getId(),
384                            new SendReplyNot(key, req.getRequestId()));
385       }
386     }
387     
388     proxyAgent.sendNot(destId, not);
389   }
390   
391   private void setDmq(ClientMessages not) {
392     // Setting the producer's DMQ identifier field:
393
if (dmqId != null) {
394       not.setDMQId(dmqId);
395     } else {
396       not.setDMQId(DeadMQueueImpl.getId());
397     }
398   }
399
400   /**
401    * Either forwards the <code>ConsumerReceiveRequest</code> request as a
402    * <code>ReceiveRequest</code> directly to the target queue, or wraps it
403    * and sends it to the proxy if destinated to a subscription.
404    */

405   private void reactToClientRequest(int key, ConsumerReceiveRequest req)
406   {
407     if (req.getQueueMode()) {
408       ReceiveRequest not = new ReceiveRequest(
409         key,
410         req.getRequestId(),
411         req.getSelector(),
412         req.getTimeToLive(),
413         req.getReceiveAck(),
414         null,
415         1);
416       AgentId to = AgentId.fromString(req.getTarget());
417       if (to.getTo() == proxyAgent.getId().getTo()) {
418         if (logger.isLoggable(BasicLevel.DEBUG))
419           logger.log(BasicLevel.DEBUG, " -> local receiving");
420         not.setPersistent(false);
421         proxyAgent.sendNot(to, not);
422       } else {
423         proxyAgent.sendNot(to, not);
424       }
425     } else {
426       doReact(key, req);
427     }
428   }
429
430   /**
431    * Either forwards the <code>ConsumerSetListRequest</code> request as a
432    * <code>ReceiveRequest</code> directly to the target queue, or wraps it
433    * and sends it to the proxy if destinated to a subscription.
434    */

435   private void reactToClientRequest(int key, ConsumerSetListRequest req) {
436     if (logger.isLoggable(BasicLevel.DEBUG))
437       logger.log(BasicLevel.DEBUG,
438           "ProxyImp.reactToClientRequest(" + key + ',' + req + ')');
439     if (req.getQueueMode()) {
440       ReceiveRequest not = new ReceiveRequest(key,
441                                               req.getRequestId(),
442                                               req.getSelector(),
443                                               0,
444                                               false,
445                                               req.getMessageIdsToAck(),
446                                               req.getMessageCount());
447       AgentId to = AgentId.fromString(req.getTarget());
448       if (to.getTo() == proxyAgent.getId().getTo()) {
449         if (logger.isLoggable(BasicLevel.DEBUG))
450           logger.log(BasicLevel.DEBUG, " -> local sending");
451         not.setPersistent(false);
452         proxyAgent.sendNot(to, not);
453       } else {
454         proxyAgent.sendNot(to, not);
455       }
456     }
457     else {
458       doReact(key, req);
459     }
460   }
461
462   /**
463    * Forwards the client's <code>QBrowseRequest</code> request as
464    * a <code>BrowseRequest</code> MOM request directly to a destination.
465    */

466   private void reactToClientRequest(int key, QBrowseRequest req)
467   {
468     proxyAgent.sendNot(AgentId.fromString(req.getTarget()),
469                        new BrowseRequest(key,
470                                          req.getRequestId(),
471                                          req.getSelector()));
472   }
473   
474   private void reactToClientRequest(int key, JmsRequestGroup request) {
475     AbstractJmsRequest[] requests = request.getRequests();
476     RequestBuffer rm = new RequestBuffer(proxyAgent);
477     for (int i = 0; i < requests.length; i++) {
478       if (requests[i] instanceof ProducerMessages) {
479         ProducerMessages pm =(ProducerMessages) requests[i];
480         rm.put(key, pm);
481       } else {
482         reactToClientRequest(key, requests[i]);
483       }
484     }
485     
486     rm.flush();
487   }
488
489   /**
490    * Distributes the received notifications to the appropriate reactions.
491    * <p>
492    * A JMS proxy reacts to:
493    * <ul>
494    * <li><code>SyncReply</code> proxy synchronizing notification,</li>
495    * <li><code>SetDMQRequest</code> admin notification,</li>
496    * <li><code>SetThreshRequest</code> admin notification,</li>
497    * <li><code>SetNbMaxMsgRequest</code> admin notification,</li>
498    * <li><code>Monit_GetNbMaxMsg</code> admin notification,</li>
499    * <li><code>Monit_GetDMQSettings</code> monitoring notification,</li>
500    * <li><code>AbstractReply</code> destination replies,</li>
501    * <li><code>AdminReply</code> administration replies,</li>
502    * <li><code>fr.dyade.aaa.agent.UnknownAgent</code>.</li>
503    * </ul>
504    *
505    * @exception UnknownNotificationException
506    * If the notification is not expected.
507    */

508   public void react(AgentId from, Notification not)
509               throws UnknownNotificationException
510   {
511     // Administration and monitoring requests:
512
if (not instanceof SetDMQRequest)
513       doReact(from, (SetDMQRequest) not);
514     else if (not instanceof SetThreshRequest)
515       doReact(from, (SetThreshRequest) not);
516     else if (not instanceof SetNbMaxMsgRequest)
517       doReact(from, (SetNbMaxMsgRequest) not);
518     else if (not instanceof Monit_GetNbMaxMsg)
519       doReact(from, (Monit_GetNbMaxMsg) not);
520     else if (not instanceof Monit_GetDMQSettings)
521       doReact(from, (Monit_GetDMQSettings) not);
522     // Synchronization notification:
523
else if (not instanceof SyncReply)
524       doReact((SyncReply) not);
525     // Notifications sent by a destination:
526
else if (not instanceof AbstractReply)
527       doFwd(from, (AbstractReply) not);
528     else if (not instanceof AdminReply)
529       doReact((AdminReply) not);
530     // Platform notifications:
531
else if (not instanceof UnknownAgent)
532       doReact((UnknownAgent) not);
533     else if (not instanceof UserAdminRequestNot)
534       doReact((UserAdminRequestNot) not);
535     else
536       throw new UnknownNotificationException("Unexpected notification: "
537                                              + not.getClass().getName());
538   }
539
540   
541   /**
542    * Distributes the client requests to the appropriate reactions.
543    * <p>
544    * The proxy accepts the following requests:
545    * <ul>
546    * <li><code>GetAdminTopicRequest</code></li>
547    * <li><code>CnxConnectRequest</code></li>
548    * <li><code>CnxStartRequest</code></li>
549    * <li><code>CnxStopRequest</code></li>
550    * <li><code>SessCreateTQRequest</code></li>
551    * <li><code>SessCreateTTRequest</code></li>
552    * <li><code>ConsumerSubRequest</code></li>
553    * <li><code>ConsumerUnsubRequest</code></li>
554    * <li><code>ConsumerCloseSubRequest</code></li>
555    * <li><code>ConsumerSetListRequest</code></li>
556    * <li><code>ConsumerUnsetListRequest</code></li>
557    * <li><code>ConsumerReceiveRequest</code></li>
558    * <li><code>ConsumerAckRequest</code></li>
559    * <li><code>ConsumerDenyRequest</code></li>
560    * <li><code>SessAckRequest</code></li>
561    * <li><code>SessDenyRequest</code></li>
562    * <li><code>TempDestDeleteRequest</code></li>
563    * <li><code>XACnxPrepare</code></li>
564    * <li><code>XACnxCommit</code></li>
565    * <li><code>XACnxRollback</code></li>
566    * <li><code>XACnxRecoverRequest</code></li>
567    * </ul>
568    * <p>
569    * A <code>JmsExceptReply</code> is sent back to the client when an
570    * exception is thrown by the reaction.
571    */

572   private void doReact(int key, AbstractJmsRequest request)
573   {
574     try {
575       // Updating the active context if the request is not a new context
576
// request!
577
if (! (request instanceof CnxConnectRequest))
578         setCtx(key);
579
580       if (request instanceof GetAdminTopicRequest)
581         doReact(key, (GetAdminTopicRequest) request);
582       else if (request instanceof CnxConnectRequest)
583         doReact(key, (CnxConnectRequest) request);
584       else if (request instanceof CnxStartRequest)
585         doReact((CnxStartRequest) request);
586       else if (request instanceof CnxStopRequest)
587         doReact((CnxStopRequest) request);
588       else if (request instanceof SessCreateTQRequest)
589         doReact((SessCreateTQRequest) request);
590       else if (request instanceof SessCreateTTRequest)
591         doReact((SessCreateTTRequest) request);
592       else if (request instanceof ConsumerSubRequest)
593         doReact((ConsumerSubRequest) request);
594       else if (request instanceof ConsumerUnsubRequest)
595         doReact((ConsumerUnsubRequest) request);
596       else if (request instanceof ConsumerCloseSubRequest)
597         doReact((ConsumerCloseSubRequest) request);
598       else if (request instanceof ConsumerSetListRequest)
599         doReact((ConsumerSetListRequest) request);
600       else if (request instanceof ConsumerUnsetListRequest)
601         doReact((ConsumerUnsetListRequest) request);
602       else if (request instanceof ConsumerReceiveRequest)
603         doReact((ConsumerReceiveRequest) request);
604       else if (request instanceof ConsumerAckRequest)
605         doReact((ConsumerAckRequest) request);
606       else if (request instanceof ConsumerDenyRequest)
607         doReact((ConsumerDenyRequest) request);
608       else if (request instanceof SessAckRequest)
609         doReact((SessAckRequest) request);
610       else if (request instanceof SessDenyRequest)
611         doReact((SessDenyRequest) request);
612       else if (request instanceof TempDestDeleteRequest)
613         doReact((TempDestDeleteRequest) request);
614       else if (request instanceof XACnxPrepare)
615         doReact((XACnxPrepare) request);
616       else if (request instanceof XACnxCommit)
617         doReact((XACnxCommit) request);
618       else if (request instanceof XACnxRollback)
619         doReact((XACnxRollback) request);
620       else if (request instanceof XACnxRecoverRequest)
621         doReact((XACnxRecoverRequest) request);
622       else if (request instanceof CnxCloseRequest)
623         doReact(key, (CnxCloseRequest) request);
624       else if (request instanceof ActivateConsumerRequest)
625         doReact(key, (ActivateConsumerRequest) request);
626       else if (request instanceof CommitRequest)
627         doReact(key, (CommitRequest)request);
628     }
629     catch (MomException mE) {
630       if (logger.isLoggable(BasicLevel.ERROR))
631         logger.log(BasicLevel.ERROR, mE);
632
633       // Sending the exception to the client:
634
doReply(new MomExceptionReply(request.getRequestId(), mE));
635     }
636   }
637
638   /**
639    * Method implementing the reaction to a <code>GetAdminTopicRequest</code>
640    * requesting the identifier of the local admin topic.
641    * <p>
642    * It simply sends back a <code>GetAdminTopicReply</code> holding the
643    * admin topic identifier.
644    *
645    * @exception AccessException If the requester is not an administrator.
646    */

647   private void doReact(int key, GetAdminTopicRequest req)
648                throws AccessException
649   {
650 // if (! admin)
651
// throw new AccessException("Request forbidden to a non administrator.");
652

653     doReply(
654       key,
655       new GetAdminTopicReply(
656         req,
657         AdminTopicImpl.getReference().getId().toString()));
658   }
659
660   /**
661    * Method implementing the reaction to a <code>CnxConnectRequest</code>
662    * requesting the key of the active context.
663    * <p>
664    * It simply sends back a <code>ConnectReply</code> holding the active
665    * context's key.
666    *
667    * @exception DestinationException In case of a first administrator
668    * context, if the local administration topic reference
669    * is not available.
670    */

671   private void doReact(int key, CnxConnectRequest req)
672     throws DestinationException {
673     // state change, so save.
674
proxyAgent.setSave();
675
676     setActiveCtxId(key);
677     activeCtx = new ClientContext(proxyAgent.getId(), key);
678     activeCtx.setProxyAgent(proxyAgent);
679     contexts.put(new Integer JavaDoc(key), activeCtx);
680     
681     if (logger.isLoggable(BasicLevel.DEBUG))
682       logger.log(BasicLevel.DEBUG, "Connection " + key + " opened.");
683
684     doReply(new CnxConnectReply(req, key, proxyAgent.getId().toString()));
685   }
686
687   /**
688    * Method implementing the proxy reaction to a <code>CnxStartRequest</code>
689    * requesting to start a context.
690    * <p>
691    * This method sends the pending <code>ConsumerMessages</code> replies,
692    * if any.
693    */

694   private void doReact(CnxStartRequest req) {
695     activeCtx.setActivated(true);
696
697     // Delivering the pending deliveries, if any:
698
for (Enumeration JavaDoc deliveries = activeCtx.getPendingDeliveries();
699          deliveries.hasMoreElements();)
700       doReply((AbstractJmsReply) deliveries.nextElement());
701
702     // Clearing the pending deliveries.
703
activeCtx.clearPendingDeliveries();
704   }
705
706   /**
707    * Method implementing the JMS proxy reaction to a
708    * <code>CnxStopRequest</code> requesting to stop a context.
709    * <p>
710    * This method sends a <code>ServerReply</code> back.
711    */

712   private void doReact(CnxStopRequest req) {
713     activeCtx.setActivated(false);
714     doReply(new ServerReply(req));
715   }
716
717   /**
718    * Method implementing the JMS proxy reaction to a
719    * <code>SessCreateTQRequest</code> requesting the creation of a temporary
720    * queue.
721    * <p>
722    * Creates the queue, sends it a <code>SetRightRequest</code> for granting
723    * WRITE access to all, and wraps a <code>SessCreateTDReply</code> in a
724    * <code>SyncReply</code> notification it sends to itself. This latest
725    * action's purpose is to preserve causality.
726    *
727    * @exception RequestException If the queue could not be deployed.
728    */

729   private void doReact(SessCreateTQRequest req) throws RequestException {
730     try {
731       Queue queue = new Queue();
732       queue.init(proxyAgent.getId(), null);
733       AgentId qId = queue.getId();
734
735       queue.deploy();
736
737       // Setting free WRITE right on the queue:
738
proxyAgent.sendNot(qId, new SetRightRequest(null, null, 2));
739
740       activeCtx.addTemporaryDestination(qId);
741
742       SessCreateTDReply reply = new SessCreateTDReply(req, qId.toString());
743       proxyAgent.sendNot(proxyAgent.getId(),
744                          new SyncReply(activeCtxId, reply));
745
746       proxyAgent.sendNot(AdminTopic.getDefault(),
747                          new RegisterTmpDestNot(qId, false, true));
748
749       if (logger.isLoggable(BasicLevel.DEBUG))
750         logger.log(BasicLevel.DEBUG, "Temporary queue " + qId + " created.");
751     }
752     catch (java.io.IOException JavaDoc iE) {
753       throw new RequestException("Could not create temporary queue: " + iE);
754     }
755   }
756
757   /**
758    * Method implementing the JMS proxy reaction to a
759    * <code>SessCreateTTRequest</code> requesting the creation of a temporary
760    * topic.
761    * <p>
762    * Creates the topic, sends it a <code>SetRightRequest</code> for granting
763    * WRITE access to all, and wraps a <code>SessCreateTDReply</code> in a
764    * <code>SyncReply</code> notification it sends to itself. This latest
765    * action's purpose is to preserve causality.
766    *
767    * @exception RequestException If the topic could not be deployed.
768    */

769   private void doReact(SessCreateTTRequest req) throws RequestException {
770     Topic topic = new Topic();
771     topic.init(proxyAgent.getId(), null);
772     AgentId tId = topic.getId();
773
774     try {
775       topic.deploy();
776
777       // Setting free WRITE right on the topic:
778
proxyAgent.sendNot(tId, new SetRightRequest(null, null, 2));
779
780       activeCtx.addTemporaryDestination(tId);
781
782       SessCreateTDReply reply = new SessCreateTDReply(req, tId.toString());
783       proxyAgent.sendNot(proxyAgent.getId(),
784                          new SyncReply(activeCtxId, reply));
785
786       proxyAgent.sendNot(AdminTopic.getDefault(),
787                          new RegisterTmpDestNot(tId, true, true));
788
789       if (logger.isLoggable(BasicLevel.DEBUG))
790         logger.log(BasicLevel.DEBUG, "Temporary topic" + tId + " created.");
791     } catch (java.io.IOException JavaDoc iE) {
792       topic = null;
793       throw new RequestException("Could not deploy temporary topic "
794                                  + tId + ": " + iE);
795     }
796   }
797
798   /**
799    * Method implementing the JMS proxy reaction to a
800    * <code>ConsumerSubRequest</code> requesting to subscribe to a topic.
801    *
802    * @exception StateException If activating an already active durable
803    * subscription.
804    */

805   private void doReact(ConsumerSubRequest req) throws StateException {
806     AgentId topicId = AgentId.fromString(req.getTarget());
807     String JavaDoc subName = req.getSubName();
808     
809     boolean newTopic = ! topicsTable.containsKey(topicId);
810     boolean newSub = ! subsTable.containsKey(subName);
811
812     TopicSubscription tSub;
813     ClientSubscription cSub;
814
815     // true if a SubscribeRequest has been sent to the topic.
816
boolean sent = false;
817
818     if (newTopic) { // New topic...
819
tSub = new TopicSubscription();
820       topicsTable.put(topicId, tSub);
821     } else { // Known topic...
822
tSub = (TopicSubscription) topicsTable.get(topicId);
823     }
824
825     if (newSub) { // New subscription...
826
// state change, so save.
827
proxyAgent.setSave();
828       cSub = new ClientSubscription(proxyAgent.getId(),
829                                     activeCtxId,
830                                     req.getRequestId(),
831                                     req.getDurable(),
832                                     topicId,
833                                     req.getSubName(),
834                                     req.getSelector(),
835                                     req.getNoLocal(),
836                                     dmqId,
837                                     threshold,
838                                     messagesTable);
839       cSub.setProxyAgent(proxyAgent);
840      
841       if (logger.isLoggable(BasicLevel.DEBUG))
842         logger.log(BasicLevel.DEBUG, "Subscription " + subName + " created.");
843
844       subsTable.put(subName, cSub);
845       tSub.putSubscription(subName, req.getSelector());
846       sent =
847         updateSubscriptionToTopic(topicId, activeCtxId, req.getRequestId());
848     } else { // Existing durable subscription...
849
cSub = (ClientSubscription) subsTable.get(subName);
850
851       if (cSub.getActive())
852         throw new StateException("The durable subscription " + subName +
853                                  " has already been activated.");
854
855       // Updated topic: updating the subscription to the previous topic.
856
boolean updatedTopic = ! topicId.equals(cSub.getTopicId());
857       if (updatedTopic) {
858         TopicSubscription oldTSub =
859           (TopicSubscription) topicsTable.get(cSub.getTopicId());
860         oldTSub.removeSubscription(subName);
861         updateSubscriptionToTopic(cSub.getTopicId(), -1, -1);
862       }
863
864       // Updated selector?
865
boolean updatedSelector;
866       if (req.getSelector() == null && cSub.getSelector() != null)
867         updatedSelector = true;
868       else if (req.getSelector() != null && cSub.getSelector() == null)
869         updatedSelector = true;
870       else if (req.getSelector() == null && cSub.getSelector() == null)
871         updatedSelector = false;
872       else
873         updatedSelector = ! req.getSelector().equals(cSub.getSelector());
874
875       // Reactivating the subscription.
876
cSub.reactivate(activeCtxId,
877                       req.getRequestId(),
878                       topicId,
879                       req.getSelector(),
880                       req.getNoLocal());
881
882       if (logger.isLoggable(BasicLevel.DEBUG))
883         logger.log(BasicLevel.DEBUG,
884                    "Subscription " + subName + " reactivated.");
885
886       // Updated subscription: updating subscription to topic.
887
if (updatedTopic || updatedSelector) {
888         tSub.putSubscription(subName, req.getSelector());
889         sent = updateSubscriptionToTopic(topicId,
890                                          activeCtxId,
891                                          req.getRequestId());
892       }
893     }
894     // Activating the subscription.
895
activeCtx.addSubName(subName);
896
897     // Acknowledging the request, if needed.
898
if (! sent)
899       proxyAgent.sendNot(proxyAgent.getId(),
900                          new SyncReply(activeCtxId, new ServerReply(req)));
901   }
902
903   /**
904    * Method implementing the JMS proxy reaction to a
905    * <code>ConsumerSetListRequest</code> notifying the creation of a client
906    * listener.
907    * <p>
908    * Sets the listener for the subscription, launches a delivery sequence.
909    *
910    * @exception DestinationException If the subscription does not exist.
911    */

912   private void doReact(ConsumerSetListRequest req) throws DestinationException
913   {
914     // Getting the subscription:
915
String JavaDoc subName = req.getTarget();
916     ClientSubscription sub = null;
917     if (subName != null)
918       sub = (ClientSubscription) subsTable.get(subName);
919
920     if (sub == null)
921       throw new DestinationException("Can't set a listener on the non existing subscription: " + subName);
922
923     sub.setListener(req.getRequestId());
924
925     ConsumerMessages consM = sub.deliver();
926     if (consM != null) {
927       if (activeCtx.getActivated())
928         doReply(consM);
929       else
930         activeCtx.addPendingDelivery(consM);
931     }
932   }
933    
934   /**
935    * Method implementing the JMS proxy reaction to a
936    * <code>ConsumerUnsetListRequest</code> notifying that a consumer listener
937    * is unset.
938    *
939    * @exception DestinationException If the subscription does not exist.
940    */

941   private void doReact(ConsumerUnsetListRequest req)
942     throws DestinationException {
943     // If the listener was listening to a queue, cancelling any pending reply:
944
if (req.getQueueMode()) {
945       activeCtx.cancelReceive(req.getCancelledRequestId());
946       AgentId to = AgentId.fromString(req.getTarget());
947       proxyAgent.sendNot(
948         to,
949         new AbortReceiveRequest(activeCtx.getId(),
950                                 req.getRequestId(),
951                                 req.getCancelledRequestId()));
952     }
953   }
954
955   /**
956    * Method implementing the JMS proxy reaction to a
957    * <code>ConsumerCloseSubRequest</code> requesting to deactivate a durable
958    * subscription.
959    *
960    * @exception DestinationException If the subscription does not exist.
961    */

962   private void doReact(ConsumerCloseSubRequest req) throws DestinationException
963   {
964     // Getting the subscription:
965
String JavaDoc subName = req.getTarget();
966     ClientSubscription sub = null;
967     if (subName != null)
968       sub = (ClientSubscription) subsTable.get(subName);
969
970     if (sub == null)
971       throw new DestinationException("Can't desactivate non existing subscription: " + subName);
972
973     // De-activating the subscription:
974
activeCtx.removeSubName(subName);
975     sub.deactivate();
976
977     // Acknowledging the request:
978
doReply(new ServerReply(req));
979   }
980
981   /**
982    * Method implementing the JMS proxy reaction to a
983    * <code>ConsumerUnsubRequest</code> requesting to remove a subscription.
984    *
985    * @exception DestinationException If the subscription does not exist.
986    */

987   private void doReact(ConsumerUnsubRequest req) throws DestinationException {
988     // state change, so save.
989
proxyAgent.setSave();
990
991     // Getting the subscription.
992
String JavaDoc subName = req.getTarget();
993     ClientSubscription sub = null;
994     if (subName != null)
995       sub = (ClientSubscription) subsTable.get(subName);
996     if (sub == null)
997       throw new DestinationException("Can't unsubscribe non existing subscription: " + subName);
998
999     if (logger.isLoggable(BasicLevel.DEBUG))
1000      logger.log(BasicLevel.DEBUG, "Deleting subscription " + subName);
1001
1002    // Updating the proxy's subscription to the topic.
1003
AgentId topicId = sub.getTopicId();
1004    TopicSubscription tSub = (TopicSubscription) topicsTable.get(topicId);
1005    tSub.removeSubscription(subName);
1006    updateSubscriptionToTopic(topicId, -1, -1);
1007
1008    // Deleting the subscription.
1009
sub.delete();
1010    activeCtx.removeSubName(subName);
1011    subsTable.remove(subName);
1012
1013    // Acknowledging the request:
1014
proxyAgent.sendNot(proxyAgent.getId(),
1015                       new SyncReply(activeCtxId, new ServerReply(req)));
1016  }
1017
1018  /**
1019   * Method implementing the proxy reaction to a
1020   * <code>ConsumerReceiveRequest</code> instance, requesting a message from a
1021   * subscription.
1022   * <p>
1023   * This method registers the request and launches a delivery sequence.
1024   *
1025   * @exception DestinationException If the subscription does not exist.
1026   */

1027  private void doReact(ConsumerReceiveRequest req)
1028    throws DestinationException {
1029    if (logger.isLoggable(BasicLevel.DEBUG))
1030      logger.log(BasicLevel.DEBUG, "ProxyImpl.doReact(" + req + ')');
1031    
1032    String JavaDoc subName = req.getTarget();
1033    ClientSubscription sub = null;
1034    if (subName != null)
1035      sub = (ClientSubscription) subsTable.get(subName);
1036
1037    if (sub == null)
1038      throw new DestinationException("Can't request a message from the unknown subscription: " + subName);
1039
1040    // Getting a message from the subscription.
1041
sub.setReceiver(req.getRequestId(), req.getTimeToLive());
1042    ConsumerMessages consM = sub.deliver();
1043
1044    if (consM != null && req.getReceiveAck()) {
1045      Vector JavaDoc messageList = consM.getMessages();
1046      for (int i = 0; i < messageList.size(); i++) {
1047        Message msg = (Message)messageList.elementAt(i);
1048        sub.acknowledge(msg.getIdentifier());
1049      }
1050    }
1051
1052    // Nothing to deliver but immediate delivery request: building an empty
1053
// reply.
1054
if (consM == null && req.getTimeToLive() == -1) {
1055      if (logger.isLoggable(BasicLevel.DEBUG))
1056        logger.log(BasicLevel.DEBUG, " -> immediate delivery");
1057      sub.unsetReceiver();
1058      consM = new ConsumerMessages(req.getRequestId(), subName, false);
1059    }
1060    
1061    // Delivering.
1062
if (consM != null && activeCtx.getActivated()) {
1063      doReply(consM);
1064    } else if (consM != null) {
1065      activeCtx.addPendingDelivery(consM);
1066    }
1067  }
1068
1069  /**
1070   * Method implementing the JMS proxy reaction to a
1071   * <code>SessAckRequest</code> acknowledging messages either on a queue
1072   * or on a subscription.
1073   */

1074  private void doReact(SessAckRequest req)
1075  {
1076    if (req.getQueueMode()) {
1077      AgentId qId = AgentId.fromString(req.getTarget());
1078      Vector JavaDoc ids = req.getIds();
1079      
1080      AcknowledgeRequest not =
1081        new AcknowledgeRequest(activeCtxId,
1082                               req.getRequestId(),
1083                               ids);
1084      if (qId.getTo() == proxyAgent.getId().getTo()) {
1085        if (logger.isLoggable(BasicLevel.DEBUG))
1086          logger.log(BasicLevel.DEBUG, " -> local acking");
1087        not.setPersistent(false);
1088      }
1089      
1090      proxyAgent.sendNot(qId, not);
1091    }
1092    else {
1093      String JavaDoc subName = req.getTarget();
1094      ClientSubscription sub = (ClientSubscription) subsTable.get(subName);
1095      if (sub != null)
1096        sub.acknowledge(req.getIds().elements());
1097    }
1098  }
1099
1100  /**
1101   * Method implementing the JMS proxy reaction to a
1102   * <code>SessDenyRequest</code> denying messages either on a queue or on
1103   * a subscription.
1104   */

1105  private void doReact(SessDenyRequest req) {
1106    if (req.getQueueMode()) {
1107      AgentId qId = AgentId.fromString(req.getTarget());
1108      Vector JavaDoc ids = req.getIds();
1109      proxyAgent.sendNot(qId,
1110                         new DenyRequest(activeCtxId, req.getRequestId(), ids));
1111
1112      // Acknowledging the request unless forbidden:
1113
if (! req.getDoNotAck())
1114        proxyAgent.sendNot(proxyAgent.getId(),
1115                           new SyncReply(activeCtxId, new ServerReply(req)));
1116    }
1117    else {
1118      String JavaDoc subName = req.getTarget();
1119      ClientSubscription sub = (ClientSubscription) subsTable.get(subName);
1120
1121      if (sub == null)
1122        return;
1123
1124      sub.deny(req.getIds().elements());
1125
1126      // Launching a delivery sequence:
1127
ConsumerMessages consM = sub.deliver();
1128      // Delivering.
1129
if (consM != null && activeCtx.getActivated())
1130        doReply(consM);
1131      else if (consM != null)
1132        activeCtx.addPendingDelivery(consM);
1133    }
1134  }
1135
1136  /**
1137   * Method implementing the JMS proxy reaction to a
1138   * <code>ConsumerAckRequest</code> acknowledging a message either on a queue
1139   * or on a subscription.
1140   */

1141  private void doReact(ConsumerAckRequest req)
1142  {
1143    if (req.getQueueMode()) {
1144      AgentId qId = AgentId.fromString(req.getTarget());
1145      AcknowledgeRequest not = new AcknowledgeRequest(activeCtxId,
1146                                                      req.getRequestId(),
1147                                                      req.getIds());
1148      if (qId.getTo() == proxyAgent.getId().getTo()) {
1149        if (logger.isLoggable(BasicLevel.DEBUG))
1150          logger.log(BasicLevel.DEBUG, " -> local acking");
1151        not.setPersistent(false);
1152        proxyAgent.sendNot(qId, not);
1153      } else {
1154        proxyAgent.sendNot(qId, not);
1155      }
1156    } else {
1157      String JavaDoc subName = req.getTarget();
1158      ClientSubscription sub = (ClientSubscription) subsTable.get(subName);
1159      if (sub != null) {
1160        sub.acknowledge(req.getIds().elements());
1161      }
1162    }
1163  }
1164
1165  /**
1166   * Method implementing the JMS proxy reaction to a
1167   * <code>ConsumerDenyRequest</code> denying a message either on a queue
1168   * or on a subscription.
1169   * <p>
1170   * This request is acknowledged when destinated to a queue.
1171   */

1172  private void doReact(ConsumerDenyRequest req) {
1173    if (req.getQueueMode()) {
1174      AgentId qId = AgentId.fromString(req.getTarget());
1175      String JavaDoc id = req.getId();
1176      proxyAgent.sendNot(qId,
1177                         new DenyRequest(activeCtxId, req.getRequestId(), id));
1178
1179      // Acknowledging the request, unless forbidden:
1180
if (! req.getDoNotAck())
1181        proxyAgent.sendNot(proxyAgent.getId(),
1182                           new SyncReply(activeCtxId, new ServerReply(req)));
1183    } else {
1184      String JavaDoc subName = req.getTarget();
1185      ClientSubscription sub = (ClientSubscription) subsTable.get(subName);
1186
1187      if (sub == null)
1188        return;
1189
1190      Vector JavaDoc ids = new Vector JavaDoc();
1191      ids.add(req.getId());
1192      sub.deny(ids.elements());
1193
1194      // Launching a delivery sequence:
1195
ConsumerMessages consM = sub.deliver();
1196      // Delivering.
1197
if (consM != null && activeCtx.getActivated())
1198        doReply(consM);
1199      else if (consM != null)
1200        activeCtx.addPendingDelivery(consM);
1201    }
1202  }
1203
1204  /**
1205   * Method implementing the JMS proxy reaction to a
1206   * <code>TempDestDeleteRequest</code> request for deleting a temporary
1207   * destination.
1208   * <p>
1209   * This method sends a <code>fr.dyade.aaa.agent.DeleteNot</code> to the
1210   * destination and acknowledges the request.
1211   */

1212  private void doReact(TempDestDeleteRequest req) {
1213    // Removing the destination from the context's list:
1214
AgentId tempId = AgentId.fromString(req.getTarget());
1215    activeCtx.removeTemporaryDestination(tempId);
1216
1217    // Sending the request to the destination:
1218
deleteTemporaryDestination(tempId);
1219
1220    // Acknowledging the request:
1221
proxyAgent.sendNot(proxyAgent.getId(),
1222                       new SyncReply(activeCtxId, new ServerReply(req)));
1223  }
1224
1225  private void deleteTemporaryDestination(AgentId destId) {
1226    proxyAgent.sendNot(destId, new DeleteNot());
1227    proxyAgent.sendNot(AdminTopic.getDefault(),
1228                       new RegisterTmpDestNot(destId, false, false));
1229  }
1230
1231  /**
1232   * Method implementing the JMS proxy reaction to an
1233   * <code>XACnxPrepare</code> request holding messages and acknowledgements
1234   * produced in an XA transaction.
1235   *
1236   * @exception StateException If the proxy has already received a prepare
1237   * order for the same transaction.
1238   */

1239  private void doReact(XACnxPrepare req) throws StateException {
1240    try {
1241      Xid xid = new Xid(req.getBQ(), req.getFI(), req.getGTI());
1242      activeCtx.registerTxPrepare(xid, req);
1243      doReply(new ServerReply(req));
1244    }
1245    catch (Exception JavaDoc exc) {
1246      throw new StateException(exc.getMessage());
1247    }
1248  }
1249
1250  /**
1251   * Method implementing the JMS proxy reaction to an
1252   * <code>XACnxCommit</code> request commiting the operations performed
1253   * in a given transaction.
1254   * <p>
1255   * This method actually processes the objects sent at the prepare phase,
1256   * and acknowledges the request.
1257   *
1258   * @exception StateException If commiting an unknown transaction.
1259   */

1260  private void doReact(XACnxCommit req) throws StateException {
1261    Xid xid = new Xid(req.getBQ(), req.getFI(), req.getGTI());
1262
1263    XACnxPrepare prepare = activeCtx.getTxPrepare(xid);
1264
1265    if (prepare == null)
1266      throw new StateException("Unknown transaction identifier.");
1267
1268    Vector JavaDoc sendings = prepare.getSendings();
1269    Vector JavaDoc acks = prepare.getAcks();
1270
1271    ProducerMessages pM;
1272    ClientMessages not;
1273    while (! sendings.isEmpty()) {
1274      pM = (ProducerMessages) sendings.remove(0);
1275      not = new ClientMessages(activeCtxId,
1276                               pM.getRequestId(),
1277                               pM.getMessages());
1278      proxyAgent.sendNot(AgentId.fromString(pM.getTarget()), not);
1279    }
1280
1281    while (! acks.isEmpty())
1282      doReact((SessAckRequest) acks.remove(0));
1283
1284    doReply(new ServerReply(req));
1285  }
1286
1287  /**
1288   * Method implementing the JMS proxy reaction to an
1289   * <code>XACnxRollback</code> request rolling back the operations performed
1290   * in a given transaction.
1291   */

1292  private void doReact(XACnxRollback req) {
1293    Xid xid = new Xid(req.getBQ(), req.getFI(), req.getGTI());
1294
1295    String JavaDoc queueName;
1296    AgentId qId;
1297    Vector JavaDoc ids;
1298    for (Enumeration JavaDoc queues = req.getQueues(); queues.hasMoreElements();) {
1299      queueName = (String JavaDoc) queues.nextElement();
1300      qId = AgentId.fromString(queueName);
1301      ids = req.getQueueIds(queueName);
1302      proxyAgent.sendNot(qId,
1303                         new DenyRequest(activeCtxId, req.getRequestId(), ids));
1304    }
1305
1306    String JavaDoc subName;
1307    ClientSubscription sub;
1308    ConsumerMessages consM;
1309    for (Enumeration JavaDoc subs = req.getSubs(); subs.hasMoreElements();) {
1310      subName = (String JavaDoc) subs.nextElement();
1311      sub = (ClientSubscription) subsTable.get(subName);
1312      if (sub != null) {
1313        sub.deny(req.getSubIds(subName).elements());
1314
1315        consM = sub.deliver();
1316        if (consM != null && activeCtx.getActivated())
1317          doReply(consM);
1318        else if (consM != null)
1319          activeCtx.addPendingDelivery(consM);
1320      }
1321    }
1322
1323   XACnxPrepare prepare = activeCtx.getTxPrepare(xid);
1324
1325    if (prepare != null) {
1326      Vector JavaDoc acks = prepare.getAcks();
1327
1328      SessAckRequest ack;
1329      while (! acks.isEmpty()) {
1330        ack = (SessAckRequest) acks.remove(0);
1331        doReact(new SessDenyRequest(ack.getTarget(),
1332                                    ack.getIds(),
1333                                    ack.getQueueMode(),
1334                                    true));
1335      }
1336    }
1337
1338    proxyAgent.sendNot(proxyAgent.getId(),
1339                       new SyncReply(activeCtxId, new ServerReply(req)));
1340  }
1341
1342  /**
1343   * Reacts to a <code>XACnxRecoverRequest</code> request requesting the
1344   * identifiers of the prepared transactions.
1345   * <p>
1346   * Returns the identifiers of the recovered transactions, puts the prepared
1347   * data into the active context for future commit or rollback.
1348   *
1349   * @exception StateException If a recovered transaction branch is already
1350   * present in the context.
1351   */

1352  private void doReact(XACnxRecoverRequest req)
1353    throws StateException {
1354    // state change, so save.
1355
proxyAgent.setSave();
1356
1357    Vector JavaDoc bqs = new Vector JavaDoc();
1358    Vector JavaDoc fis = new Vector JavaDoc();
1359    Vector JavaDoc gtis = new Vector JavaDoc();
1360    if (recoveredTransactions != null) {
1361      Enumeration JavaDoc keys = recoveredTransactions.keys();
1362      Xid xid;
1363      while (keys.hasMoreElements()) {
1364        xid = (Xid) keys.nextElement();
1365        bqs.add(xid.bq);
1366        fis.add(new Integer JavaDoc(xid.fi));
1367        gtis.add(xid.gti);
1368        try {
1369          activeCtx.registerTxPrepare(xid,
1370                                      (XACnxPrepare) recoveredTransactions.remove(xid));
1371        }
1372        catch (Exception JavaDoc exc) {
1373          throw new StateException("Recovered transaction branch has already been prepared by the RM.");
1374        }
1375      }
1376    }
1377    recoveredTransactions = null;
1378    doReply(new XACnxRecoverReply(req, bqs, fis, gtis));
1379  }
1380
1381  /**
1382   * Method implementing the reaction to a <code>SetDMQRequest</code>
1383   * instance setting the dead message queue identifier for this proxy
1384   * and its subscriptions.
1385   */

1386  private void doReact(AgentId from, SetDMQRequest not) {
1387    // state change, so save.
1388
proxyAgent.setSave();
1389    
1390    dmqId = not.getDmqId();
1391
1392    for (Enumeration JavaDoc keys = subsTable.keys(); keys.hasMoreElements();)
1393      ((ClientSubscription) subsTable.get(keys.nextElement())).setDMQId(dmqId);
1394
1395    proxyAgent.sendNot(from, new AdminReply(not, true, "DMQ set: " + dmqId));
1396  }
1397
1398  /**
1399   * Method implementing the reaction to a <code>SetThreshRequest</code>
1400   * instance setting the threshold value for this proxy and its
1401   * subscriptions.
1402   */

1403  private void doReact(AgentId from, SetThreshRequest not) {
1404    // state change, so save.
1405
proxyAgent.setSave();
1406    
1407    threshold = not.getThreshold();
1408
1409    for (Enumeration JavaDoc keys = subsTable.keys(); keys.hasMoreElements();)
1410      ((ClientSubscription)
1411         subsTable.get(keys.nextElement())).setThreshold(not.getThreshold());
1412
1413    proxyAgent.sendNot(from,
1414                       new AdminReply(not,
1415                                      true,
1416                                      "Threshold set: " + threshold));
1417  }
1418
1419  /**
1420   * Method implementing the reaction to a <code>SetNbMaxMsgRequest</code>
1421   * instance setting the NbMaxMsg value for the subscription.
1422   */

1423  protected void doReact(AgentId from, SetNbMaxMsgRequest not) {
1424    int nbMaxMsg = not.getNbMaxMsg();
1425    String JavaDoc subName = not.getSubName();
1426
1427    ClientSubscription sub = (ClientSubscription) subsTable.get(subName);
1428    if (sub != null) {
1429      sub.setNbMaxMsg(nbMaxMsg);
1430      proxyAgent.sendNot(from,
1431                         new AdminReply(not,
1432                                        true,
1433                                        "NbMaxMsg set: " + nbMaxMsg + " on " + subName));
1434    } else {
1435      proxyAgent.sendNot(from,
1436                         new AdminReply(not,
1437                                        false,
1438                                        "NbMaxMsg not set: " + nbMaxMsg + " on " + subName));
1439    }
1440  }
1441
1442  /**
1443   * Method implementing the reaction to a
1444   * <code>Monit_GetNbMaxMsg</code> notification requesting the
1445   * number max of messages in the subscription.
1446   *
1447   * @exception AccessException If the requester is not the administrator.
1448   */

1449  protected void doReact(AgentId from, Monit_GetNbMaxMsg not) {
1450    int nbMaxMsg = -1;
1451    String JavaDoc subName = not.getSubName();
1452    ClientSubscription sub = (ClientSubscription) subsTable.get(subName);
1453    if (sub != null)
1454      nbMaxMsg = sub.getNbMaxMsg();
1455
1456    Channel.sendTo(from, new Monit_GetNbMaxMsgRep(not,nbMaxMsg));
1457  }
1458
1459  /**
1460   * Returns the maximum number of message for identified subscription.
1461   * The subscription is identified by its unique name, if the limit is unset
1462   * the method returns -1.
1463   *
1464   * @param subName The subscription unique name.
1465   * @return the maximum number of message for subscription if set;
1466   * -1 otherwise.
1467   */

1468  public int getNbMaxMsg(String JavaDoc subName) {
1469    int nbMaxMsg = -1;
1470    ClientSubscription sub = (ClientSubscription) subsTable.get(subName);
1471    if (sub != null)
1472      nbMaxMsg = sub.getNbMaxMsg();
1473    return nbMaxMsg;
1474  }
1475
1476  /**
1477   * Sets the maximum number of message for identified subscription.
1478   * The subscription is identified by its unique name.
1479   *
1480   * @param subName The subscription unique name.
1481   * @param nbMaxMsg the maximum number of message for subscription (-1 set
1482   * no limit).
1483   */

1484  public void setNbMaxMsg(String JavaDoc subName, int nbMaxMsg) {
1485    ClientSubscription sub = (ClientSubscription) subsTable.get(subName);
1486    if (sub != null)
1487      sub.setNbMaxMsg(nbMaxMsg);
1488  }
1489
1490  /**
1491   * Method implementing the reaction to a <code>Monit_GetDMQSettings</code>
1492   * instance requesting the DMQ settings of this proxy.
1493   */

1494  private void doReact(AgentId from, Monit_GetDMQSettings not)
1495  {
1496    String JavaDoc id = null;
1497    if (dmqId != null)
1498      id = dmqId.toString();
1499    proxyAgent.sendNot(from, new Monit_GetDMQSettingsRep(not, id, threshold));
1500  }
1501
1502  /**
1503   * Method implementing the JMS proxy reaction to a
1504   * <code>SyncReply</code> notification sent by itself, wrapping a reply
1505   * to be sent to a client.
1506   */

1507  private void doReact(SyncReply not)
1508  {
1509    doReply(not.key, not.reply);
1510  }
1511
1512  /**
1513   * The method closes a given context by denying the non acknowledged messages
1514   * delivered to this context, and deleting its temporary subscriptions and
1515   * destinations.
1516   */

1517  private void doReact(int key, CnxCloseRequest req) {
1518    // state change, so save.
1519
proxyAgent.setSave();
1520
1521    //setCtx(cKey);
1522

1523    // Denying the non acknowledged messages:
1524
AgentId id;
1525    for (Enumeration JavaDoc ids = activeCtx.getDeliveringQueues(); ids
1526        .hasMoreElements();) {
1527      id = (AgentId) ids.nextElement();
1528      proxyAgent.sendNot(id, new DenyRequest(key));
1529    }
1530
1531    // Removing or deactivating the subscriptions:
1532
String JavaDoc subName = null;
1533    ClientSubscription sub;
1534    Vector JavaDoc topics = new Vector JavaDoc();
1535    for (Enumeration JavaDoc subs = activeCtx.getActiveSubs(); subs.hasMoreElements();) {
1536      subName = (String JavaDoc) subs.nextElement();
1537      sub = (ClientSubscription) subsTable.get(subName);
1538
1539      if (logger.isLoggable(BasicLevel.DEBUG))
1540        logger.log(BasicLevel.DEBUG, "Deactivate subscription "
1541            + subName + ", topic id = " + sub.getTopicId());
1542
1543      if (sub.getDurable()) {
1544        sub.deactivate();
1545
1546        if (logger.isLoggable(BasicLevel.DEBUG))
1547          logger.log(BasicLevel.DEBUG, "Durable subscription"
1548              + subName + " de-activated.");
1549      } else {
1550        if (logger.isLoggable(BasicLevel.DEBUG))
1551          logger.log(BasicLevel.DEBUG, " -> topicsTable = "
1552              + topicsTable);
1553
1554        sub.delete();
1555        subsTable.remove(subName);
1556        TopicSubscription tSub = (TopicSubscription) topicsTable.get(sub
1557            .getTopicId());
1558        tSub.removeSubscription(subName);
1559
1560        if (!topics.contains(sub.getTopicId()))
1561          topics.add(sub.getTopicId());
1562
1563        if (logger.isLoggable(BasicLevel.DEBUG))
1564          logger.log(BasicLevel.DEBUG, "Temporary subscription"
1565              + subName + " deleted.");
1566      }
1567    }
1568    // Browsing the topics which at least have one subscription removed.
1569
for (Enumeration JavaDoc topicIds = topics.elements(); topicIds.hasMoreElements();)
1570      updateSubscriptionToTopic((AgentId) topicIds.nextElement(), -1, -1);
1571
1572    // Deleting the temporary destinations:
1573
AgentId destId;
1574    for (Enumeration JavaDoc dests = activeCtx.getTempDestinations(); dests
1575        .hasMoreElements();) {
1576      destId = (AgentId) dests.nextElement();
1577      activeCtx.removeTemporaryDestination(destId);
1578      deleteTemporaryDestination(destId);
1579
1580      if (logger.isLoggable(BasicLevel.DEBUG))
1581        logger.log(BasicLevel.DEBUG, "Deletes temporary"
1582            + " destination " + destId.toString());
1583    }
1584
1585    // Saving the prepared transactions.
1586
Enumeration JavaDoc xids = activeCtx.getTxIds();
1587    Xid xid;
1588    XACnxPrepare recoveredPrepare;
1589    XACnxPrepare prepare;
1590    while (xids.hasMoreElements()) {
1591      if (recoveredTransactions == null)
1592        recoveredTransactions = new Hashtable JavaDoc();
1593
1594      xid = (Xid) xids.nextElement();
1595
1596      recoveredPrepare = (XACnxPrepare) recoveredTransactions.get(xid);
1597      prepare = activeCtx.getTxPrepare(xid);
1598
1599      if (recoveredPrepare == null)
1600        recoveredTransactions.put(xid, prepare);
1601      else {
1602        recoveredPrepare.getSendings().addAll(prepare.getSendings());
1603        recoveredPrepare.getAcks().addAll(prepare.getAcks());
1604      }
1605    }
1606
1607    // Finally, deleting the context:
1608
contexts.remove(new Integer JavaDoc(key));
1609    activeCtx = null;
1610    setActiveCtxId(-1);
1611
1612    CnxCloseReply reply = new CnxCloseReply();
1613    reply.setCorrelationId(req.getRequestId());
1614    proxyAgent.sendToClient(key, reply);
1615  }
1616
1617  private void doReact(int key, ActivateConsumerRequest req) {
1618    String JavaDoc subName = req.getTarget();
1619    ClientSubscription sub = (ClientSubscription) subsTable.get(subName);
1620    sub.setActive(req.getActivate());
1621  }
1622  
1623  private void doReact(int key, CommitRequest req) {
1624    // The commit may involve some local agents
1625
int asyncReplyCount = 0;
1626    
1627    Enumeration JavaDoc pms = req.getProducerMessages();
1628    if (pms != null) {
1629      while (pms.hasMoreElements()) {
1630        ProducerMessages pm = (ProducerMessages) pms.nextElement();
1631        AgentId destId = AgentId.fromString(pm.getTarget());
1632        ClientMessages not = new ClientMessages(key,
1633            req.getRequestId(), pm.getMessages());
1634        setDmq(not);
1635        if (destId.getTo() == proxyAgent.getId().getTo()) {
1636          // local sending
1637
not.setPersistent(false);
1638          if (req.getAsyncSend()) {
1639            not.setAsyncSend(true);
1640          } else {
1641            asyncReplyCount++;
1642          }
1643        }
1644        proxyAgent.sendNot(destId, not);
1645      }
1646    }
1647    
1648    Enumeration JavaDoc acks = req.getAckRequests();
1649    if (acks != null) {
1650      while (acks.hasMoreElements()) {
1651        SessAckRequest sar = (SessAckRequest) acks.nextElement();
1652        if (sar.getQueueMode()) {
1653          AgentId qId = AgentId.fromString(sar.getTarget());
1654          Vector JavaDoc ids = sar.getIds();
1655          AcknowledgeRequest not = new AcknowledgeRequest(activeCtxId, req
1656              .getRequestId(), ids);
1657          if (qId.getTo() == proxyAgent.getId().getTo()) {
1658            // local sending
1659
not.setPersistent(false);
1660            // No reply to wait for
1661
}
1662
1663          proxyAgent.sendNot(qId, not);
1664        } else {
1665          String JavaDoc subName = sar.getTarget();
1666          ClientSubscription sub = (ClientSubscription) subsTable.get(subName);
1667          if (sub != null) {
1668            sub.acknowledge(sar.getIds().elements());
1669            proxyAgent.setSave();
1670          }
1671        }
1672      }
1673    }
1674   
1675    if (!req.getAsyncSend()) {
1676      if (asyncReplyCount == 0) {
1677        proxyAgent.sendNot(proxyAgent.getId(), new SendReplyNot(key, req
1678            .getRequestId()));
1679      } else {
1680        // we need to wait for the replies
1681
// from the local agents
1682
// before replying to the client.
1683
activeCtx.addMultiReplyContext(req.getRequestId(), asyncReplyCount);
1684      }
1685    }
1686    // else the client doesn't expect any ack
1687
}
1688
1689  /**
1690   * Distributes the JMS replies to the appropriate reactions.
1691   * <p>
1692   * JMS proxies react the following replies:
1693   * <ul>
1694   * <li><code>QueueMsgReply</code></li>
1695   * <li><code>BrowseReply</code></li>
1696   * <li><code>SubscribeReply</code></li>
1697   * <li><code>TopicMsgsReply</code></li>
1698   * <li><code>ExceptionReply</code></li>
1699   * </ul>
1700   */

1701  private void doFwd(AgentId from, AbstractReply rep) {
1702    if (logger.isLoggable(BasicLevel.DEBUG))
1703      logger.log(BasicLevel.DEBUG,
1704                 "--- " + this + " got " + rep.getClass().getName() +
1705                 " with id: " + rep.getCorrelationId() + " from: " + from);
1706
1707    if (rep instanceof QueueMsgReply)
1708      doFwd(from, (QueueMsgReply) rep);
1709    else if (rep instanceof BrowseReply)
1710      doFwd((BrowseReply) rep);
1711    else if (rep instanceof SubscribeReply)
1712      doFwd((SubscribeReply) rep);
1713    else if (rep instanceof TopicMsgsReply)
1714      doFwd(from, (TopicMsgsReply) rep);
1715    else if (rep instanceof ExceptionReply)
1716      doReact(from, (ExceptionReply) rep);
1717    else {
1718      if (logger.isLoggable(BasicLevel.ERROR))
1719        logger.log(BasicLevel.ERROR, "Unexpected reply: " + rep);
1720    }
1721  }
1722
1723
1724  /**
1725   * Actually forwards a <code>QueueMsgReply</code> coming from a destination
1726   * as a <code>ConsumerMessages</code> destinated to the requesting client.
1727   * <p>
1728   * If the corresponding context is stopped, stores the
1729   * <code>ConsumerMessages</code> for later delivery.
1730   */

1731  private void doFwd(AgentId from, QueueMsgReply rep) {
1732    if (logger.isLoggable(BasicLevel.DEBUG))
1733      logger.log(BasicLevel.DEBUG,
1734                 "ProxyImpl.doFwd(" + from + ',' + rep + ')');
1735    
1736    try {
1737      // Updating the active context:
1738
setCtx(rep.getClientContext());
1739
1740      // If the receive request being replied has been cancelled, denying
1741
// the message.
1742
if (rep.getCorrelationId() == activeCtx.getCancelledReceive()) {
1743        if (logger.isLoggable(BasicLevel.DEBUG))
1744          logger.log(BasicLevel.DEBUG,
1745            " -> cancelled receive: id=" + activeCtx.getCancelledReceive());
1746
1747        if (rep.getSize() > 0) {
1748          Vector JavaDoc msgList = rep.getMessages();
1749          for (int i = 0; i < msgList.size(); i++) {
1750            Message msg = new Message((org.objectweb.joram.shared.messages.Message) msgList.elementAt(i));
1751            String JavaDoc msgId = msg.getIdentifier();
1752            
1753            if (logger.isLoggable(BasicLevel.INFO))
1754              logger.log(BasicLevel.INFO, " -> denying message: " + msgId);
1755            
1756            proxyAgent.sendNot(from,
1757                               new DenyRequest(0, rep.getCorrelationId(), msgId));
1758          }
1759        }
1760      } else {
1761        if (logger.isLoggable(BasicLevel.DEBUG))
1762          logger.log(BasicLevel.DEBUG, " -> reply");
1763
1764        ConsumerMessages jRep;
1765
1766        // Building the reply and storing the wrapped message id for later
1767
// denying in the case of a failure:
1768
if (rep.getSize() > 0) {
1769          jRep = new ConsumerMessages(
1770            rep.getCorrelationId(),
1771            rep.getMessages(),
1772            from.toString(),
1773            true);
1774          activeCtx.addDeliveringQueue(from);
1775        } else {
1776          jRep = new ConsumerMessages(
1777              rep.getCorrelationId(),
1778              (Vector JavaDoc)null,
1779              from.toString(),
1780              true);
1781        }
1782
1783        // If the context is started, delivering the message, or buffering it:
1784
if (activeCtx.getActivated()) {
1785          doReply(jRep);
1786        } else {
1787          if (logger.isLoggable(BasicLevel.DEBUG))
1788            logger.log(BasicLevel.DEBUG, " -> buffer the reply");
1789          activeCtx.addPendingDelivery(jRep);
1790        }
1791      }
1792    } catch (StateException pE) {
1793      // The context is lost: denying the message:
1794
if (logger.isLoggable(BasicLevel.DEBUG))
1795        logger.log(BasicLevel.DEBUG, "", pE);
1796      if (rep.getMessages().size() > 0) {
1797        Vector JavaDoc msgList = rep.getMessages();
1798        for (int i = 0; i < msgList.size(); i++) {
1799          Message msg = new Message((org.objectweb.joram.shared.messages.Message) msgList.elementAt(i));
1800          String JavaDoc msgId = msg.getIdentifier();
1801          
1802          if (logger.isLoggable(BasicLevel.INFO))
1803            logger.log(BasicLevel.INFO, "Denying message: " + msgId);
1804          
1805          proxyAgent.sendNot(from,
1806                             new DenyRequest(0, rep.getCorrelationId(), msgId));
1807        }
1808      }
1809    }
1810  }
1811
1812
1813  /**
1814   * Actually forwards a <code>BrowseReply</code> coming from a
1815   * destination as a <code>QBrowseReply</code> destinated to the
1816   * requesting client.
1817   */

1818  private void doFwd(BrowseReply rep) {
1819    try {
1820      // Updating the active context:
1821
setCtx(rep.getClientContext());
1822      doReply(new QBrowseReply(rep.getCorrelationId(),
1823                               rep.getMessages()));
1824    } catch (StateException pE) {
1825      // The context is lost; nothing to do.
1826
}
1827  }
1828
1829  /**
1830   * Forwards the topic's <code>SubscribeReply</code> as a
1831   * <code>ServerReply</code>.
1832   */

1833  private void doFwd(SubscribeReply rep) {
1834    try {
1835      setCtx(rep.getClientContext());
1836      doReply(new ServerReply(rep.getCorrelationId()));
1837    } catch (StateException pE) {
1838      // The context is lost; nothing to do.
1839
}
1840  }
1841
1842  transient String JavaDoc msgTxname = null;
1843
1844  protected final String JavaDoc getMsgTxname() {
1845    if (msgTxname == null)
1846      msgTxname = 'M' + proxyAgent.getId().toString() + '_';
1847    return msgTxname;
1848  }
1849
1850  protected final void setMsgTxName(Message msg) {
1851    if (msg.getTxName() == null)
1852      msg.setTxName(getMsgTxname() + msg.order);
1853  }
1854
1855  /**
1856   * Method implementing the proxy reaction to a <code>TopicMsgsReply</code>
1857   * holding messages published by a topic.
1858   */

1859  private void doFwd(AgentId from, TopicMsgsReply rep) {
1860    // Browsing the target subscriptions:
1861
TopicSubscription tSub = (TopicSubscription) topicsTable.get(from);
1862    if (tSub == null || tSub.isEmpty()) return;
1863
1864    String JavaDoc subName;
1865    ClientSubscription sub;
1866
1867    // AF: TODO we should parse each message for each subscription
1868
// see ClientSubscription.browseNewMessages
1869
Vector JavaDoc messages = new Vector JavaDoc();
1870    for (Enumeration JavaDoc msgs = rep.getMessages().elements();
1871         msgs.hasMoreElements();) {
1872      Message message = new Message((org.objectweb.joram.shared.messages.Message) msgs.nextElement());
1873      // Setting the arrival order of the messages
1874
message.order = arrivalsCounter++;
1875      messages.add(message);
1876    }
1877
1878    for (Enumeration JavaDoc names = tSub.getNames(); names.hasMoreElements();) {
1879      subName = (String JavaDoc) names.nextElement();
1880      sub = (ClientSubscription) subsTable.get(subName);
1881      if (sub == null) continue;
1882
1883      // Browsing the delivered messages.
1884
sub.browseNewMessages(messages);
1885    }
1886
1887    // Save message if it is delivered to a durable subscription.
1888
for (Enumeration JavaDoc msgs = messages.elements(); msgs.hasMoreElements();) {
1889      Message message = (Message) msgs.nextElement();
1890      
1891      if (message.durableAcksCounter > 0) {
1892        if (logger.isLoggable(BasicLevel.DEBUG))
1893          logger.log(BasicLevel.DEBUG, " -> save message " + message);
1894        proxyAgent.setSave();
1895        // Persisting the message.
1896
setMsgTxName(message);
1897        message.save();
1898      }
1899    }
1900
1901    for (Enumeration JavaDoc names = tSub.getNames(); names.hasMoreElements();) {
1902      subName = (String JavaDoc) names.nextElement();
1903      sub = (ClientSubscription) subsTable.get(subName);
1904      if (sub == null) continue;
1905
1906      // If the subscription is active, lauching a delivery sequence.
1907
if (sub.getActive()) {
1908        ConsumerMessages consM = sub.deliver();
1909        
1910        if (consM != null) {
1911          try {
1912            setCtx(sub.getContextId());
1913            if (activeCtx.getActivated())
1914              doReply(consM);
1915            else
1916              activeCtx.addPendingDelivery(consM);
1917          } catch (StateException pE) {
1918            // The context is lost: nothing to do.
1919
}
1920        }
1921      }
1922    }
1923  }
1924
1925  /**
1926   * Actually forwards an <code>ExceptionReply</code> coming from a destination
1927   * as a <code>MomExceptionReply</code> destinated to the requesting client.
1928   * <p>
1929   * If the wrapped exception is an <code>AccessException</code> thrown by
1930   * a <code>Topic</code> as a reply to a <code>SubscribeRequest</code>,
1931   * removing the corresponding subscriptions.
1932   */

1933  private void doReact(AgentId from, ExceptionReply rep) {
1934    if (logger.isLoggable(BasicLevel.DEBUG))
1935      logger.log(BasicLevel.DEBUG,
1936                 "ProxyImpl.doReact(" + from + ',' + rep + ')');
1937    MomException exc = rep.getException();
1938
1939    // The exception comes from a topic refusing the access: deleting the subs.
1940
if (exc instanceof AccessException) {
1941      if (logger.isLoggable(BasicLevel.DEBUG))
1942        logger.log(BasicLevel.DEBUG, " -> topicsTable.remove(" + from + ')');
1943      TopicSubscription tSub = (TopicSubscription) topicsTable.remove(from);
1944      if (tSub != null) {
1945        String JavaDoc name;
1946        ClientSubscription sub;
1947        for (Enumeration JavaDoc e = tSub.getNames(); e.hasMoreElements();) {
1948          name = (String JavaDoc) e.nextElement();
1949          sub = (ClientSubscription) subsTable.remove(name);
1950          sub.delete();
1951
1952          try {
1953            setCtx(sub.getContextId());
1954            activeCtx.removeSubName(name);
1955            doReply(new MomExceptionReply(rep.getCorrelationId(), exc));
1956          } catch (StateException pExc) {}
1957        }
1958        return;
1959      }
1960    }
1961    // Forwarding the exception to the client.
1962
try {
1963      setCtx(rep.getClientContext());
1964      doReply(new MomExceptionReply(rep.getCorrelationId(), exc));
1965    } catch (StateException pExc) {}
1966  }
1967
1968  /**
1969   * An <code>AdminReply</code> acknowledges the setting of a temporary
1970   * destination; nothing needs to be done.
1971   */

1972  private void doReact(AdminReply reply)
1973  {}
1974
1975  /**
1976   * Method implementing the JMS proxy reaction to an <code>UnknownAgent</code>
1977   * notification notifying that a destination does not exist or is deleted.
1978   * <p>
1979   * If it notifies of a deleted topic, the method removes the
1980   * corresponding subscriptions. If the wrapped request is messages sending,
1981   * the messages are sent to the DMQ.
1982   * <p>
1983   * A <code>JmsExceptReply</code> is sent to the concerned requester.
1984   * <p>
1985   * This case might also happen when sending a <code>ClientMessages</code>
1986   * to a dead message queue. In that case, the invalid DMQ identifier is set
1987   * to null.
1988   */

1989  private void doReact(UnknownAgent uA) {
1990    if (logger.isLoggable(BasicLevel.DEBUG))
1991      logger.log(BasicLevel.DEBUG, "ProxyImpl.doReact(" + uA + ')');
1992    Notification not = uA.not;
1993    AgentId agId = uA.agent;
1994
1995    if (logger.isLoggable(BasicLevel.INFO))
1996      logger.log(BasicLevel.INFO,
1997                 "--- " + this + " notified of invalid destination: " + agId.toString());
1998    
1999    // The deleted destination is a topic: deleting its subscriptions.
2000
if (logger.isLoggable(BasicLevel.DEBUG))
2001        logger.log(BasicLevel.DEBUG,
2002                   " -> topicsTable.remove(" + agId + ')');
2003    TopicSubscription tSub = (TopicSubscription) topicsTable.remove(agId);
2004    if (tSub != null) {
2005      String JavaDoc name;
2006      ClientSubscription sub;
2007      DestinationException exc;
2008      exc = new DestinationException("Destination " + agId +
2009                                     " does not exist.");
2010      for (Enumeration JavaDoc e = tSub.getNames(); e.hasMoreElements();) {
2011        name = (String JavaDoc) e.nextElement();
2012        sub = (ClientSubscription) subsTable.remove(name);
2013        sub.delete();
2014
2015        try {
2016          setCtx(sub.getContextId());
2017          activeCtx.removeSubName(name);
2018          doReply(new MomExceptionReply(sub.getSubRequestId(), exc));
2019        } catch (StateException pExc) {}
2020      }
2021      return;
2022    }
2023
2024    if (not instanceof AbstractRequest) {
2025      AbstractRequest req = (AbstractRequest) not;
2026
2027      // If the wrapped request is messages sending,forwarding them to the DMQ:
2028
if (req instanceof ClientMessages) {
2029        // If the queue actually was a dead message queue, updating its
2030
// identifier:
2031
if (dmqId != null && agId.equals(dmqId)) {
2032          // state change, so save.
2033
proxyAgent.setSave();
2034          dmqId = null;
2035          for (Enumeration JavaDoc keys = subsTable.keys(); keys.hasMoreElements();)
2036            ((ClientSubscription)
2037               subsTable.get(keys.nextElement())).setDMQId(null);
2038        }
2039        // Sending the messages again if not coming from the default DMQ:
2040
if (DeadMQueueImpl.getId() != null
2041            && ! agId.equals(DeadMQueueImpl.getId())) {
2042          // Setting 'deletedDest' attribute for each message
2043
for (Enumeration JavaDoc msgs = ((ClientMessages) req).getMessages().elements();
2044               msgs.hasMoreElements();) {
2045            org.objectweb.joram.shared.messages.Message msg = (org.objectweb.joram.shared.messages.Message) msgs.nextElement();
2046            msg.deletedDest = true;
2047          }
2048          sendToDMQ((ClientMessages) req);
2049        }
2050
2051        DestinationException exc;
2052        exc = new DestinationException("Destination " + agId +
2053                                       " does not exist.");
2054        MomExceptionReply mer = new MomExceptionReply(req.getRequestId(), exc);
2055        try {
2056          setCtx(req.getClientContext());
2057          // Contrary to a receive, send the error even if the
2058
// connection is not started.
2059
doReply(mer);
2060        } catch (StateException se) {
2061          if (logger.isLoggable(BasicLevel.DEBUG))
2062            logger.log(BasicLevel.DEBUG, "", se);
2063          // Do nothing (the context doesn't exist any more).
2064
}
2065      } else if (req instanceof ReceiveRequest) {
2066        DestinationException exc = new DestinationException(
2067          "Destination " + agId + " does not exist.");
2068        MomExceptionReply mer = new MomExceptionReply(
2069          req.getRequestId(), exc);
2070        try {
2071          setCtx(req.getClientContext());
2072          if (activeCtx.getActivated()) {
2073            doReply(mer);
2074          } else {
2075            activeCtx.addPendingDelivery(mer);
2076          }
2077        } catch (StateException se) {
2078          if (logger.isLoggable(BasicLevel.DEBUG))
2079            logger.log(BasicLevel.DEBUG, "", se);
2080          // Do nothing (the contexte doesn't exist any more).
2081
}
2082      }
2083      if (logger.isLoggable(BasicLevel.INFO))
2084        logger.log(BasicLevel.INFO,
2085                   "Connection " + req.getClientContext() +
2086                   " notified of the deletion of destination " + agId);
2087    }
2088  }
2089
2090  private void doReact(UserAdminRequestNot not) {
2091    org.objectweb.joram.shared.admin.AdminRequest adminRequest =
2092      not.getRequest();
2093    if (adminRequest instanceof GetSubscriptions) {
2094      doReact((GetSubscriptions)adminRequest,
2095              not.getReplyTo(),
2096              not.getRequestMsgId(),
2097              not.getReplyMsgId());
2098    } else if (adminRequest instanceof GetSubscriptionMessageIds) {
2099      doReact((GetSubscriptionMessageIds)adminRequest,
2100              not.getReplyTo(),
2101              not.getRequestMsgId(),
2102              not.getReplyMsgId());
2103    } else if (adminRequest instanceof GetSubscriptionMessage) {
2104      doReact((GetSubscriptionMessage)adminRequest,
2105              not.getReplyTo(),
2106              not.getRequestMsgId(),
2107              not.getReplyMsgId());
2108    } else if (adminRequest instanceof DeleteSubscriptionMessage) {
2109      doReact((DeleteSubscriptionMessage)adminRequest,
2110              not.getReplyTo(),
2111              not.getRequestMsgId(),
2112              not.getReplyMsgId());
2113    } else if (adminRequest instanceof GetSubscription) {
2114      doReact((GetSubscription)adminRequest,
2115              not.getReplyTo(),
2116              not.getRequestMsgId(),
2117              not.getReplyMsgId());
2118    } else if (adminRequest instanceof ClearSubscription) {
2119      doReact((ClearSubscription)adminRequest,
2120              not.getReplyTo(),
2121              not.getRequestMsgId(),
2122              not.getReplyMsgId());
2123    }
2124  }
2125
2126  private void doReact(GetSubscriptions request,
2127                       AgentId replyTo,
2128                       String JavaDoc requestMsgId,
2129                       String JavaDoc replyMsgId) {
2130    Enumeration JavaDoc keys = subsTable.keys();
2131    Enumeration JavaDoc values = subsTable.elements();
2132    String JavaDoc[] subNames = new String JavaDoc[subsTable.size()];
2133    String JavaDoc[] topicIds = new String JavaDoc[subsTable.size()];
2134    int[] messageCounts = new int[subsTable.size()];
2135    boolean[] durable = new boolean[subsTable.size()];
2136    int i = 0;
2137    while (keys.hasMoreElements()) {
2138      subNames[i] = (String JavaDoc)keys.nextElement();
2139      ClientSubscription cs =
2140        (ClientSubscription)values.nextElement();
2141      topicIds[i] = cs.getTopicId().toString();
2142      messageCounts[i] = cs.getMessageCount();
2143      durable[i] = cs.getDurable();
2144      i++;
2145    }
2146    GetSubscriptionsRep reply = new GetSubscriptionsRep(
2147      subNames, topicIds, messageCounts, durable);
2148    replyToTopic(reply, replyTo, requestMsgId, replyMsgId);
2149  }
2150
2151  /**
2152   * Returns the list of subscriptions for this user. Each subscription is
2153   * identified by its unique 'symbolic' name.
2154   *
2155   * @return The list of subscriptions for this user.
2156   */

2157  public String JavaDoc[] getSubscriptionNames() {
2158    Enumeration JavaDoc keys = subsTable.keys();
2159    String JavaDoc[] res = new String JavaDoc[subsTable.size()];
2160    int i = 0;
2161    while (keys.hasMoreElements()) {
2162      res[i] = (String JavaDoc)keys.nextElement();
2163      i++;
2164    }
2165    return res;
2166  }
2167
2168  /**
2169   * Returns the number of pending messages for an identified subscription.
2170   * The subscription must be identified by its unique 'symbolic' name.
2171   *
2172   * @return The number of pending message for the subscription.
2173   */

2174  public int getSubscriptionMessageCount(String JavaDoc subName) {
2175    ClientSubscription cs =
2176      (ClientSubscription)subsTable.get(subName);
2177    return cs.getMessageCount();
2178  }
2179
2180  /**
2181   * Returns the unique identifier of the topic related to this subscription.
2182   *
2183   * @param subName The subscription unique name.
2184   * @return the unique identifier of the topic related to this subscription.
2185   */

2186  public String JavaDoc getSubscriptionTopicId(String JavaDoc subName) {
2187    ClientSubscription cs =
2188      (ClientSubscription)subsTable.get(subName);
2189    return cs.getTopicId().toString();
2190  }
2191
2192  private void doReact(GetSubscriptionMessageIds request,
2193                       AgentId replyTo,
2194                       String JavaDoc requestMsgId,
2195                       String JavaDoc replyMsgId) {
2196    String JavaDoc subName = request.getSubscriptionName();
2197    ClientSubscription cs = null;
2198    if (subName != null) {
2199      cs = (ClientSubscription)subsTable.get(subName);
2200    }
2201    if (cs != null) {
2202      GetSubscriptionMessageIdsRep reply =
2203        new GetSubscriptionMessageIdsRep(
2204          cs.getMessageIds());
2205      replyToTopic(reply, replyTo, requestMsgId, replyMsgId);
2206    } else {
2207      replyToTopic(
2208        new org.objectweb.joram.shared.admin.AdminReply(
2209          false, "Subscription not found: " +
2210          request.getSubscriptionName()),
2211        replyTo, requestMsgId, replyMsgId);
2212    }
2213  }
2214
2215  /**
2216   * Returns the list of message's identifiers for a subscription.
2217   * The subscription must be identified by its unique 'symbolic' name.
2218   *
2219   * @param subName The subscription unique name.
2220   * @return the list of message's identifiers for the subscription.
2221   */

2222  public String JavaDoc[] getSubscriptionMessageIds(String JavaDoc subName) {
2223    ClientSubscription cs =
2224      (ClientSubscription)subsTable.get(subName);
2225    if (cs != null) {
2226      return cs.getMessageIds();
2227    } else return null;
2228  }
2229
2230  private void doReact(GetSubscription request,
2231                       AgentId replyTo,
2232                       String JavaDoc requestMsgId,
2233                       String JavaDoc replyMsgId) {
2234    String JavaDoc subName = request.getSubscriptionName();
2235    ClientSubscription cs = null;
2236    if (subName != null) {
2237      cs = (ClientSubscription)subsTable.get(subName);
2238    }
2239    if (cs != null) {
2240      GetSubscriptionRep reply =
2241        new GetSubscriptionRep(
2242          cs.getTopicId().toString(),
2243          cs.getMessageCount(),
2244          cs.getDurable());
2245      replyToTopic(reply, replyTo, requestMsgId, replyMsgId);
2246    } else {
2247      replyToTopic(
2248        new org.objectweb.joram.shared.admin.AdminReply(
2249          false, "Subscription not found: " +
2250          request.getSubscriptionName()),
2251        replyTo, requestMsgId, replyMsgId);
2252    }
2253  }
2254
2255  private void doReact(GetSubscriptionMessage request,
2256                       AgentId replyTo,
2257                       String JavaDoc requestMsgId,
2258                       String JavaDoc replyMsgId) {
2259    ClientSubscription cs = null;
2260    String JavaDoc subName = request.getSubscriptionName();
2261    if (subName != null) {
2262      cs = (ClientSubscription)subsTable.get(subName);
2263    }
2264    if (cs != null) {
2265      String JavaDoc msgId = request.getMessageId();
2266      Message message = null;
2267      if (msgId != null) {
2268        message = cs.getMessage(msgId);
2269      }
2270      if (message != null) {
2271        GetSubscriptionMessageRep reply =
2272          new GetSubscriptionMessageRep(message.msg);
2273        replyToTopic(reply, replyTo, requestMsgId, replyMsgId);
2274      } else {
2275        replyToTopic(
2276          new org.objectweb.joram.shared.admin.AdminReply(
2277            false, "Message not found: " +
2278            request.getMessageId()),
2279          replyTo, requestMsgId, replyMsgId);
2280      }
2281    } else {
2282      replyToTopic(
2283        new org.objectweb.joram.shared.admin.AdminReply(
2284          false, "Subscription not found: " +
2285          subName),
2286        replyTo, requestMsgId, replyMsgId);
2287    }
2288  }
2289
2290  /**
2291   * Returns the description of a particular pending message in a subscription.
2292   * The subscription is identified by its unique name, the message is pointed
2293   * out through its unique identifier.
2294   * The description includes the type and priority of the message.
2295   *
2296   * @param subName The subscription unique name.
2297   * @param msgId The unique message's identifier.
2298   * @return the description of the message.
2299   */

2300  public CompositeDataSupport JavaDoc getSubscriptionMessage(
2301    String JavaDoc subName,
2302    String JavaDoc msgId) throws Exception JavaDoc {
2303    if (logger.isLoggable(BasicLevel.DEBUG))
2304      logger.log(BasicLevel.DEBUG,
2305                 "ProxyImpl.getSubscriptionMessage(" +
2306                 subName + ',' + msgId + ')');
2307    ClientSubscription cs =
2308      (ClientSubscription) subsTable.get(subName);
2309    if (cs != null) {
2310      Message msg = cs.getMessage(msgId);
2311      if (msg != null) {
2312        return MessageJMXWrapper.createCompositeDataSupport(msg);
2313      } else {
2314        throw new Exception JavaDoc("Message not found");
2315      }
2316    } else {
2317      throw new Exception JavaDoc("Subscription not found");
2318    }
2319  }
2320
2321  private void doReact(DeleteSubscriptionMessage request,
2322                       AgentId replyTo,
2323                       String JavaDoc requestMsgId,
2324                       String JavaDoc replyMsgId) {
2325    String JavaDoc subName = request.getSubscriptionName();
2326    ClientSubscription cs = null;
2327    if (subName != null) {
2328      cs = (ClientSubscription)subsTable.get(subName);
2329    }
2330    if (cs != null) {
2331      cs.deleteMessage(request.getMessageId());
2332      replyToTopic(
2333        new org.objectweb.joram.shared.admin.AdminReply(
2334          true, null),
2335        replyTo, requestMsgId, replyMsgId);
2336    } else {
2337      replyToTopic(
2338        new org.objectweb.joram.shared.admin.AdminReply(
2339          false, "Subscription not found: " +
2340          request.getSubscriptionName()),
2341        replyTo, requestMsgId, replyMsgId);
2342    }
2343  }
2344
2345  /**
2346   * Deletes a particular pending message in a subscription.
2347   * The subscription is identified by its unique name, the message is pointed
2348   * out through its unique identifier.
2349   *
2350   * @param subName The subscription unique name.
2351   * @param msgId The unique message's identifier.
2352   */

2353  public void deleteSubscriptionMessage(String JavaDoc subName,
2354                                        String JavaDoc msgId) {
2355    ClientSubscription cs = (ClientSubscription)subsTable.get(subName);
2356    if (cs != null) {
2357      cs.deleteMessage(msgId);
2358    }
2359  }
2360  
2361  private void doReact(ClearSubscription request,
2362                       AgentId replyTo,
2363                       String JavaDoc requestMsgId,
2364                       String JavaDoc replyMsgId) {
2365    String JavaDoc subName = request.getSubscriptionName();
2366    ClientSubscription cs = null;
2367    if (subName != null) {
2368      cs = (ClientSubscription) subsTable.get(subName);
2369    }
2370    if (cs != null) {
2371      cs.clear();
2372      replyToTopic(
2373        new org.objectweb.joram.shared.admin.AdminReply(
2374          true, null),
2375        replyTo, requestMsgId, replyMsgId);
2376    } else {
2377      replyToTopic(
2378        new org.objectweb.joram.shared.admin.AdminReply(
2379          false, "Subscription not found: " +
2380          request.getSubscriptionName()),
2381        replyTo, requestMsgId, replyMsgId);
2382    }
2383  }
2384
2385  private void replyToTopic(
2386    org.objectweb.joram.shared.admin.AdminReply reply,
2387    AgentId replyTo,
2388    String JavaDoc requestMsgId,
2389    String JavaDoc replyMsgId) {
2390    org.objectweb.joram.shared.messages.Message message = new org.objectweb.joram.shared.messages.Message();
2391    message.correlationId = requestMsgId;
2392    message.timestamp = System.currentTimeMillis();
2393    message.setDestination(replyTo.toString(), Topic.TOPIC_TYPE);
2394    message.id = replyMsgId;;
2395    try {
2396      message.setObject(reply);
2397      ClientMessages clientMessages = new ClientMessages(-1, -1, message);
2398      Channel.sendTo(replyTo, clientMessages);
2399    } catch (Exception JavaDoc exc) {
2400      if (logger.isLoggable(BasicLevel.ERROR))
2401        logger.log(BasicLevel.ERROR, "", exc);
2402      throw new Error JavaDoc(exc.getMessage());
2403    }
2404  }
2405
2406  /**
2407   * Updates the reference to the active context.
2408   *
2409   * @param key Key of the activated context.
2410   *
2411   * @exception StateException If the context has actually been closed or
2412   * lost.
2413   */

2414  private void setCtx(int key) throws StateException {
2415    if (logger.isLoggable(BasicLevel.DEBUG))
2416      logger.log(BasicLevel.DEBUG, "ProxyImpl.setCtx(" + key + ')');
2417
2418    if (key < 0) throw new StateException("Invalid context: " + key);
2419
2420    // If the required context is the last used, no need to update the
2421
// references:
2422
if (key == activeCtxId) return;
2423
2424    // Else, updating the activeCtx reference:
2425
setActiveCtxId(key);
2426    activeCtx = (ClientContext) contexts.get(new Integer JavaDoc(key));
2427
2428    // If context not found, throwing an exception:
2429
if (activeCtx == null) {
2430      setActiveCtxId(-1);
2431      activeCtx = null;
2432      throw new StateException("Context " + key + " is closed or broken.");
2433    }
2434  }
2435 
2436  /**
2437   * Method used for sending an <code>AbstractJmsReply</code> back to an
2438   * external client within the active context.
2439   *
2440   * @param rep The reply to send.
2441   */

2442  private void doReply(AbstractJmsReply reply) {
2443    doReply(activeCtxId, reply);
2444  }
2445
2446  ClientContext getClientContext(int ctxId) {
2447    return (ClientContext)contexts.get(
2448      new Integer JavaDoc(ctxId));
2449  }
2450
2451  /**
2452   * Method used for sending an <code>AbstractJmsReply</code> back to an
2453   * external client through a given context.
2454   *
2455   * @param key The context through witch replying.
2456   * @param rep The reply to send.
2457   */

2458  private void doReply(int key, AbstractJmsReply reply) {
2459    if (logger.isLoggable(BasicLevel.DEBUG))
2460      logger.log(BasicLevel.DEBUG,
2461                 "ProxyImpl.doReply(" + key + ',' + reply + ')');
2462    proxyAgent.sendToClient(key, reply);
2463  }
2464
2465  /**
2466   * Method used for sending messages to the appropriate dead message queue.
2467   */

2468  private void sendToDMQ(ClientMessages messages)
2469  {
2470    if (dmqId != null)
2471      proxyAgent.sendNot(dmqId, messages);
2472    else if (DeadMQueueImpl.getId() != null)
2473      proxyAgent.sendNot(DeadMQueueImpl.getId(), messages);
2474  }
2475
2476  void cleanPendingMessages(long currentTime) {
2477    if (logger.isLoggable(BasicLevel.DEBUG))
2478      logger.log(BasicLevel.DEBUG,
2479                 "ProxyImpl.cleanPendingMessages(" + messagesTable.size() + ')');
2480
2481    
2482    String JavaDoc id = null;
2483    Message message = null;
2484    ClientMessages deadMessages = null;
2485
2486    for (Enumeration JavaDoc ids = messagesTable.keys(); ids.hasMoreElements(); ) {
2487      id = (String JavaDoc) ids.nextElement();
2488      message = (Message) messagesTable.get(id);
2489      if ((message == null) || message.isValid(currentTime)) continue;
2490
2491      messagesTable.remove(id);
2492      message.delete();
2493      message.msg.expired = true;
2494
2495      if (deadMessages == null)
2496        deadMessages = new ClientMessages();
2497      deadMessages.addMessage(message.msg);
2498
2499      if (logger.isLoggable(BasicLevel.DEBUG))
2500        logger.log(BasicLevel.DEBUG,
2501                   "ProxyImpl expired message " + message.getIdentifier());
2502    }
2503    // If needed, sending the dead messages to the DMQ:
2504
if (deadMessages != null) sendToDMQ(deadMessages);
2505
2506    if (logger.isLoggable(BasicLevel.DEBUG))
2507      logger.log(BasicLevel.DEBUG,
2508                 "ProxyImpl.cleanPendingMessages -> " + messagesTable.size());
2509  }
2510
2511  /**
2512   * This method deletes the proxy by notifying its connected clients,
2513   * denying the non acknowledged messages, deleting the temporary
2514   * destinations, removing the subscriptions.
2515   *
2516   * @exception Exception If the requester is not an administrator.
2517   */

2518  public void deleteProxy(AgentId from) throws Exception JavaDoc
2519  {
2520    if (logger.isLoggable(BasicLevel.DEBUG))
2521      logger.log(BasicLevel.DEBUG,
2522                 "--- " + this + " notified to be deleted.");
2523
2524    if (! from.equals(AdminTopicImpl.getReference().getId()))
2525      throw new Exception JavaDoc();
2526
2527    // Notifying the connected clients:
2528
Enumeration JavaDoc keys = contexts.keys();
2529    int key;
2530    while (keys.hasMoreElements()) {
2531      key = ((Integer JavaDoc) keys.nextElement()).intValue();
2532      try {
2533        setCtx(key);
2534
2535        doReply(new MomExceptionReply(new StateException("Client proxy is deleted.")));
2536
2537        // Denying the non acknowledged messages:
2538
AgentId id;
2539        for (Enumeration JavaDoc ids = activeCtx.getDeliveringQueues();
2540             ids.hasMoreElements();) {
2541          id = (AgentId) ids.nextElement();
2542          proxyAgent.sendNot(id, new DenyRequest(activeCtxId));
2543        }
2544
2545        // Deleting the temporary destinations:
2546
AgentId destId;
2547        for (Enumeration JavaDoc dests = activeCtx.getTempDestinations();
2548             dests.hasMoreElements();) {
2549          destId = (AgentId) dests.nextElement();
2550          deleteTemporaryDestination(destId);
2551  
2552          if (logger.isLoggable(BasicLevel.DEBUG))
2553            logger.log(BasicLevel.DEBUG,
2554                       "Sending DeleteNot to temporary destination " +
2555                       destId.toString());
2556        }
2557      } catch (StateException pE) {}
2558    }
2559
2560    // Removing all proxy's subscriptions:
2561
AgentId destId;
2562    for (Enumeration JavaDoc topics = topicsTable.keys(); topics.hasMoreElements();) {
2563      destId = (AgentId) topics.nextElement();
2564      if (logger.isLoggable(BasicLevel.DEBUG))
2565        logger.log(BasicLevel.DEBUG, " -> topicsTable.remove(" + destId + ')');
2566      topicsTable.remove(destId);
2567      updateSubscriptionToTopic(destId, -1, -1);
2568    }
2569  }
2570
2571  
2572  /**
2573   * Updates the proxy's subscription to a topic.
2574   *
2575   * @param topicId Identifier of the topic to subscribe to.
2576   * @param contextId Identifier of the subscription context.
2577   * @param requestId Identifier of the subscription request.
2578   *
2579   * @return <code>true</code> if a <code>SubscribeRequest</code> has been
2580   * sent to the topic.
2581   */

2582  private boolean updateSubscriptionToTopic(AgentId topicId,
2583                                            int contextId,
2584                                            int requestId)
2585  {
2586    if (logger.isLoggable(BasicLevel.DEBUG))
2587      logger.log(BasicLevel.DEBUG,
2588                 "ProxyImpl.updateSubscriptionToTopic(" +
2589                 topicId + ',' + contextId + ',' + requestId + ')');
2590    TopicSubscription tSub = (TopicSubscription) topicsTable.get(topicId);
2591
2592    // No more subs to this topic: unsubscribing.
2593
if (tSub == null || tSub.isEmpty()) {
2594      if (logger.isLoggable(BasicLevel.DEBUG))
2595        logger.log(BasicLevel.DEBUG,
2596                   " -> topicsTable.remove(" + topicId + ')');
2597      topicsTable.remove(topicId);
2598      proxyAgent.sendNot(topicId,
2599                         new UnsubscribeRequest(contextId, requestId));
2600      return false;
2601    }
2602
2603    // Otherwise, updating the subscription if the selector evolved.
2604
String JavaDoc builtSelector = tSub.buildSelector();
2605    if (tSub.getLastSelector() != null
2606        && builtSelector.equals(tSub.getLastSelector()))
2607      return false;
2608
2609    tSub.setLastSelector(builtSelector);
2610    proxyAgent.sendNot(topicId, new SubscribeRequest(contextId,
2611                                                     requestId,
2612                                                     builtSelector));
2613   
2614    return true;
2615  }
2616
2617  public AgentId getId() {
2618    return proxyAgent.getId();
2619  }
2620
2621  public String JavaDoc getStringId() {
2622    return proxyAgent.getId().toString();
2623  }
2624
2625  public void readBag(ObjectInputStream JavaDoc in)
2626    throws IOException JavaDoc, ClassNotFoundException JavaDoc {
2627    if (logger.isLoggable(BasicLevel.DEBUG))
2628      logger.log(BasicLevel.DEBUG,
2629                 "ProxyImpl[" + proxyAgent.getId() + "].readbag()");
2630
2631    activeCtxId = in.readInt();
2632    /* // Orders elements is unknown, not use read bag in the same order
2633       Enumeration elements = contexts.elements();
2634       while (elements.hasMoreElements()) {
2635          ClientContext cc = (ClientContext)elements.nextElement();
2636          cc.setProxyAgent(proxyAgent);
2637          cc.readBag(in);
2638       }
2639       elements = subsTable.elements();
2640       while (elements.hasMoreElements()) {
2641          ClientSubscription cs = (ClientSubscription)elements.nextElement();
2642          cs.setProxyAgent(proxyAgent);
2643          cs.readBag(in);
2644       }*/

2645    /*** part modified */
2646    int size = in.readInt();
2647    Object JavaDoc obj=null;
2648    for(int j=0;j<size;j++){
2649    obj=in.readObject();
2650    ClientContext cc = (ClientContext) contexts.get((Integer JavaDoc)obj);
2651    cc.setProxyAgent(proxyAgent);
2652    cc.readBag(in);
2653    }
2654    size = in.readInt();
2655    for(int j=0;j<size;j++){
2656    obj=in.readObject();
2657    ClientSubscription cs = (ClientSubscription)subsTable.get((String JavaDoc)obj);
2658    cs.setProxyAgent(proxyAgent);
2659    cs.readBag(in);
2660    }
2661    /*** end part modified */
2662    
2663    activeCtx = (ClientContext)contexts.get(
2664      new Integer JavaDoc(activeCtxId));
2665
2666    Vector JavaDoc messages = (Vector JavaDoc)in.readObject();
2667
2668    if (logger.isLoggable(BasicLevel.DEBUG))
2669      logger.log(BasicLevel.DEBUG, " -> messages = " + messages);
2670    
2671    topicsTable = new Hashtable JavaDoc();
2672    messagesTable = new Hashtable JavaDoc();
2673
2674    Vector JavaDoc topics = new Vector JavaDoc();
2675    TopicSubscription tSub;
2676
2677
2678    for (Enumeration JavaDoc subNames = subsTable.keys();
2679         subNames.hasMoreElements();) {
2680      String JavaDoc subName = (String JavaDoc) subNames.nextElement();
2681
2682      if (logger.isLoggable(BasicLevel.DEBUG))
2683        logger.log(BasicLevel.DEBUG, " -> subName = " + subName);
2684      
2685      ClientSubscription cSub = (ClientSubscription) subsTable.get(subName);
2686      AgentId destId = cSub.getTopicId();
2687      if (! topics.contains(destId))
2688        topics.add(destId);
2689      cSub.reinitialize(getStringId(),
2690                        messagesTable,
2691                        messages,
2692                        false);
2693
2694      if (logger.isLoggable(BasicLevel.DEBUG))
2695        logger.log(BasicLevel.DEBUG, " -> destId = " + destId + ')');
2696      
2697      tSub = (TopicSubscription) topicsTable.get(destId);
2698      if (tSub == null) {
2699        tSub = new TopicSubscription();
2700        topicsTable.put(destId, tSub);
2701      }
2702      tSub.putSubscription(subName, cSub.getSelector());
2703    }
2704    if (logger.isLoggable(BasicLevel.DEBUG))
2705        logger.log(BasicLevel.DEBUG, " -> topicsTable = " + topicsTable);
2706
2707    // DF: seems not useful here
2708
// for (Enumeration topicIds = topics.elements();
2709
// topicIds.hasMoreElements();) {
2710
// updateSubscriptionToTopic((AgentId) topicIds.nextElement(), -1, -1);
2711
// }
2712
}
2713
2714  public void writeBag(ObjectOutputStream JavaDoc out)
2715    throws IOException JavaDoc {
2716    if (logger.isLoggable(BasicLevel.DEBUG))
2717      logger.log(BasicLevel.DEBUG,
2718                 "ProxyImpl[" + proxyAgent.getId() + "].writeBag()");
2719
2720    out.writeInt(activeCtxId);
2721    
2722    /* Enumeration elements = contexts.elements();
2723    while (elements.hasMoreElements()) {
2724       ((ClientContext)elements.nextElement()).writeBag(out);
2725    }
2726    elements = subsTable.elements();
2727    while (elements.hasMoreElements()) {
2728       ((ClientSubscription)elements.nextElement()).writeBag(out);
2729    }*/

2730    /*** part modified */
2731    // the number of keys in contexts hashtable
2732
out.writeInt(contexts.size());
2733    Enumeration JavaDoc elements = contexts.keys();
2734    Object JavaDoc obj=null;
2735    while (elements.hasMoreElements()) {
2736    obj=elements.nextElement();
2737    out.writeObject(obj);
2738    ((ClientContext)contexts.get((Integer JavaDoc)obj)).writeBag(out);
2739    }
2740    // the number of keys in subsTable hashtable
2741
out.writeInt(subsTable.size());
2742       elements = subsTable.keys();
2743    while (elements.hasMoreElements()) {
2744    obj=elements.nextElement();
2745    out.writeObject(obj);
2746    ((ClientSubscription)subsTable.get((String JavaDoc)obj)).writeBag(out);
2747    }
2748    /*** end part modified */
2749
2750    Vector JavaDoc messages = new Vector JavaDoc();
2751    elements = messagesTable.elements();
2752    while (elements.hasMoreElements()) {
2753      messages.addElement(elements.nextElement());
2754    }
2755
2756    if (logger.isLoggable(BasicLevel.DEBUG))
2757      logger.log(BasicLevel.DEBUG, " -> messages = " + messages + ')');
2758
2759    out.writeObject(messages);
2760  
2761  }
2762}
2763
2764/**
2765 * The <code>Xid</code> internal class is a utility class representing
2766 * a global transaction identifier.
2767 */

2768class Xid implements java.io.Serializable JavaDoc {
2769  byte[] bq;
2770  int fi;
2771  byte[] gti;
2772
2773
2774  Xid(byte[] bq, int fi, byte[] gti)
2775  {
2776    this.bq = bq;
2777    this.fi = fi;
2778    this.gti = gti;
2779  }
2780  
2781  public boolean equals(Object JavaDoc o)
2782  {
2783    if (! (o instanceof Xid))
2784      return false;
2785
2786    Xid other = (Xid) o;
2787
2788    return java.util.Arrays.equals(bq, other.bq)
2789           && fi == other.fi
2790           && java.util.Arrays.equals(gti, other.gti);
2791  }
2792
2793  public int hashCode()
2794  {
2795    return (new String JavaDoc(bq) + "-" + new String JavaDoc(gti)).hashCode();
2796  }
2797}
2798
Popular Tags