KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > client > jms > Connection


/*
 * JORAM: Java(TM) Open Reliable Asynchronous Messaging
 * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies
 * Copyright (C) 1996 - 2000 Dyade
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Initial developer(s): Frederic Maistre (INRIA)
 * Contributor(s): ScalAgent Distributed Technologies
 */

24 package org.objectweb.joram.client.jms;
25
26 import java.util.Vector JavaDoc;
27
28 import javax.jms.IllegalStateException JavaDoc;
29 import javax.jms.InvalidDestinationException JavaDoc;
30 import javax.jms.InvalidSelectorException JavaDoc;
31 import javax.jms.JMSException JavaDoc;
32 import javax.jms.JMSSecurityException JavaDoc;
33
34 import org.objectweb.joram.client.jms.connection.RequestChannel;
35 import org.objectweb.joram.client.jms.connection.RequestMultiplexer;
36 import org.objectweb.joram.client.jms.connection.Requestor;
37 import org.objectweb.joram.shared.client.AbstractJmsReply;
38 import org.objectweb.joram.shared.client.AbstractJmsRequest;
39 import org.objectweb.joram.shared.client.CnxCloseRequest;
40 import org.objectweb.joram.shared.client.CnxConnectReply;
41 import org.objectweb.joram.shared.client.CnxConnectRequest;
42 import org.objectweb.joram.shared.client.CnxStartRequest;
43 import org.objectweb.joram.shared.client.CnxStopRequest;
44 import org.objectweb.joram.shared.client.ConsumerSubRequest;
45
46 import org.objectweb.util.monolog.api.BasicLevel;
47 import org.objectweb.util.monolog.api.Logger;
48 import fr.dyade.aaa.util.Debug;
49
50 /**
51  * Implements the <code>javax.jms.Connection</code> interface.
52  */

53 public class Connection implements javax.jms.Connection JavaDoc {
54   public static Logger logger = Debug.getLogger(Connection.class.getName());
55   
56   /**
57    * Status of the connection.
58    */

59   private static class Status {
60     /**
61      * Status of the connection when it is stopped.
62      * This is the initial status.
63      */

64     public static final int STOP = 0;
65
66     /**
67      * Status of the connection when it is started.
68      */

69     public static final int START = 1;
70
71     /**
72      * Status of the conenction when it is closed.
73      */

74     public static final int CLOSE = 2;
75
76     private static final String JavaDoc[] names = {
77       "STOP", "START", "CLOSE"};
78
79     public static String JavaDoc toString(int status) {
80       return names[status];
81     }
82   }
83
84   /**
85    * The request multiplexer used to communicate
86    * with the user proxy.
87    */

88   private RequestMultiplexer mtpx;
89
90   /**
91    * The requestor used to communicate
92    * with the user proxy.
93    */

94   private Requestor requestor;
95
96   /** Connection meta data. */
97   private ConnectionMetaData metaData = null;
98
99   /** Sessions counter. */
100   private int sessionsC = 0;
101
102   /** Messages counter. */
103   private int messagesC = 0;
104
105   /** Subscriptions counter. */
106   private int subsC = 0;
107
108   /** Client's agent proxy identifier. */
109   String JavaDoc proxyId;
110
111   /** Connection key. */
112   private int key;
113
114   /** The factory's parameters. */
115   private FactoryParameters factoryParameters;
116
117   /**
118    * Status of the connection.
119    * STOP, START, CLOSE
120    */

121   private int status;
122
123   /** Vector of the connection's sessions. */
124   private Vector JavaDoc sessions;
125
126   /** Vector of the connection's consumers. */
127   private Vector JavaDoc cconsumers;
128
129   /**
130    * Used to synchronize the method close()
131    */

132   private Closer closer;
133
134   private String JavaDoc stringImage = null;
135   private int hashCode;
136
137   /**
138    * Creates a <code>Connection</code> instance.
139    *
140    * @param factoryParameters The factory parameters.
141    * @param connectionImpl The actual connection to wrap.
142    *
143    * @exception JMSSecurityException If the user identification is incorrect.
144    * @exception IllegalStateException If the server is not listening.
145    */

146   public Connection(FactoryParameters factoryParameters,
147                     RequestChannel requestChannel)
148     throws JMSException JavaDoc {
149     if (logger.isLoggable(BasicLevel.DEBUG))
150       logger.log(BasicLevel.DEBUG,
151                  "Connection.<init>(" + factoryParameters +
152                  ',' + requestChannel + ')');
153     this.factoryParameters = factoryParameters;
154     mtpx = new RequestMultiplexer(this,
155                                   requestChannel,
156                                   factoryParameters.cnxPendingTimer);
157     if (factoryParameters.multiThreadSync) {
158       mtpx.setMultiThreadSync(factoryParameters.multiThreadSyncDelay,
159                               factoryParameters.multiThreadSyncThreshold);
160     }
161     
162     requestor = new Requestor(mtpx);
163     sessions = new Vector JavaDoc();
164     cconsumers = new Vector JavaDoc();
165     
166     closer = new Closer();
167     
168     setStatus(Status.STOP);
169
170     // Requesting the connection key and proxy identifier:
171
CnxConnectRequest req = new CnxConnectRequest();
172     CnxConnectReply rep = (CnxConnectReply) requestor.request(req);
173     proxyId = rep.getProxyId();
174     key = rep.getCnxKey();
175     
176     stringImage = "Cnx:" + proxyId + ':' + key;
177     hashCode = stringImage.hashCode();
178
179     mtpx.setDemultiplexerDaemonName(toString());
180   }
181
182   private final String JavaDoc newTrace(String JavaDoc trace) {
183     return "Connection[" + proxyId + ':' + key + ']' + trace;
184   }
185
186   private void setStatus(int status) {
187     if (logger.isLoggable(BasicLevel.DEBUG))
188       logger.log(BasicLevel.DEBUG,
189                  newTrace(".setStatus(" + Status.toString(status) + ')'));
190     this.status = status;
191   }
192
193   boolean isStopped() {
194     return (status == Status.STOP);
195   }
196
197   /** String image of the connection. */
198   public String JavaDoc toString() {
199     return stringImage;
200   }
201
202   public int hashCode() {
203     return hashCode;
204   }
205   
206   /**
207    * Specializes this Object method; returns <code>true</code> if the
208    * parameter is a <code>Connection</code> instance sharing the same
209    * proxy identifier and connection key.
210    */

211   public boolean equals(Object JavaDoc obj) {
212     return (obj instanceof Connection) && (hashCode() == obj.hashCode()) && toString().equals(obj.toString());
213   }
214
215   final long getTxPendingTimer() {
216     return factoryParameters.txPendingTimer;
217   }
218   
219   final boolean getAsyncSend() {
220     return factoryParameters.asyncSend;
221   }
222   
223   final int getQueueMessageReadMax() {
224     return factoryParameters.queueMessageReadMax;
225   }
226   
227   final int getTopicAckBufferMax() {
228     return factoryParameters.topicAckBufferMax;
229   }
230   
231   final int getTopicActivationThreshold() {
232     return factoryParameters.topicActivationThreshold;
233   }
234
235   final int getTopicPassivationThreshold() {
236     return factoryParameters.topicPassivationThreshold;
237   }
238   
239   /**
240    * Checks if the connecion is closed. If true
241    * raises an IllegalStateException.
242    */

243   final protected synchronized void checkClosed() throws IllegalStateException JavaDoc {
244     if (status == Status.CLOSE || mtpx.isClosed())
245       throw new IllegalStateException JavaDoc("Forbidden call on a closed connection.");
246   }
247
248   /**
249    * API method.
250    *
251    * @exception IllegalStateException If the connection is closed.
252    * @exception InvalidSelectorException If the selector syntax is wrong.
253    * @exception InvalidDestinationException If the target destination does
254    * not exist.
255    * @exception JMSException If the method fails for any other reason.
256    */

257   public synchronized javax.jms.ConnectionConsumer JavaDoc
258       createConnectionConsumer(
259         javax.jms.Destination JavaDoc dest,
260         String JavaDoc selector,
261         javax.jms.ServerSessionPool JavaDoc sessionPool,
262         int maxMessages) throws JMSException JavaDoc {
263     if (logger.isLoggable(BasicLevel.DEBUG))
264       logger.log(BasicLevel.DEBUG,
265                  newTrace(".createConnectionConsumer(" + dest +
266                           ',' +selector + ',' + sessionPool +
267                           ',' + maxMessages + ')'));
268     checkClosed();
269     return createConnectionConsumer(dest, null, selector, sessionPool, maxMessages);
270   }
271
272   /**
273    * API method.
274    *
275    * @exception IllegalStateException If the connection is closed.
276    * @exception InvalidSelectorException If the selector syntax is wrong.
277    * @exception InvalidDestinationException If the target topic does
278    * not exist.
279    * @exception JMSException If the method fails for any other reason.
280    */

281   public javax.jms.ConnectionConsumer JavaDoc
282       createDurableConnectionConsumer(javax.jms.Topic JavaDoc topic,
283                                       String JavaDoc subName,
284                                       String JavaDoc selector,
285                                       javax.jms.ServerSessionPool JavaDoc sessPool,
286                                       int maxMessages) throws JMSException JavaDoc {
287     if (logger.isLoggable(BasicLevel.DEBUG))
288       logger.log(BasicLevel.DEBUG,
289                  newTrace(".createDurableConnectionConsumer(" +
290                           topic + ',' + subName + ',' + selector + ',' +
291                           sessPool + ',' + maxMessages + ')'));
292     checkClosed();
293     if (subName == null)
294       throw new JMSException JavaDoc("Invalid subscription name: " + subName);
295     return createConnectionConsumer((Destination) topic, subName, selector, sessPool, maxMessages);
296   }
297   
298   private synchronized javax.jms.ConnectionConsumer JavaDoc
299     createConnectionConsumer(
300         javax.jms.Destination JavaDoc dest,
301         String JavaDoc subName,
302         String JavaDoc selector,
303         javax.jms.ServerSessionPool JavaDoc sessionPool,
304         int maxMessages) throws JMSException JavaDoc {
305     checkClosed();
306     
307     try {
308       org.objectweb.joram.shared.selectors.Selector.checks(selector);
309     } catch (org.objectweb.joram.shared.excepts.SelectorException sE) {
310       throw new InvalidSelectorException JavaDoc("Invalid selector syntax: " + sE);
311     }
312
313     if (sessionPool == null)
314       throw new JMSException JavaDoc("Invalid ServerSessionPool parameter: "
315                              + sessionPool);
316     if (maxMessages <= 0)
317       throw new JMSException JavaDoc("Invalid maxMessages parameter: " + maxMessages);
318     
319     boolean queueMode;
320     String JavaDoc targetName;
321     boolean durable;
322     
323     if (dest instanceof javax.jms.Queue JavaDoc) {
324       queueMode = true;
325       targetName = ((Destination) dest).getName();
326       durable = false;
327     } else {
328       queueMode = false;
329       if (subName == null) {
330         targetName = nextSubName();
331         durable = false;
332       } else {
333         targetName = subName;
334         durable = true;
335       }
336       requestor.request(new ConsumerSubRequest(((Destination) dest).getName(),
337           targetName, selector, false, durable));
338     }
339     
340     MultiSessionConsumer msc =
341       new MultiSessionConsumer(
342           queueMode,
343           durable,
344           selector,
345           targetName,
346           sessionPool,
347           factoryParameters.queueMessageReadMax,
348           factoryParameters.topicActivationThreshold,
349           factoryParameters.topicPassivationThreshold,
350           factoryParameters.topicAckBufferMax,
351           mtpx,
352           this,
353           maxMessages);
354     
355     msc.start();
356     
357     cconsumers.addElement(msc);
358     
359     return msc;
360   }
361
362   /**
363    * API method.
364    *
365    * @exception IllegalStateException If the connection is closed.
366    * @exception JMSException In case of an invalid acknowledge mode.
367    */

368   public synchronized javax.jms.Session JavaDoc
369       createSession(boolean transacted,
370                     int acknowledgeMode)
371     throws JMSException JavaDoc {
372     if (logger.isLoggable(BasicLevel.DEBUG))
373       logger.log(BasicLevel.DEBUG,
374                  newTrace(".createSession(" + transacted + ',' + acknowledgeMode + ')'));
375     checkClosed();
376     Session session = new Session(
377       this,
378       transacted,
379       acknowledgeMode,
380       mtpx);
381     addSession(session);
382     return session;
383   }
384
385   /**
386    * Called here and by sub-classes.
387    */

388   protected synchronized void addSession(Session session) {
389     sessions.addElement(session);
390     if (status == Status.START) {
391       session.start();
392     }
393   }
394
395   /**
396    * API method.
397    *
398    * @exception IllegalStateException If the connection is closed.
399    */

400   public synchronized void
401       setExceptionListener(javax.jms.ExceptionListener JavaDoc listener) throws JMSException JavaDoc {
402     checkClosed();
403     mtpx.setExceptionListener(listener);
404   }
405
406   /**
407    * API method.
408    *
409    * @exception IllegalStateException If the connection is closed.
410    */

411   public javax.jms.ExceptionListener JavaDoc getExceptionListener() throws JMSException JavaDoc {
412     checkClosed();
413     return mtpx.getExceptionListener();
414   }
415
416   /**
417    * API method.
418    *
419    * @exception IllegalStateException Systematically thrown.
420    */

421   public void setClientID(String JavaDoc clientID) throws JMSException JavaDoc {
422     throw new IllegalStateException JavaDoc("ClientID is already set by the"
423                                     + " provider.");
424   }
425
426   /**
427    * API method.
428    *
429    * @exception IllegalStateException If the connection is closed.
430    */

431   public String JavaDoc getClientID() throws JMSException JavaDoc {
432     checkClosed();
433     return proxyId;
434   }
435
436   /**
437    * API method.
438    *
439    * @exception IllegalStateException If the connection is closed.
440    */

441   public javax.jms.ConnectionMetaData JavaDoc getMetaData() throws JMSException JavaDoc {
442     checkClosed();
443     if (metaData == null)
444       metaData = new ConnectionMetaData();
445     return metaData;
446   }
447
448   /**
449    * API method for starting the connection.
450    *
451    * @exception IllegalStateException If the connection is closed or broken.
452    */

453   public synchronized void start() throws JMSException JavaDoc {
454     if (logger.isLoggable(BasicLevel.DEBUG))
455       logger.log(
456         BasicLevel.DEBUG,
457         newTrace(".start()"));
458     checkClosed();
459     
460     // Ignoring the call if the connection is started:
461
if (status == Status.START)
462       return;
463
464     if (logger.isLoggable(BasicLevel.DEBUG))
465       logger.log(BasicLevel.DEBUG, "--- " + this
466                                  + ": starting...");
467
468     // Starting the sessions:
469

470     for (int i = 0; i < sessions.size(); i++) {
471       Session session = (Session) sessions.elementAt(i);
472       session.start();
473     }
474
475     // Sending a start request to the server:
476
mtpx.sendRequest(new CnxStartRequest());
477
478     setStatus(Status.START);
479   }
480
481   /**
482    * API method for stopping the connection; even if the connection appears
483    * to be broken, stops the sessions.
484    *
485    * @exception IllegalStateException If the connection is closed or broken.
486    */

487   public void stop() throws JMSException JavaDoc {
488     if (logger.isLoggable(BasicLevel.DEBUG))
489       logger.log(
490         BasicLevel.DEBUG,
491         newTrace(".stop()"));
492     checkClosed();
493
494     synchronized (this) {
495       if (status == Status.STOP)
496         return;
497     }
498
499     // At this point, the server won't deliver messages anymore,
500
// the connection just waits for the sessions to have finished their
501
// processings.
502
// Must go out of the synchronized block in order to enable
503
// the message listeners to use the connection.
504
// As a csq, the connection stop is reentrant. Several
505
// threads can enter this method during the stopping stage.
506
for (int i = 0; i < sessions.size(); i++) {
507       Session session = (Session) sessions.get(i);
508       session.stop();
509     }
510
511     synchronized (this) {
512       if (status == Status.STOP)
513         return;
514
515       // Sending a synchronous "stop" request to the server:
516
requestor.request(new CnxStopRequest());
517
518       // Set the status as STOP as the following operations
519
// (Session.stop) can't fail.
520
setStatus(Status.STOP);
521     }
522   }
523
524   /**
525    * API method for closing the connection; even if the connection appears
526    * to be broken, closes the sessions.
527    *
528    * @exception JMSException Actually never thrown.
529    */

530   public void close() throws JMSException JavaDoc {
531     if (logger.isLoggable(BasicLevel.DEBUG))
532       logger.log(
533         BasicLevel.DEBUG,
534         newTrace(".close()"));
535
536     closer.close();
537   }
538
539   /**
540    * This class synchronizes the close.
541    * Close can't be synchronized with 'this'
542    * because the connection must be accessed
543    * concurrently during its closure. So
544    * we need a second lock.
545    */

546   class Closer {
547     synchronized void close() {
548       doClose();
549     }
550   }
551
552   void doClose() {
553     synchronized (this) {
554       if (status == Status.CLOSE) {
555         return;
556       }
557     }
558       
559     Vector JavaDoc sessionsToClose = (Vector JavaDoc)sessions.clone();
560     sessions.clear();
561     
562     for (int i = 0; i < sessionsToClose.size(); i++) {
563       Session session =
564         (Session) sessionsToClose.elementAt(i);
565       try {
566         session.close();
567       } catch (JMSException JavaDoc exc) {
568         if (logger.isLoggable(BasicLevel.DEBUG))
569           logger.log(
570             BasicLevel.DEBUG, "", exc);
571       }
572     }
573     
574     Vector JavaDoc consumersToClose = (Vector JavaDoc)cconsumers.clone();
575     cconsumers.clear();
576     
577     for (int i = 0; i < consumersToClose.size(); i++) {
578       MultiSessionConsumer consumer =
579         (MultiSessionConsumer) consumersToClose.elementAt(i);
580       try {
581         consumer.close();
582       } catch (JMSException JavaDoc exc) {
583         if (logger.isLoggable(BasicLevel.DEBUG))
584           logger.log(
585             BasicLevel.DEBUG, "", exc);
586       }
587     }
588
589     
590     try {
591       CnxCloseRequest closeReq = new CnxCloseRequest();
592       requestor.request(closeReq);
593     } catch (JMSException JavaDoc exc) {
594       if (logger.isLoggable(BasicLevel.DEBUG))
595         logger.log(
596           BasicLevel.DEBUG, "", exc);
597     }
598     
599     mtpx.close();
600     
601     synchronized (this) {
602       setStatus(Status.CLOSE);
603     }
604   }
605
606
607   /**
608    * Used by OutboundConnection in the connector layer.
609    * When a connection is put back in a pool,
610    * it must be cleaned up.
611    */

612   public void cleanup() {
613     if (logger.isLoggable(BasicLevel.DEBUG))
614       logger.log(
615         BasicLevel.DEBUG, newTrace(".cleanup()"));
616     
617     // Closing the sessions:
618
// Session session;
619
Vector JavaDoc sessionsToClose = (Vector JavaDoc)sessions.clone();
620     sessions.clear();
621
622     for (int i = 0; i < sessionsToClose.size(); i++) {
623       Session session =
624         (Session) sessionsToClose.elementAt(i);
625       try {
626         session.close();
627       } catch (JMSException JavaDoc exc) {
628         if (logger.isLoggable(BasicLevel.DEBUG))
629           logger.log(
630             BasicLevel.DEBUG, "", exc);
631       }
632     }
633     
634     mtpx.cleanup();
635   }
636
637   /** Returns a new session identifier. */
638   synchronized String JavaDoc nextSessionId() {
639     if (sessionsC == Integer.MAX_VALUE)
640       sessionsC = 0;
641     sessionsC++;
642     return "c" + key + "s" + sessionsC;
643   }
644  
645   /** Returns a new message identifier. */
646   synchronized String JavaDoc nextMessageId() {
647     if (messagesC == Integer.MAX_VALUE)
648       messagesC = 0;
649     messagesC++;
650     return "ID:" + proxyId.substring(1) + "c" + key + "m" + messagesC;
651   }
652
653   /** Returns a new subscription name. */
654   synchronized String JavaDoc nextSubName() {
655     if (subsC == Integer.MAX_VALUE)
656       subsC = 0;
657     subsC++;
658     return "c" + key + "sub" + subsC;
659   }
660
661   /**
662    * Called by Session.
663    */

664   synchronized void closeSession(Session session) {
665     if (logger.isLoggable(BasicLevel.DEBUG))
666       logger.log(
667         BasicLevel.DEBUG,
668         newTrace(".closeSession(" + session + ')'));
669     sessions.removeElement(session);
670   }
671
672   /**
673    * Called by MultiSessionConsumer.
674    * Synchronized with run().
675    */

676   synchronized void closeConnectionConsumer(MultiSessionConsumer cc) {
677     if (logger.isLoggable(BasicLevel.DEBUG))
678       logger.log(BasicLevel.DEBUG,
679                  newTrace(".closeConnectionConsumer(" + cc + ')'));
680     cconsumers.removeElement(cc);
681   }
682
683   synchronized AbstractJmsReply syncRequest(
684     AbstractJmsRequest request) throws JMSException JavaDoc {
685     if (logger.isLoggable(BasicLevel.DEBUG))
686       logger.log(BasicLevel.DEBUG,
687                  newTrace(".syncRequest(" + request + ')'));
688     return requestor.request(request);
689   }
690
691   /**
692    * Called by temporary destinations deletion.
693    */

694   synchronized void checkConsumers(String JavaDoc agentId) throws JMSException JavaDoc {
695     for (int i = 0; i < sessions.size(); i++) {
696       Session sess = (Session) sessions.elementAt(i);
697       sess.checkConsumers(agentId);
698     }
699   }
700
701   protected final RequestMultiplexer getRequestMultiplexer() {
702     return mtpx;
703   }
704 }
705
Popular Tags