KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > scalagent > kjoram > Connection


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2001 - ScalAgent Distributed Technologies
4  * Copyright (C) 1996 - Dyade
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): Frederic Maistre (INRIA)
22  * Contributor(s): Nicolas Tachker (ScalAgent)
23  */

24 package com.scalagent.kjoram;
25
26 import com.scalagent.kjoram.excepts.IllegalStateException;
27 import com.scalagent.kjoram.excepts.*;
28 import com.scalagent.kjoram.jms.*;
29 import com.scalagent.kjoram.*;
30 import com.scalagent.kjoram.util.StoppedQueueException;
31
32 import java.util.*;
33
34 public class Connection
35 {
36   /** Actual connection linking the client and the JORAM platform. */
37   private ConnectionItf connectionImpl;
38   
39   /** Client's agent proxy identifier. */
40   private String JavaDoc proxyId;
41   /** Connection key. */
42   private int key;
43
44   /** Connection meta data. */
45   private ConnectionMetaData metaData = null;
46   /** The connection's exception listener, if any. */
47   private ExceptionListener excListener = null;
48
49   /** Requests counter. */
50   private int requestsC = 0;
51   /** Sessions counter. */
52   private int sessionsC = 0;
53   /** Messages counter. */
54   private int messagesC = 0;
55   /** Subscriptions counter. */
56   private int subsC = 0;
57
58   /** Timer for closing pending sessions. */
59   private com.scalagent.kjoram.util.Timer sessionsTimer = null;
60
61   /** The factory's parameters. */
62   FactoryParameters factoryParameters;
63
64   /** Driver listening to asynchronous deliveries. */
65   Driver driver;
66
67   /** <code>true</code> if the connection is started. */
68   boolean started = false;
69   /** <code>true</code> if the connection is closing. */
70   boolean closing = false;
71   /** <code>true</code> if the connection is closed. */
72   boolean closed = false;
73   /** Vector of the connection's sessions. */
74   Vector sessions;
75   /** Vector of the connection's consumers. */
76   Vector cconsumers;
77   /**
78    * Table holding requests related objects, either locks of synchronous
79    * requests, or asynchronous consumers.
80    */

81   Hashtable requestsTable;
82   /**
83    * Table holding the server replies to synchronous requests.
84    */

85   Hashtable repliesTable;
86
87   String JavaDoc name = null;
88
89   /**
90    * Creates a <code>Connection</code> instance.
91    *
92    * @param factoryParameters The factory parameters.
93    * @param connectionImpl The actual connection to wrap.
94    *
95    * @exception JMSSecurityException If the user identification is incorrect.
96    * @exception IllegalStateException If the server is not listening.
97    */

98   public Connection(FactoryParameters factoryParameters,
99                     ConnectionItf connectionImpl) throws JMSException
100   {
101     try {
102       this.factoryParameters = factoryParameters;
103
104       sessions = new Vector();
105       requestsTable = new Hashtable();
106       repliesTable = new Hashtable();
107     
108       this.connectionImpl = connectionImpl;
109       name = connectionImpl.getUserName();
110
111       // Creating and starting the connection's driver:
112
driver = connectionImpl.createDriver(this);
113       driver.start();
114   
115       // Requesting the connection key and proxy identifier:
116
CnxConnectRequest req = new CnxConnectRequest();
117       CnxConnectReply rep = (CnxConnectReply) syncRequest(req);
118       proxyId = rep.getProxyId();
119       key = rep.getCnxKey();
120
121       // Transactions will be scheduled; creating a timer.
122
if (factoryParameters.txPendingTimer != 0)
123         sessionsTimer = new com.scalagent.kjoram.util.Timer();
124
125       if (JoramTracing.dbgClient)
126         JoramTracing.log(JoramTracing.DEBUG, this + ": opened.");
127     }
128     // Connection could not be established:
129
catch (JMSException jE) {
130       JoramTracing.log(JoramTracing.ERROR, jE);
131       throw jE;
132     }
133   }
134
135   public String JavaDoc getUserName() {
136     return name;
137   }
138
139   /** String image of the connection. */
140   public String JavaDoc toString()
141   {
142     return "Cnx:" + proxyId + "-" + key;
143   }
144
145   /**
146    * Specializes this Object method; returns <code>true</code> if the
147    * parameter is a <code>Connection</code> instance sharing the same
148    * proxy identifier and connection key.
149    */

150   public boolean equals(Object JavaDoc obj)
151   {
152     return (obj instanceof Connection)
153            && toString().equals(obj.toString());
154   }
155
156
157   /**
158    * API method.
159    *
160    * @exception IllegalStateException If the connection is closed.
161    * @exception InvalidSelectorException If the selector syntax is wrong.
162    * @exception InvalidDestinationException If the target destination does
163    * not exist.
164    * @exception JMSException If the method fails for any other reason.
165    */

166   public ConnectionConsumer
167       createConnectionConsumer(Destination dest, String JavaDoc selector,
168                                ServerSessionPool sessionPool,
169                                int maxMessages) throws JMSException
170   {
171     if (closed)
172       throw new IllegalStateException JavaDoc("Forbidden call on a closed"
173                                       + " connection.");
174
175     return new ConnectionConsumer(this, (Destination) dest, selector,
176                                   sessionPool, maxMessages);
177   }
178
179   /**
180    * API method.
181    *
182    * @exception IllegalStateException If the connection is closed.
183    * @exception InvalidSelectorException If the selector syntax is wrong.
184    * @exception InvalidDestinationException If the target topic does
185    * not exist.
186    * @exception JMSException If the method fails for any other reason.
187    */

188   public ConnectionConsumer
189       createDurableConnectionConsumer(Topic topic, String JavaDoc subName,
190                                       String JavaDoc selector,
191                                       ServerSessionPool sessPool,
192                                       int maxMessages) throws JMSException
193   {
194     if (closed)
195       throw new IllegalStateException JavaDoc("Forbidden call on a closed"
196                                       + " connection.");
197
198     return new ConnectionConsumer(this, (Topic) topic, subName, selector,
199                                   sessPool, maxMessages);
200   }
201
202   /**
203    * API method.
204    *
205    * @exception IllegalStateException If the connection is closed.
206    * @exception JMSException In case of an invalid acknowledge mode.
207    */

208   public Session
209       createSession(boolean transacted, int acknowledgeMode)
210     throws JMSException
211   {
212     if (closed)
213       throw new IllegalStateException JavaDoc("Forbidden call on a closed"
214                                       + " connection.");
215
216     return new Session(this, transacted, acknowledgeMode);
217   }
218
219   /**
220    * API method.
221    *
222    * @exception IllegalStateException If the connection is closed.
223    */

224   public void setExceptionListener(ExceptionListener listener)
225               throws JMSException
226   {
227     if (closed)
228       throw new IllegalStateException JavaDoc("Forbidden call on a closed"
229                                       + " connection.");
230     this.excListener = listener;
231   }
232
233   /**
234    * API method.
235    *
236    * @exception IllegalStateException If the connection is closed.
237    */

238   public ExceptionListener getExceptionListener() throws JMSException
239   {
240     if (closed)
241       throw new IllegalStateException JavaDoc("Forbidden call on a closed"
242                                       + " connection.");
243     return excListener;
244   }
245
246   /**
247    * Passes an asynchronous exception to the exception listener, if any.
248    *
249    * @param jE The asynchronous JMSException.
250    */

251   synchronized void onException(JMSException jE)
252   {
253     if (JoramTracing.dbgClient)
254       JoramTracing.log(JoramTracing.WARN, this + ": " + jE);
255
256     if (excListener != null)
257       excListener.onException(jE);
258   }
259
260   /**
261    * API method.
262    *
263    * @exception IllegalStateException Systematically thrown.
264    */

265   public void setClientID(String JavaDoc clientID) throws JMSException
266   {
267     throw new IllegalStateException JavaDoc("ClientID is already set by the"
268                                     + " provider.");
269   }
270
271   /**
272    * API method.
273    *
274    * @exception IllegalStateException If the connection is closed.
275    */

276   public String JavaDoc getClientID() throws JMSException
277   {
278     if (closed)
279       throw new IllegalStateException JavaDoc("Forbidden call on a closed"
280                                       + " connection.");
281     return proxyId;
282   }
283
284   /**
285    * API method.
286    *
287    * @exception IllegalStateException If the connection is closed.
288    */

289   public ConnectionMetaData getMetaData() throws JMSException
290   {
291     if (closed)
292       throw new IllegalStateException JavaDoc("Forbidden call on a closed"
293                                       + " connection.");
294     if (metaData == null)
295       metaData = new ConnectionMetaData();
296     return metaData;
297   }
298
299   /**
300    * API method for starting the connection.
301    *
302    * @exception IllegalStateException If the connection is closed or broken.
303    */

304   public void start() throws JMSException
305   {
306     // If closed, throwing an exception:
307
if (closed)
308       throw new IllegalStateException JavaDoc("Forbidden call on a closed"
309                                       + " connection.");
310
311     // Ignoring the call if the connection is started:
312
if (started)
313       return;
314
315     if (JoramTracing.dbgClient)
316       JoramTracing.log(JoramTracing.DEBUG, "--- " + this
317                        + ": starting...");
318
319     // Starting the sessions:
320
Session session;
321     for (int i = 0; i < sessions.size(); i++) {
322       session = (Session) sessions.elementAt(i);
323       session.repliesIn.start();
324       session.start();
325     }
326     // Sending a start request to the server:
327
asyncRequest(new CnxStartRequest());
328
329     started = true;
330
331     if (JoramTracing.dbgClient)
332       JoramTracing.log(JoramTracing.DEBUG, this + ": started.");
333   }
334
335   /**
336    * API method for stopping the connection; even if the connection appears
337    * to be broken, stops the sessions.
338    *
339    * @exception IllegalStateException If the connection is closed or broken.
340    */

341   public void stop() throws JMSException
342   {
343     IllegalStateException JavaDoc isE = null;
344
345     // If closed, throwing an exception:
346
if (closed)
347       throw new IllegalStateException JavaDoc("Forbidden call on a closed"
348                                       + " connection.");
349
350     // Ignoring the call if the connection is already stopped:
351
if (! started)
352       return;
353
354     if (JoramTracing.dbgClient)
355       JoramTracing.log(JoramTracing.DEBUG, this + ": stopping...");
356
357     // Sending a synchronous "stop" request to the server:
358
try {
359       syncRequest(new CnxStopRequest());
360     }
361     // Catching an IllegalStateException if the connection is broken:
362
catch (IllegalStateException JavaDoc caughtISE) {
363       isE = caughtISE;
364     }
365
366     // At this point, the server won't deliver messages anymore,
367
// the connection just waits for the sessions to have finished their
368
// processings.
369
Session session;
370     for (int i = 0; i < sessions.size(); i++) {
371       session = (Session) sessions.elementAt(i);
372       try {
373         session.repliesIn.stop();
374       }
375       catch (InterruptedException JavaDoc iE) {}
376       session.stop();
377     }
378
379     started = false;
380
381     if (isE != null) {
382       JoramTracing.log(JoramTracing.ERROR, isE);
383       throw isE;
384     }
385
386     if (JoramTracing.dbgClient)
387       JoramTracing.log(JoramTracing.DEBUG, this + ": is stopped.");
388   }
389
390
391   /**
392    * API method for closing the connection; even if the connection appears
393    * to be broken, closes the sessions.
394    *
395    * @exception JMSException Actually never thrown.
396    */

397   public void close() throws JMSException
398   {
399     // Ignoring the call if the connection is closed:
400
if (closed)
401       return;
402
403     closing = true;
404
405     if (JoramTracing.dbgClient)
406       JoramTracing.log(JoramTracing.DEBUG, "--- " + this
407                        + ": closing...");
408
409     // Finishing the timer, if any:
410
if (sessionsTimer != null)
411       sessionsTimer.cancel();
412
413     // Stopping the connection:
414
try {
415       stop();
416     }
417     // Catching a JMSException if the connection is broken:
418
catch (JMSException jE) {}
419
420     // Closing the sessions:
421
Session session;
422     while (! sessions.isEmpty()) {
423       session = (Session) sessions.elementAt(0);
424       try {
425         session.close();
426       }
427       // Catching a JMSException if the connection is broken:
428
catch (JMSException jE) {}
429     }
430
431     // Closing the connection consumers:
432
if (cconsumers != null) {
433       ConnectionConsumer cc;
434       while (! cconsumers.isEmpty()) {
435         cc = (ConnectionConsumer) cconsumers.elementAt(0);
436         cc.close();
437       }
438     }
439     
440     // Closing the connection:
441
connectionImpl.close();
442
443     // Shutting down the driver, if needed:
444
if (! driver.stopping)
445       driver.stop();
446
447     requestsTable.clear();
448     requestsTable = null;
449     repliesTable.clear();
450     repliesTable = null;
451
452     closed = true;
453
454     if (JoramTracing.dbgClient)
455       JoramTracing.log(JoramTracing.DEBUG, this + ": closed.");
456   }
457
458   /** Returns a new request identifier. */
459   synchronized int nextRequestId()
460   {
461     if (requestsC == Integer.MAX_VALUE)
462       requestsC = 0;
463     return requestsC++;
464   }
465
466   /** Returns a new session identifier. */
467   synchronized String JavaDoc nextSessionId()
468   {
469     if (sessionsC == Integer.MAX_VALUE)
470       sessionsC = 0;
471     sessionsC++;
472     return "c" + key + "s" + sessionsC;
473   }
474  
475   /** Returns a new message identifier. */
476   synchronized String JavaDoc nextMessageId()
477   {
478     if (messagesC == Integer.MAX_VALUE)
479       messagesC = 0;
480     messagesC++;
481     return "ID:" + proxyId + "c" + key + "m" + messagesC;
482   }
483
484   /** Returns a new subscription name. */
485   synchronized String JavaDoc nextSubName()
486   {
487     if (subsC == Integer.MAX_VALUE)
488       subsC = 0;
489     subsC++;
490     return "c" + key + "sub" + subsC;
491   }
492
493   /** Schedules a session task to the connection's timer. */
494   synchronized void schedule(com.scalagent.kjoram.util.TimerTask task)
495   {
496     if (sessionsTimer == null)
497       return;
498
499     try {
500       sessionsTimer.schedule(task, factoryParameters.txPendingTimer * 1000);
501     }
502     catch (Exception JavaDoc exc) {}
503   }
504   
505   /**
506    * Method sending a synchronous request to the server and waiting for an
507    * answer.
508    *
509    * @exception IllegalStateException If the connection is closed or broken.
510    * @exception JMSSecurityException When sending a request to a destination
511    * not accessible because of security.
512    * @exception InvalidDestinationException When sending a request to a
513    * destination that no longer exists.
514    * @exception JMSException If the request failed for any other reason.
515    */

516   AbstractJmsReply syncRequest(AbstractJmsRequest request) throws JMSException
517   {
518     if (closed)
519       throw new IllegalStateException JavaDoc("Forbidden call on a closed"
520                                       + " connection.");
521
522     if (request.getRequestId() == -1)
523       request.setRequestId(nextRequestId());
524
525     int requestId = request.getRequestId();
526
527     try {
528       if (JoramTracing.dbgClient)
529         JoramTracing.log(JoramTracing.DEBUG, this + ": sends request: "
530                          + request.getClass().getName()
531                          + " with id: " + requestId);
532
533       Lock lock = new Lock();
534       requestsTable.put(request.getKey(), lock);
535       synchronized(lock) {
536         connectionImpl.send(request);
537         while (true) {
538           try {
539             lock.wait();
540             break;
541           }
542           catch (InterruptedException JavaDoc iE) {
543             if (JoramTracing.dbgClient)
544               JoramTracing.log(JoramTracing.WARN,this
545                                + ": caught InterruptedException");
546             continue;
547           }
548         }
549         requestsTable.remove(request.getKey());
550       }
551     }
552     // Catching an exception because of...
553
catch (Exception JavaDoc e) {
554       JMSException jE = null;
555       if (e instanceof JMSException)
556         throw (JMSException) e;
557       else
558         jE = new JMSException("Exception while getting a reply.");
559
560       jE.setLinkedException(e);
561
562       // Unregistering the request:
563
if (requestsTable != null)
564         requestsTable.remove(request.getKey());
565
566       JoramTracing.log(JoramTracing.ERROR, jE);
567       throw jE;
568     }
569     // Finally, returning the reply:
570
AbstractJmsReply reply =
571       (AbstractJmsReply) repliesTable.remove(request.getKey());
572
573     if (JoramTracing.dbgClient)
574       JoramTracing.log(JoramTracing.DEBUG, this + ": got reply.");
575
576     // If the reply is null, it means that the requester has been unlocked
577
// by the driver because it detected a connection failure:
578
if (reply == null)
579       throw new IllegalStateException JavaDoc("Connection is broken.");
580     // Else, if the reply notifies of an error: throwing the appropriate exc:
581
else if (reply instanceof MomExceptionReply) {
582       MomException mE = ((MomExceptionReply) reply).getException();
583
584       if (mE instanceof AccessException)
585         throw new JMSSecurityException(mE.getMessage());
586       else if (mE instanceof DestinationException)
587         throw new InvalidDestinationException(mE.getMessage());
588       else
589         throw new JMSException(mE.getMessage());
590     }
591     // Else: returning the reply:
592
else
593       return reply;
594   }
595
596   /**
597    * Actually sends an asynchronous request to the server.
598    *
599    * @exception IllegalStateException If the connection is closed or broken.
600    */

601   void asyncRequest(AbstractJmsRequest request) throws IllegalStateException JavaDoc
602   {
603     if (closed)
604       throw new IllegalStateException JavaDoc("Forbidden call on a closed"
605                                       + " connection.");
606
607     if (request.getRequestId() == -1)
608       request.setRequestId(nextRequestId());
609
610     try {
611       if (JoramTracing.dbgClient)
612         JoramTracing.log(JoramTracing.DEBUG, this + ": sends request: "
613                          + request.getClass().getName()
614                          + " with id: " + request.getRequestId());
615       connectionImpl.send(request);
616     }
617     // In the case of a broken connection:
618
catch (IllegalStateException JavaDoc exc) {
619       // Removes the potentially stored requester:
620
requestsTable.remove(request.getKey());
621
622       JoramTracing.log(JoramTracing.ERROR, exc);
623       throw exc;
624     }
625   }
626
627   /**
628    * Method called by the driver for distributing the server replies
629    * it gets on the connection.
630    * <p>
631    * Server replies are either synchronous replies to client requests,
632    * or asynchronous message deliveries, or asynchronous exceptions
633    * notifications.
634    */

635   void distribute(AbstractJmsReply reply)
636   {
637     // Getting the correlation identifier:
638
int correlationId = reply.getCorrelationId();
639
640     if (JoramTracing.dbgClient)
641       JoramTracing.log(JoramTracing.DEBUG, this + ": got reply: "
642                        + correlationId);
643
644     Object JavaDoc obj = null;
645     if (correlationId != -1)
646       obj = requestsTable.get(reply.getKey());
647
648     // If the request is a synchronous request, putting the reply in the
649
// replies table and unlocking the requester:
650
if (obj instanceof Lock) {
651       repliesTable.put(reply.getKey(), reply);
652
653       synchronized(obj) {
654         obj.notify();
655       }
656     }
657     // If the reply is an asynchronous exception, passing it:
658
else if (reply instanceof MomExceptionReply) {
659       // Removing the potential consumer object from the table:
660
requestsTable.remove(reply.getKey());
661
662       MomException mE = ((MomExceptionReply) reply).getException();
663       JMSException jE = null;
664
665       if (mE instanceof AccessException)
666         jE = new JMSSecurityException(mE.getMessage());
667       else if (mE instanceof DestinationException)
668         jE = new InvalidDestinationException(mE.getMessage());
669       else
670         jE = new JMSException(mE.getMessage());
671
672       onException(jE);
673     }
674     // Else, if the reply is an asynchronous delivery:
675
else if (obj != null) {
676       try {
677         // Passing the reply to its consumer:
678
if (obj instanceof ConnectionConsumer)
679           ((ConnectionConsumer) obj).repliesIn.push(reply);
680         else if (obj instanceof MessageConsumer)
681           ((MessageConsumer) obj).sess.repliesIn.push(reply);
682       }
683       catch (StoppedQueueException sqE) {
684         denyDelivery((ConsumerMessages) reply);
685       }
686     }
687     // Finally, if the requester disappeared, denying the delivery:
688
else if (reply instanceof ConsumerMessages)
689       denyDelivery((ConsumerMessages) reply);
690   }
691
692   /** Actually denies a non deliverable delivery. */
693   private void denyDelivery(ConsumerMessages delivery)
694   {
695     Vector msgs = delivery.getMessages();
696     com.scalagent.kjoram.messages.Message msg;
697     Vector ids = new Vector();
698
699     for (int i = 0; i < msgs.size(); i++) {
700       msg = (com.scalagent.kjoram.messages.Message) msgs.elementAt(i);
701       ids.addElement(msg.getIdentifier());
702     }
703
704     if (ids.isEmpty())
705       return;
706
707     try {
708       // Sending the denying as an asynchronous request, as no synchronous
709
// behaviour is expected here:
710
asyncRequest(new SessDenyRequest(delivery.comesFrom(), ids,
711                                        delivery.getQueueMode(), true));
712     }
713     // If sthg goes wrong while denying, nothing more can be done!
714
catch (JMSException jE) {}
715   }
716 }
717
Popular Tags