KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > proxies > ProxyImpl


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies
4  * Copyright (C) 2004 France Telecom R&D
5  * Copyright (C) 2003 - 2004 Bull SA
6  * Copyright (C) 1996 - 2000 Dyade
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License as published by the Free Software Foundation; either
11  * version 2.1 of the License, or any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21  * USA.
22  *
23  * Initial developer(s): Frederic Maistre (INRIA)
24  * Contributor(s): ScalAgent Distributed Technologies
25  */

26 package org.objectweb.joram.mom.proxies;
27
28 import java.io.IOException JavaDoc;
29 import java.io.ObjectInputStream JavaDoc;
30 import java.io.ObjectOutputStream JavaDoc;
31
32 import java.util.Enumeration JavaDoc;
33 import java.util.Hashtable JavaDoc;
34 import java.util.Vector JavaDoc;
35
36 import fr.dyade.aaa.agent.AgentId;
37 import fr.dyade.aaa.agent.DeleteNot;
38 import fr.dyade.aaa.agent.Notification;
39 import fr.dyade.aaa.agent.UnknownAgent;
40 import fr.dyade.aaa.agent.UnknownNotificationException;
41 import fr.dyade.aaa.agent.Channel;
42
43 import org.objectweb.joram.mom.dest.*;
44 import org.objectweb.joram.mom.notifications.*;
45 import org.objectweb.joram.mom.messages.Message;
46
47 import org.objectweb.joram.shared.admin.DeleteUser;
48 import org.objectweb.joram.shared.admin.UpdateUser;
49
50 import org.objectweb.joram.shared.admin.GetSubscriptions;
51 import org.objectweb.joram.shared.admin.GetSubscriptionsRep;
52 import org.objectweb.joram.shared.admin.GetSubscriptionMessageIds;
53 import org.objectweb.joram.shared.admin.GetSubscriptionMessageIdsRep;
54 import org.objectweb.joram.shared.admin.GetSubscriptionMessage;
55 import org.objectweb.joram.shared.admin.GetSubscriptionMessageRep;
56 import org.objectweb.joram.shared.admin.DeleteSubscriptionMessage;
57 import org.objectweb.joram.shared.admin.GetSubscription;
58 import org.objectweb.joram.shared.admin.GetSubscriptionRep;
59 import org.objectweb.joram.shared.admin.ClearSubscription;
60
61 import org.objectweb.joram.shared.client.*;
62 import org.objectweb.joram.shared.excepts.*;
63
64 import javax.management.openmbean.CompositeDataSupport JavaDoc;
65
66 import fr.dyade.aaa.util.Debug;
67 import org.objectweb.util.monolog.api.BasicLevel;
68 import org.objectweb.util.monolog.api.Logger;
69
70 /**
71  * The <code>ProxyImpl</code> class implements the MOM proxy behaviour,
72  * basically forwarding client requests to MOM destinations and MOM
73  * destinations replies to clients.
74  */

75 public class ProxyImpl implements java.io.Serializable JavaDoc, ProxyImplMBean {
76   public static Logger logger = Debug.getLogger(ProxyImpl.class.getName());
77
78   /** period to run the cleaning task, by default 60s. */
79   protected long period = 60000L;
80
81   /**
82    * Returns the period value of this queue, -1 if not set.
83    *
84    * @return the period value of this queue; -1 if not set.
85    */

86   public long getPeriod() {
87     return period;
88   }
89
90   /**
91    * Sets or unsets the period for this queue.
92    *
93    * @param period The period value to be set or -1 for unsetting previous
94    * value.
95    */

96   public void setPeriod(long period) {
97     if ((this.period == -1L) && (period != -1L)) {
98       // Schedule the CleaningTask.
99
Channel.sendTo(proxyAgent.getId(), new WakeUpNot());
100     }
101     this.period = period;
102   }
103
104   /**
105    * Identifier of this proxy dead message queue, <code>null</code> for DMQ
106    * not set.
107    */

108   private AgentId dmqId = null;
109   /**
110    * Threshold value, 0 or negative for no threshold, <code>null</code> for
111    * value not set.
112    */

113   private Integer JavaDoc threshold = null;
114
115   /**
116    * Table of the proxy's <code>ClientContext</code> instances.
117    * <p>
118    * <b>Key:</b> context identifier<br>
119    * <b>Value:</b> context
120    */

121   private Hashtable JavaDoc contexts;
122   /**
123    * Table holding the <code>ClientSubscription</code> instances.
124    * <p>
125    * <b>Key:</b> subsription name<br>
126    * <b>Value:</b> client subscription
127    */

128   private Hashtable JavaDoc subsTable;
129   /**
130    * Table holding the recovered transactions branches.
131    * <p>
132    * <b>Key:</b> transaction identifier<br>
133    * <b>Value:</b> <code>XACnxPrepare</code> instance
134    */

135   private Hashtable JavaDoc recoveredTransactions;
136
137   /** Counter of message arrivals from topics. */
138   private long arrivalsCounter = 0;
139
140   /** The reference of the agent hosting the proxy. */
141   private ProxyAgentItf proxyAgent;
142   /**
143    * Table holding the <code>TopicSubscription</code> instances.
144    * <p>
145    * <b>Key:</b> topic identifier<br>
146    * <b>Value:</b> topic subscription
147    */

148   private transient Hashtable JavaDoc topicsTable;
149   /**
150    * Table holding the subsriptions' messages.
151    * <p>
152    * <b>Key:</b> message identifier<br>
153    * <b>Value:</b> message
154    */

155   private transient Hashtable JavaDoc messagesTable;
156
157   /**
158    * Identifier of the active context.
159    * Value -1 means that there's no active
160    * context.
161    */

162   private transient int activeCtxId;
163   /** Reference to the active <code>ClientContext</code> instance. */
164   private transient ClientContext activeCtx;
165
166   /**
167    * Constructs a <code>ProxyImpl</code> instance.
168    */

169   public ProxyImpl(ProxyAgentItf proxyAgent) {
170     contexts = new Hashtable JavaDoc();
171     subsTable = new Hashtable JavaDoc();
172     this.proxyAgent = proxyAgent;
173     if (logger.isLoggable(BasicLevel.DEBUG))
174       logger.log(BasicLevel.DEBUG, this + ": created.");
175   }
176
177   /**
178    * Returns a string representation of this user's proxy.
179    */

180   public String JavaDoc toString() {
181     if (proxyAgent == null)
182       return "ProxyImpl:";
183     else
184       return "ProxyImpl:" + proxyAgent.getId();
185   }
186
187
188   /**
189    * (Re)initializes the proxy.
190    *
191    * @exception Exception If the proxy state could not be fully retrieved,
192    * leading to an inconsistent state.
193    */

194   public void initialize(boolean firstTime) throws Exception JavaDoc {
195     if (logger.isLoggable(BasicLevel.DEBUG))
196       logger.log(BasicLevel.DEBUG, "--- " + this + " (re)initializing...");
197  
198     topicsTable = new Hashtable JavaDoc();
199     messagesTable = new Hashtable JavaDoc();
200
201     setActiveCtxId(-1);
202     
203     // Re-initializing after a crash or a server stop.
204

205     // Browsing the pre-crash contexts:
206
ClientContext activeCtx;
207     AgentId destId;
208     for (Enumeration JavaDoc ctxIds = contexts.keys(); ctxIds.hasMoreElements();) {
209       activeCtx = (ClientContext) contexts.remove(ctxIds.nextElement());
210
211       // Denying the non acknowledged messages:
212
for (Enumeration JavaDoc queueIds = activeCtx.getDeliveringQueues();
213            queueIds.hasMoreElements();) {
214         destId = (AgentId) queueIds.nextElement();
215         proxyAgent.sendNot(destId, new DenyRequest(activeCtx.getId()));
216
217         if (logger.isLoggable(BasicLevel.DEBUG))
218           logger.log(BasicLevel.DEBUG,
219                      "Denies messages on queue " + destId.toString());
220       }
221
222       // Saving the prepared transactions.
223
Enumeration JavaDoc xids = activeCtx.getTxIds();
224       Xid xid;
225       XACnxPrepare recoveredPrepare;
226       XACnxPrepare prepare;
227       while (xids.hasMoreElements()) {
228         if (recoveredTransactions == null)
229           recoveredTransactions = new Hashtable JavaDoc();
230
231         xid = (Xid) xids.nextElement();
232
233         recoveredPrepare = (XACnxPrepare) recoveredTransactions.get(xid);
234         prepare = activeCtx.getTxPrepare(xid);
235
236         if (recoveredPrepare == null)
237           recoveredTransactions.put(xid, prepare);
238         else {
239           recoveredPrepare.getSendings().addAll(prepare.getSendings());
240           recoveredPrepare.getAcks().addAll(prepare.getAcks());
241         }
242       }
243
244       // Deleting the temporary destinations:
245
for (Enumeration JavaDoc tempDests = activeCtx.getTempDestinations();
246            tempDests.hasMoreElements();) {
247         destId = (AgentId) tempDests.nextElement();
248         deleteTemporaryDestination(destId);
249   
250         if (logger.isLoggable(BasicLevel.DEBUG))
251           logger.log(BasicLevel.DEBUG,
252                      "Deletes temporary destination " + destId.toString());
253       }
254     }
255
256     // Retrieving the subscriptions' messages.
257
Vector JavaDoc messages = Message.loadAll(getMsgTxname());
258
259     if (subsTable.isEmpty()) {
260       // it is possible because we always save MessageSoftRef
261
// so we must delete all message.
262
Message.deleteAll(getMsgTxname());
263     }
264     
265     // Browsing the pre-crash subscriptions:
266
String JavaDoc subName;
267     ClientSubscription cSub;
268     Vector JavaDoc topics = new Vector JavaDoc();
269     TopicSubscription tSub;
270     for (Enumeration JavaDoc subNames = subsTable.keys();
271          subNames.hasMoreElements();) {
272       subName = (String JavaDoc) subNames.nextElement();
273       cSub = (ClientSubscription) subsTable.get(subName);
274       destId = cSub.getTopicId();
275       if (! topics.contains(destId))
276         topics.add(destId);
277       // Deleting the non durable subscriptions.
278
if (! cSub.getDurable())
279         subsTable.remove(subName);
280       // Reinitializing the durable ones.
281
else {
282         cSub.setProxyAgent(proxyAgent);
283         cSub.reinitialize(getStringId(),
284                           messagesTable,
285                           messages,
286                           true);
287         tSub = (TopicSubscription) topicsTable.get(destId);
288         if (tSub == null) {
289           tSub = new TopicSubscription();
290           topicsTable.put(destId, tSub);
291         }
292         tSub.putSubscription(subName, cSub.getSelector());
293       }
294     }
295     // Browsing the topics and updating their subscriptions.
296
for (Enumeration JavaDoc topicIds = topics.elements();
297          topicIds.hasMoreElements();)
298       updateSubscriptionToTopic((AgentId) topicIds.nextElement(), -1, -1);
299   }
300
301   private void setActiveCtxId(int activeCtxId) {
302     if (logger.isLoggable(BasicLevel.DEBUG))
303       logger.log(BasicLevel.DEBUG,
304                  "ProxyImpl.setActiveCtxId(" + activeCtxId + ')');
305     this.activeCtxId = activeCtxId;
306   }
307
308   /**
309    * Method processing clients requests.
310    * <p>
311    * Some of the client requests are directly forwarded, some others are
312    * sent to the proxy so that their processing occurs in a transaction.
313    * <p>
314    * A <code>MomExceptionReply</code> wrapping a <tt>DestinationException</tt>
315    * might be sent back if a target destination can't be identified.
316    */

317   public void reactToClientRequest(int key, AbstractJmsRequest request)
318   {
319     try {
320       if (logger.isLoggable(BasicLevel.DEBUG))
321         logger.log(BasicLevel.DEBUG,
322                    "--- " + this + " got " + request.getClass().getName() +
323                    " with id: " + request.getRequestId() +
324                    " through activeCtx: " + key);
325
326       if (request instanceof ProducerMessages)
327         reactToClientRequest(key, (ProducerMessages) request);
328       else if (request instanceof ConsumerReceiveRequest)
329         reactToClientRequest(key, (ConsumerReceiveRequest) request);
330       else if (request instanceof ConsumerSetListRequest)
331         reactToClientRequest(key, (ConsumerSetListRequest) request);
332       else if (request instanceof QBrowseRequest)
333         reactToClientRequest(key, (QBrowseRequest) request);
334       else if (request instanceof JmsRequestGroup)
335         reactToClientRequest(key, (JmsRequestGroup) request);
336       else {
337         doReact(key, request);
338       }
339     }
340     // Catching an exception due to an invalid agent identifier to
341
// forward the request to:
342
catch (IllegalArgumentException JavaDoc iE) {
343       DestinationException dE =
344         new DestinationException("Proxy could not forward the request to"
345                                  + " incorrectly identified destination: "
346                                  + iE);
347
348       doReply(key, new MomExceptionReply(request.getRequestId(), dE));
349     }
350   }
351
352   /**
353    * Forwards the messages sent by the client in a
354    * <code>ProducerMessages</code> request as a <code>ClientMessages</code>
355    * MOM request directly to a destination, and acknowledges them by sending
356    * a <code>ServerReply</code> back.
357    */

358   private void reactToClientRequest(int key, ProducerMessages req) {
359     if (logger.isLoggable(BasicLevel.DEBUG))
360       logger.log(BasicLevel.DEBUG,
361                  "ProxyImpl.reactToClientRequest(" + key + ',' + req + ')');
362
363     AgentId destId = AgentId.fromString(req.getTarget());
364     ClientMessages not = new ClientMessages(
365       key,
366       req.getRequestId(),
367       req.getMessages());
368
369     setDmq(not);
370     
371     
372     if (destId.getTo() == proxyAgent.getId().getTo()) {
373       if (logger.isLoggable(BasicLevel.DEBUG))
374         logger.log(BasicLevel.DEBUG, " -> local sending");
375       not.setPersistent(false);
376       if (req.getAsyncSend()) {
377         not.setAsyncSend(true);
378       }
379     } else {
380       if (logger.isLoggable(BasicLevel.DEBUG))
381         logger.log(BasicLevel.DEBUG, " -> remote sending");
382       if (!req.getAsyncSend()) {
383         proxyAgent.sendNot(proxyAgent.getId(),
384                            new SendReplyNot(key, req.getRequestId()));
385       }
386     }
387     
388     proxyAgent.sendNot(destId, not);
389   }
390   
391   private void setDmq(ClientMessages not) {
392     // Setting the producer's DMQ identifier field:
393
if (dmqId != null) {
394       not.setDMQId(dmqId);
395     } else {
396       not.setDMQId(DeadMQueueImpl.getId());
397     }
398   }
399
400   /**
401    * Either forwards the <code>ConsumerReceiveRequest</code> request as a
402    * <code>ReceiveRequest</code> directly to the target queue, or wraps it
403    * and sends it to the proxy if destinated to a subscription.
404    */

405   private void reactToClientRequest(int key, ConsumerReceiveRequest req)
406   {
407     if (req.getQueueMode()) {
408       ReceiveRequest not = new ReceiveRequest(
409         key,
410         req.getRequestId(),
411         req.getSelector(),
412         req.getTimeToLive(),
413         req.getReceiveAck(),
414         null,
415         1);
416       AgentId to = AgentId.fromString(req.getTarget());
417       if (to.getTo() == proxyAgent.getId().getTo()) {
418         if (logger.isLoggable(BasicLevel.DEBUG))
419           logger.log(BasicLevel.DEBUG, " -> local receiving");
420         not.setPersistent(false);
421         proxyAgent.sendNot(to, not);
422       } else {
423         proxyAgent.sendNot(to, not);
424       }
425     } else {
426       doReact(key, req);
427     }
428   }
429
430   /**
431    * Either forwards the <code>ConsumerSetListRequest</code> request as a
432    * <code>ReceiveRequest</code> directly to the target queue, or wraps it
433    * and sends it to the proxy if destinated to a subscription.
434    */

435   private void reactToClientRequest(int key, ConsumerSetListRequest req) {
436     if (logger.isLoggable(BasicLevel.DEBUG))
437       logger.log(BasicLevel.DEBUG,
438           "ProxyImp.reactToClientRequest(" + key + ',' + req + ')');
439     if (req.getQueueMode()) {
440       ReceiveRequest not = new ReceiveRequest(key,
441                                               req.getRequestId(),
442                                               req.getSelector(),
443                                               0,
444                                               false,
445                                               req.getMessageIdsToAck(),
446                                               req.getMessageCount());
447       AgentId to = AgentId.fromString(req.getTarget());
448       if (to.getTo() == proxyAgent.getId().getTo()) {
449         if (logger.isLoggable(BasicLevel.DEBUG))
450           logger.log(BasicLevel.DEBUG, " -> local sending");
451         not.setPersistent(false);
452         proxyAgent.sendNot(to, not);
453       } else {
454         proxyAgent.sendNot(to, not);
455       }
456     }
457     else {
458       doReact(key, req);
459     }
460   }
461
462   /**
463    * Forwards the client's <code>QBrowseRequest</code> request as
464    * a <code>BrowseRequest</code> MOM request directly to a destination.
465    */

466   private void reactToClientRequest(int key, QBrowseRequest req)
467   {
468     proxyAgent.sendNot(AgentId.fromString(req.getTarget()),
469                        new BrowseRequest(key,
470                                          req.getRequestId(),
471                                          req.getSelector()));
472   }
473   
474   private void reactToClientRequest(int key, JmsRequestGroup request) {
475     AbstractJmsRequest[] requests = request.getRequests();
476     RequestBuffer rm = new RequestBuffer(proxyAgent);
477     for (int i = 0; i < requests.length; i++) {
478       if (requests[i] instanceof ProducerMessages) {
479         ProducerMessages pm =(ProducerMessages) requests[i];
480         rm.put(key, pm);
481       } else {
482         reactToClientRequest(key, requests[i]);
483       }
484     }
485     
486     rm.flush();
487   }
488
489   /**
490    * Distributes the received notifications to the appropriate reactions.
491    * <p>
492    * A JMS proxy reacts to:
493    * <ul>
494    * <li><code>SyncReply</code> proxy synchronizing notification,</li>
495    * <li><code>SetDMQRequest</code> admin notification,</li>
496    * <li><code>SetThreshRequest</code> admin notification,</li>
497    * <li><code>SetNbMaxMsgRequest</code> admin notification,</li>
498    * <li><code>Monit_GetNbMaxMsg</code> admin notification,</li>
499    * <li><code>Monit_GetDMQSettings</code> monitoring notification,</li>
500    * <li><code>AbstractReply</code> destination replies,</li>
501    * <li><code>AdminReply</code> administration replies,</li>
502    * <li><code>fr.dyade.aaa.agent.UnknownAgent</code>.</li>
503    * </ul>
504    *
505    * @exception UnknownNotificationException
506    * If the notification is not expected.
507    */

508   public void react(AgentId from, Notification not)
509               throws UnknownNotificationException
510   {
511     // Administration and monitoring requests:
512
if (not instanceof SetDMQRequest)
513       doReact(from, (SetDMQRequest) not);
514     else if (not instanceof SetThreshRequest)
515       doReact(from, (SetThreshRequest) not);
516     else if (not instanceof SetNbMaxMsgRequest)
517       doReact(from, (SetNbMaxMsgRequest) not);
518     else if (not instanceof Monit_GetNbMaxMsg)
519       doReact(from, (Monit_GetNbMaxMsg) not);
520     else if (not instanceof Monit_GetDMQSettings)
521       doReact(from, (Monit_GetDMQSettings) not);
522     // Synchronization notification:
523
else if (not instanceof SyncReply)
524       doReact((SyncReply) not);
525     // Notifications sent by a destination:
526
else if (not instanceof AbstractReply)
527       doFwd(from, (AbstractReply) not);
528     else if (not instanceof AdminReply)
529       doReact((AdminReply) not);
530     // Platform notifications:
531
else if (not instanceof UnknownAgent)
532       doReact((UnknownAgent) not);
533     else if (not instanceof UserAdminRequestNot)
534       doReact((UserAdminRequestNot) not);
535     else
536       throw new UnknownNotificationException("Unexpected notification: "
537                                              + not.getClass().getName());
538   }
539
540   
541   /**
542    * Distributes the client requests to the appropriate reactions.
543    * <p>
544    * The proxy accepts the following requests:
545    * <ul>
546    * <li><code>GetAdminTopicRequest</code></li>
547    * <li><code>CnxConnectRequest</code></li>
548    * <li><code>CnxStartRequest</code></li>
549    * <li><code>CnxStopRequest</code></li>
550    * <li><code>SessCreateTQRequest</code></li>
551    * <li><code>SessCreateTTRequest</code></li>
552    * <li><code>ConsumerSubRequest</code></li>
553    * <li><code>ConsumerUnsubRequest</code></li>
554    * <li><code>ConsumerCloseSubRequest</code></li>
555    * <li><code>ConsumerSetListRequest</code></li>
556    * <li><code>ConsumerUnsetListRequest</code></li>
557    * <li><code>ConsumerReceiveRequest</code></li>
558    * <li><code>ConsumerAckRequest</code></li>
559    * <li><code>ConsumerDenyRequest</code></li>
560    * <li><code>SessAckRequest</code></li>
561    * <li><code>SessDenyRequest</code></li>
562    * <li><code>TempDestDeleteRequest</code></li>
563    * <li><code>XACnxPrepare</code></li>
564    * <li><code>XACnxCommit</code></li>
565    * <li><code>XACnxRollback</code></li>
566    * <li><code>XACnxRecoverRequest</code></li>
567    * </ul>
568    * <p>
569    * A <code>JmsExceptReply</code> is sent back to the client when an
570    * exception is thrown by the reaction.
571    */

572   private void doReact(int key, AbstractJmsRequest request)
573   {
574     try {
575       // Updating the active context if the request is not a new context
576
// request!
577
if (! (request instanceof CnxConnectRequest))
578         setCtx(key);
579
580       if (request instanceof GetAdminTopicRequest)
581         doReact(key, (GetAdminTopicRequest) request);
582       else if (request instanceof CnxConnectRequest)
583         doReact(key, (CnxConnectRequest) request);
584       else if (request instanceof CnxStartRequest)
585         doReact((CnxStartRequest) request);
586       else if (request instanceof CnxStopRequest)
587         doReact((CnxStopRequest) request);
588       else if (request instanceof SessCreateTQRequest)
589         doReact((SessCreateTQRequest) request);
590       else if (request instanceof SessCreateTTRequest)
591         doReact((SessCreateTTRequest) request);
592       else if (request instanceof ConsumerSubRequest)
593         doReact((ConsumerSubRequest) request);
594       else if (request instanceof ConsumerUnsubRequest)
595         doReact((ConsumerUnsubRequest) request);
596       else if (request instanceof ConsumerCloseSubRequest)
597         doReact((ConsumerCloseSubRequest) request);
598       else if (request instanceof ConsumerSetListRequest)
599         doReact((ConsumerSetListRequest) request);
600       else if (request instanceof ConsumerUnsetListRequest)
601         doReact((ConsumerUnsetListRequest) request);
602       else if (request instanceof ConsumerReceiveRequest)
603         doReact((ConsumerReceiveRequest) request);
604       else if (request instanceof ConsumerAckRequest)
605         doReact((ConsumerAckRequest) request);
606       else if (request instanceof ConsumerDenyRequest)
607         doReact((ConsumerDenyRequest) request);
608       else if (request instanceof SessAckRequest)
609         doReact((SessAckRequest) request);
610       else if (request instanceof SessDenyRequest)
611         doReact((SessDenyRequest) request);
612       else if (request instanceof TempDestDeleteRequest)
613         doReact((TempDestDeleteRequest) request);
614       else if (request instanceof XACnxPrepare)
615         doReact((XACnxPrepare) request);
616       else if (request instanceof XACnxCommit)
617         doReact((XACnxCommit) request);
618       else if (request instanceof XACnxRollback)
619         doReact((XACnxRollback) request);
620       else if (request instanceof XACnxRecoverRequest)
621         doReact((XACnxRecoverRequest) request);
622       else if (request instanceof CnxCloseRequest)
623         doReact(key, (CnxCloseRequest) request);
624       else if (request instanceof ActivateConsumerRequest)
625         doReact(key, (ActivateConsumerRequest) request);
626       else if (request instanceof CommitRequest)
627         doReact(key, (CommitRequest)request);
628     }
629     catch (MomException mE) {
630       if (logger.isLoggable(BasicLevel.ERROR))
631         logger.log(BasicLevel.ERROR, mE);
632
633       // Sending the exception to the client:
634
doReply(new MomExceptionReply(request.getRequestId(), mE));
635     }
636   }
637
638   /**
639    * Method implementing the reaction to a <code>GetAdminTopicRequest</code>
640    * requesting the identifier of the local admin topic.
641    * <p>
642    * It simply sends back a <code>GetAdminTopicReply</code> holding the
643    * admin topic identifier.
644    *
645    * @exception AccessException If the requester is not an administrator.
646    */

647   private void doReact(int key, GetAdminTopicRequest req)
648                throws AccessException
649   {
650 // if (! admin)
651
// throw new AccessException("Request forbidden to a non administrator.");
652

653     doReply(
654       key,
655       new GetAdminTopicReply(
656         req,
657         AdminTopicImpl.getReference().getId().toString()));
658   }
659
660   /**
661    * Method implementing the reaction to a <code>CnxConnectRequest</code>
662    * requesting the key of the active context.
663    * <p>
664    * It simply sends back a <code>ConnectReply</code> holding the active
665    * context's key.
666    *
667    * @exception DestinationException In case of a first administrator
668    * context, if the local administration topic reference
669    * is not available.
670    */

671   private void doReact(int key, CnxConnectRequest req)
672     throws DestinationException {
673     // state change, so save.
674
proxyAgent.setSave();
675
676     setActiveCtxId(key);
677     activeCtx = new ClientContext(proxyAgent.getId(), key);
678     activeCtx.setProxyAgent(proxyAgent);
679     contexts.put(new Integer JavaDoc(key), activeCtx);
680     
681     if (logger.isLoggable(BasicLevel.DEBUG))
682       logger.log(BasicLevel.DEBUG, "Connection " + key + " opened.");
683
684     doReply(new CnxConnectReply(req, key, proxyAgent.getId().toString()));
685   }
686
687   /**
688    * Method implementing the proxy reaction to a <code>CnxStartRequest</code>
689    * requesting to start a context.
690    * <p>
691    * This method sends the pending <code>ConsumerMessages</code> replies,
692    * if any.
693    */

694   private void doReact(CnxStartRequest req) {
695     activeCtx.setActivated(true);
696
697     // Delivering the pending deliveries, if any:
698
for (Enumeration JavaDoc deliveries = activeCtx.getPendingDeliveries();
699          deliveries.hasMoreElements();)
700       doReply((AbstractJmsReply) deliveries.nextElement());
701
702     // Clearing the pending deliveries.
703
activeCtx.clearPendingDeliveries();
704   }
705
706   /**
707    * Method implementing the JMS proxy reaction to a
708    * <code>CnxStopRequest</code> requesting to stop a context.
709    * <p>
710    * This method sends a <code>ServerReply</code> back.
711    */

712   private void doReact(CnxStopRequest req) {
713     activeCtx.setActivated(false);
714     doReply(new ServerReply(req));
715   }
716
717   /**
718    * Method implementing the JMS proxy reaction to a
719    * <code>SessCreateTQRequest</code> requesting the creation of a temporary
720    * queue.
721    * <p>
722    * Creates the queue, sends it a <code>SetRightRequest</code> for granting
723    * WRITE access to all, and wraps a <code>SessCreateTDReply</code> in a
724    * <code>SyncReply</code> notification it sends to itself. This latest
725    * action's purpose is to preserve causality.
726    *
727    * @exception RequestException If the queue could not be deployed.
728    */

729   private void doReact(SessCreateTQRequest req) throws RequestException {
730     try {
731       Queue queue = new Queue();
732       queue.init(proxyAgent.getId(), null);
733       AgentId qId = queue.getId();
734
735       queue.deploy();
736
737       // Setting free WRITE right on the queue:
738
proxyAgent.sendNot(qId, new SetRightRequest(null, null, 2));
739
740       activeCtx.addTemporaryDestination(qId);
741
742       SessCreateTDReply reply = new SessCreateTDReply(req, qId.toString());
743       proxyAgent.sendNot(proxyAgent.getId(),
744                          new SyncReply(activeCtxId, reply));
745
746       proxyAgent.sendNot(AdminTopic.getDefault(),
747                          new RegisterTmpDestNot(qId, false, true));
748
749       if (logger.isLoggable(BasicLevel.DEBUG))
750         logger.log(BasicLevel.DEBUG, "Temporary queue " + qId + " created.");
751     }
752     catch (java.io.IOException JavaDoc iE) {
753       throw new RequestException("Could not create temporary queue: " + iE);
754     }
755   }
756
757   /**
758    * Method implementing the JMS proxy reaction to a
759    * <code>SessCreateTTRequest</code> requesting the creation of a temporary
760    * topic.
761    * <p>
762    * Creates the topic, sends it a <code>SetRightRequest</code> for granting
763    * WRITE access to all, and wraps a <code>SessCreateTDReply</code> in a
764    * <code>SyncReply</code> notification it sends to itself. This latest
765    * action's purpose is to preserve causality.
766    *
767    * @exception RequestException If the topic could not be deployed.
768    */

769   private void doReact(SessCreateTTRequest req) throws RequestException {
770     Topic topic = new Topic();
771     topic.init(proxyAgent.getId(), null);
772     AgentId tId = topic.getId();
773
774     try {
775       topic.deploy();
776
777       // Setting free WRITE right on the topic:
778
proxyAgent.sendNot(tId, new SetRightRequest(null, null, 2));
779
780       activeCtx.addTemporaryDestination(tId);
781
782       SessCreateTDReply reply = new SessCreateTDReply(req, tId.toString());
783       proxyAgent.sendNot(proxyAgent.getId(),
784                          new SyncReply(activeCtxId, reply));
785
786       proxyAgent.sendNot(AdminTopic.getDefault(),
787                          new RegisterTmpDestNot(tId, true, true));
788
789       if (logger.isLoggable(BasicLevel.DEBUG))
790         logger.log(BasicLevel.DEBUG, "Temporary topic" + tId + " created.");
791     } catch (java.io.IOException JavaDoc iE) {
792       topic = null;
793       throw new RequestException("Could not deploy temporary topic "
794                                  + tId + ": " + iE);
795     }
796   }
797
798   /**
799    * Method implementing the JMS proxy reaction to a
800    * <code>ConsumerSubRequest</code> requesting to subscribe to a topic.
801    *
802    * @exception StateException If activating an already active durable
803    * subscription.
804    */

805   private void doReact(ConsumerSubRequest req) throws StateException {
806     AgentId topicId = AgentId.fromString(req.getTarget());
807     String JavaDoc subName = req.getSubName();
808     
809     boolean newTopic = ! topicsTable.containsKey(topicId);
810     boolean newSub = ! subsTable.containsKey(subName);
811
812     TopicSubscription tSub;
813     ClientSubscription cSub;
814
815     // true if a SubscribeRequest has been sent to the topic.
816
boolean sent = false;
817
818     if (newTopic) { // New topic...
819
tSub = new TopicSubscription();
820       topicsTable.put(topicId, tSub);
821     } else { // Known topic...
822
tSub = (TopicSubscription) topicsTable.get(topicId);
823     }
824
825     if (newSub) { // New subscription...
826
// state change, so save.
827
proxyAgent.setSave();
828       cSub = new ClientSubscription(proxyAgent.getId(),
829                                     activeCtxId,
830                                     req.getRequestId(),
831                                     req.getDurable(),
832                                     topicId,
833                                     req.getSubName(),
834                                     req.getSelector(),
835                                     req.getNoLocal(),
836                                     dmqId,
837                                     threshold,
838                                     messagesTable);
839       cSub.setProxyAgent(proxyAgent);
840      
841       if (logger.isLoggable(BasicLevel.DEBUG))
842         logger.log(BasicLevel.DEBUG, "Subscription " + subName + " created.");
843
844       subsTable.put(subName, cSub);
845       tSub.putSubscription(subName, req.getSelector());
846       sent =
847         updateSubscriptionToTopic(topicId, activeCtxId, req.getRequestId());
848     } else { // Existing durable subscription...
849
cSub = (ClientSubscription) subsTable.get(subName);
850
851       if (cSub.getActive())
852         throw new StateException("The durable subscription " + subName +
853                                  " has already been activated.");
854
855       // Updated topic: updating the subscription to the previous topic.
856
boolean updatedTopic = ! topicId.equals(cSub.getTopicId());
857       if (updatedTopic) {
858         TopicSubscription oldTSub =
859           (TopicSubscription) topicsTable.get(cSub.getTopicId());
860         oldTSub.removeSubscription(subName);
861         updateSubscriptionToTopic(cSub.getTopicId(), -1, -1);
862       }
863
864       // Updated selector?
865
boolean updatedSelector;
866       if (req.getSelector() == null && cSub.getSelector() != null)
867         updatedSelector = true;
868       else if (req.getSelector() != null && cSub.getSelector() == null)
869         updatedSelector = true;
870       else if (req.getSelector() == null && cSub.getSelector() == null)
871         updatedSelector = false;
872       else
873         updatedSelector = ! req.getSelector().equals(cSub.getSelector());
874
875       // Reactivating the subscription.
876
cSub.reactivate(activeCtxId,
877                       req.getRequestId(),
878                       topicId,
879                       req.getSelector(),
880                       req.getNoLocal());
881
882       if (logger.isLoggable(BasicLevel.DEBUG))
883         logger.log(BasicLevel.DEBUG,
884                    "Subscription " + subName + " reactivated.");
885
886       // Updated subscription: updating subscription to topic.
887
if (updatedTopic || updatedSelector) {
888         tSub.putSubscription(subName, req.getSelector());
889         sent = updateSubscriptionToTopic(topicId,
890                                          activeCtxId,
891                                          req.getRequestId());
892       }
893     }
894     // Activating the subscription.
895
activeCtx.addSubName(subName);
896
897     // Acknowledging the request, if needed.
898
if (! sent)
899       proxyAgent.sendNot(proxyAgent.getId(),
900                          new SyncReply(activeCtxId, new ServerReply(req)));
901   }
902
903   /**
904    * Method implementing the JMS proxy reaction to a
905    * <code>ConsumerSetListRequest</code> notifying the creation of a client
906    * listener.
907    * <p>
908    * Sets the listener for the subscription, launches a delivery sequence.
909    *
910    * @exception DestinationException If the subscription does not exist.
911    */

912   private void doReact(ConsumerSetListRequest req) throws DestinationException
913   {
914     // Getting the subscription:
915
String JavaDoc subName = req.getTarget();
916     ClientSubscription sub = null;
917     if (subName != null)
918       sub = (ClientSubscription) subsTable.get(subName);
919
920     if (sub == null)
921       throw new DestinationException("Can't set a listener on the non existing subscription: " + subName);
922