KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > api > jms > MantaConnection


/*
 * Copyright 2002 by
 * <a HREF="http://www.coridan.com">Coridan</a>
 * <a HREF="mailto:support@coridan.com">support@coridan.com</a>
 *
 * The contents of this file are subject to the Mozilla Public License Version
 * 1.1 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS" basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
 * for the specific language governing rights and limitations under the
 * License.
 *
 * The Original Code is "MantaRay" (TM).
 *
 * The Initial Developer of the Original Code is Nimo 24-Feb-2004.
 * Portions created by the Initial Developer are Copyright (C) 2006
 * Coridan Inc. All Rights Reserved.
 *
 * Contributor(s): all the names of the contributors are added in the source
 * code where applicable.
 *
 * Alternatively, the contents of this file may be used under the terms of the
 * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
 * provisions of LGPL are applicable instead of those above. If you wish to
 * allow use of your version of this file only under the terms of the LGPL
 * License and not to allow others to use your version of this file under
 * the MPL, indicate your decision by deleting the provisions above and
 * replace them with the notice and other provisions required by the LGPL.
 * If you do not delete the provisions above, a recipient may use your version
 * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
 *
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the MPL as stated above or under the terms of the GNU
 * Lesser General Public License as published by the Free Software Foundation;
 * either version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
 * License for more details.
 */

package org.mr.api.jms;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Iterator;

import javax.jms.*;
import javax.jms.IllegalStateException;

import org.mr.kernel.security.MantaAuthentication;
import org.mr.kernel.security.MantaAuthorization;
import org.mr.kernel.security.SecurityActionTypes;
import org.mr.kernel.security.SessionID;
import org.mr.kernel.services.MantaService;
import org.mr.kernel.services.ServiceProducer;
import org.mr.kernel.services.queues.VirtualQueuesManager;
import org.mr.kernel.services.topics.VirtualTopicManager;
import org.mr.MantaAgent;
import org.mr.MantaException;
import org.mr.core.protocol.MantaBusMessage;
import org.mr.api.jms.selector.SelectorManager;
import org.mr.core.protocol.MantaBusMessageConsts;

71 /**
72  * A Connection object is a client's active connection to its JMS provider. It
73  * typically allocates provider resources outside the Java virtual machine (JVM).
74  *
75  * Connections support concurrent use.
76  *
77  * A connection serves several purposes:
78  * It encapsulates an open connection with a JMS provider. It typically represents
79  * an open TCP/IP socket between a client and the service provider software.
80  * Its creation is where client authentication takes place.
81  * It can specify a unique client identifier.
82  * It provides a ConnectionMetaData object.
83  * It supports an optional ExceptionListener object.
84  * Because the creation of a connection involves setting up authentication and
85  * communication, a connection is a relatively heavyweight object. Most clients
86  * will do all their messaging with a single connection. Other more advanced
87  * applications may use several connections. The JMS API does not architect a
88  * reason for using multiple connections; however, there may be operational
89  * reasons for doing so.
90  *
91  * A JMS client typically creates a connection, one or more sessions, and a number
92  * of message producers and consumers. When a connection is created, it is in
93  * stopped mode. That means that no messages are being delivered.
94  *
95  * It is typical to leave the connection in stopped mode until setup is complete
96  * (that is, until all message consumers have been created). At that point, the
97  * client calls the connection's start method, and messages begin arriving at the
98  * connection's consumers. This setup convention minimizes any client confusion
99  * that may result from asynchronous message delivery while the client is still in
100  * the process of setting itself up.
101  *
102  * A connection can be started immediately, and the setup can be done afterwards.
103  * Clients that do this must be prepared to handle asynchronous message delivery
104  * while they are still in the process of setting up.
105  *
106  * A message producer can send messages while a connection is stopped.
107  *
108  * @author Nimo 24-Feb-2004
109  */

110
111 public class MantaConnection implements Connection, QueueConnection, TopicConnection {
112     public Log log;
113
114     //Constructors--------------------------------------------------------------------
115
/**
116      * A constructor for building a connection with no user credentials. This
117      * will use the default credentials.
118      *
119      * note: no one should create a connection unless by the connection factory,
120      * hence the constructors have the default access modifier.
121      */

122     MantaConnection(MantaConnectionFactory factory) throws JMSException {
123         this(factory, MantaConnection.DEF_USER, MantaConnection.DEF_PASSWORD);
124     } //MantaConnection
125

126     /**
127      * This is the real constructor of course.
128      *
129      * @param factory - the connection factory that created this connection.
130      * @param userName - a username on this connection.
131      * @param password - the password for this username.
132      *
133      * @throws JMSException
134      */

135     MantaConnection(MantaConnectionFactory factory, String JavaDoc userName,
136             String JavaDoc password) throws JMSException {
137         log = LogFactory.getLog("MantaConnection");
138         //this.monitor = ExceptionMonitor.getInstance();
139
if (factory == null)
140             throw new JMSException("MNJMS000BA : FAILED TO CREATE CONNECTION. FACTORY IS NULL.");
141         this.theFactory = factory;
142         this.isStarted = false;
143         this.isOpened = true;
144         this.mantaConnectionMetaData = new MantaConnectionMetaData();
145         this.userName = userName;
146         this.password = password;
147         this.messageChannel = factory.getChannel();
148         //we know we have manta check security
149
MantaAuthentication ma = messageChannel.getSingletonRepository().getMantaAuthentication();
150         if(ma !=null){
151             this.securitySessionId = ma.authenticate(userName,password );
152             this.authorizator = messageChannel.getSingletonRepository().getMantaAuthorization();
153         }
154         this.clientId = messageChannel.getMessageId();
155         this.msgSelectorManager = SelectorManager.getInstance();
156         this.messageChannel.getSingletonRepository().getSelectorsManager()
157                 .setSelector(MantaBusMessageConsts.PAYLOAD_TYPE_JMS,
158                         msgSelectorManager);
159         //Repositories for objects created by this connection.
160
this.tempTopics = new Hashtable JavaDoc();
161         this.tempQueues = new Hashtable JavaDoc();
162         this.createdSessions = new ArrayList JavaDoc();
163     } //MantaConnection
164

165     /**
166      * JMS 1.1 specification.
167      *
168      * Creates a new session object.
169      *
170      * @param :
171      * transacted - is this session supposed to be transacted?
172      * @param :
173      * acknowledgeMode - what is the required ack mode for this
174      * session
175      *
176      * (if one of these params implies that the session is transacted - it will
177      * be transacted).
178      *
179      * @return : a Session object.
180      */

181     public Session createSession(boolean transacted, int acknowledgeMode)
182             throws JMSException {
183         
184         checkLegalOperation();
185
186         MantaSession sess = new MantaSession(messageChannel.getMessageId(),this,
187                 acknowledgeMode,transacted);
188         
189         addSession(sess);
190         return sess;
191     } //createSession
192

193     /**
194      * Creates a QueueSession for this connection.
195      */

196     public QueueSession createQueueSession(boolean trx, int acknowledgeMode) throws JMSException {
197         checkLegalOperation();
198
199         MantaQueueSession sess = new MantaQueueSession(messageChannel.getMessageId(),this,
200                 trx,acknowledgeMode);
201         addSession(sess);
202         return sess;
203     }
204     
205     /**
206      * Create a Topic Session on this connection
207      */

208     public TopicSession createTopicSession (boolean trx, int acknowledgeMode) throws JMSException{
209
210         checkLegalOperation();
211
212         MantaTopicSession sess = new MantaTopicSession(messageChannel.getMessageId(),this,
213                 trx,acknowledgeMode);
214         
215         addSession(sess);
216         return sess;
217     }
218     
219     /**
220      * Gets the client identifier for this connection.
221      *
222      * @return the unique client identifier
223      */

224     public String JavaDoc getClientID() throws JMSException {
225
226         checkLegalOperation();
227         return clientId;
228     } //getClientID
229

230     /**
231      * Sets the client identifier for this connection.
232      * <P>
233      * The preferred way to assign a JMS client's client identifier is for it to
234      * be configured in a client-specific <CODE>ConnectionFactory</CODE>
235      * object and transparently assigned to the <CODE>Connection</CODE> object
236      * it creates.
237      * <P>
238      */

239     public void setClientID(String JavaDoc clientID) throws JMSException {
240         checkLegalOperation();
241         if (this.getClientID() != null)
242             throw new IllegalStateException JavaDoc("MNJMS00026 : METHOD setClientID() FAILED. SET BY SYSTEM.");
243             
244     } //setClientID
245

246     /**
247      * Gets the metadata for this connection.
248      *
249      * @return the connection metadata
250      */

251     public ConnectionMetaData getMetaData() throws JMSException {
252         checkLegalOperation();
253
254         return mantaConnectionMetaData;
255     } //getMetaData
256

257     /**
258      * Gets the ExceptionListener object for this connection. Not every
259      * Connection has an ExceptionListener associated with it.
260      *
261      * @return the <CODE>ExceptionListener</CODE> for this connection, or
262      * null. if no <CODE>ExceptionListener</CODE> is associated with
263      * this connection.
264      */

265     public ExceptionListener getExceptionListener() throws JMSException {
266         checkLegalOperation();
267         return this.abnormalExceptionListener;
268     } //getExceptionListener
269

270     /**
271      * Sets an exception listener for this connection.
272      *
273      * @param listener -
274      * the exception listener
275      */

276     public void setExceptionListener(ExceptionListener listener)
277             throws JMSException {
278         checkLegalOperation();
279         this.abnormalExceptionListener = listener;
280     } //setExceptionListener
281

282     /**
283      * Notify the ExceptionListener that an exception has occured. It is used
284      * only if there is an ExceptionListener on the connection.
285      */

286     protected void notifyListener(JMSException jmse) {
287         if (this.abnormalExceptionListener != null && jmse != null)
288             this.abnormalExceptionListener.onException(jmse);
289     }
290
291     /**
292      * Starts (or restarts) a connection's delivery of incoming messages. A call
293      * to <CODE>start</CODE> on a connection that has already been started is
294      * ignored.
295      *
296      */

297     public synchronized void start() throws JMSException {
298         checkLegalOperation();
299         if (isStarted())
300             return;
301         
302         try {
303             synchronized (createdSessions) {
304                 Iterator JavaDoc sessionsToNotify = createdSessions.iterator();
305                 while (sessionsToNotify.hasNext()) {
306                     MantaSession session = (MantaSession) sessionsToNotify.next();
307                     try {
308                         session.start();
309                     } catch (JMSException e) {
310                         log.error("An error occured while starting a session. ", e);
311                     }
312                 }
313             }
314         }
315         finally {
316             isStarted = true;
317         }
318     } //start
319

320     /**
321      * Temporarily stops a connection's delivery of incoming messages. Delivery
322      * can be restarted using the connection's start method. When the connection
323      * is stopped, delivery to all the connection's message consumers is
324      * inhibited: synchronous receives block, and messages are not delivered to
325      * message listeners.
326      */

327     public synchronized void stop() throws JMSException {
328         checkLegalOperation();
329         if (!isStarted())
330             return;
331
332         try {
333             //Stop all sessions before stopping the connection.
334
synchronized (createdSessions) {
335                 Iterator JavaDoc sessionsToNotify = createdSessions.iterator();
336                 while (sessionsToNotify.hasNext()) {
337                     MantaSession session = (MantaSession) sessionsToNotify.next();
338                     session.stop();
339                 }
340             }
341         }
342         finally {
343             isStarted = false;
344         }
345     } //stop
346

347     /**
348      * Closes the connection.
349      *
350      * Since a provider typically allocates significant resources outside the
351      * JVM on behalf of a connection, clients should close these resources when
352      * they are not needed. Relying on garbage collection to eventually reclaim
353      * these resources may not be timely enough.
354      *
355      * There is no need to close the sessions, producers, and consumers of a
356      * closed connection.
357      *
358      * Closing a connection causes all temporary destinations to be deleted.
359      *
360      */

361
362     public synchronized void close() throws JMSException {
363         if (!isOpened)
364             return;
365         
366         try {
367             cleanup();
368             MantaAuthentication ma = messageChannel.getSingletonRepository().getMantaAuthentication();
369             if(ma!=null){
370                 ma.logout(securitySessionId);
371             }
372                 
373             //remove the agent connection.
374
this.messageChannel = null;
375             //remove this connection from the factory list of
376
//connections, if any...
377
this.theFactory.removeConnection(this);
378             this.theFactory = null;
379             this.tempTopics = null;
380             this.tempQueues = null;
381         } catch (Exception JavaDoc e) {
382             log.error("Connection.close() failed",e);
383             throw new JMSException("MNJMS00027 : FAILED ON CONNECTION close() METHOD. ERROR TEXT : "+e.getMessage());
384         }
385         finally {
386             isOpened = false;
387             
388         }
389     } //close
390

391     //to be used by JCA, so the connection object may be returned to the pool.
392
public synchronized void cleanup() throws JMSException {
393         try {
394             // stop all sessions
395
stop();
396             
397             // close all sessions
398
synchronized (createdSessions) {
399                 ArrayList JavaDoc sessionsForIterator = new ArrayList JavaDoc(createdSessions);
400                 Iterator JavaDoc sessionsToNotify = sessionsForIterator.iterator();
401                 while (sessionsToNotify.hasNext()) {
402                     MantaSession session = (MantaSession) sessionsToNotify.next();
403                     session.close();
404                     // the session should deregister from the connection
405
}
406             }
407             
408             // discard all temporary topics
409
Iterator JavaDoc it = tempTopics.values().iterator();
410             VirtualTopicManager topicsManager = MantaAgent.getInstance().getSingletonRepository().getVirtualTopicManager();
411             while (it.hasNext()) {
412                 topicsManager.closeTopic(it.next().toString());
413             }
414
415             // discard all temporary queues
416
VirtualQueuesManager queuesManager = MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager();
417             it = tempQueues.values().iterator();
418             while (it.hasNext()) {
419                 queuesManager.closeQueue(it.next().toString());
420             }
421         }
422         catch (Exception JavaDoc e) {
423             log.error("An Error occured during connection cleanup.", e);
424             throw new JMSException("MNJMS00028 : FAILED ON CONNECTION cleanup() METHOD. ERROR TEXT : "+e.getMessage());
425         }
426     }
427
428     
429     /**
430      * Creates a connection consumer for this connection.
431      * This is an expert facility not used by regular JMS clients.
432      * Only for the application server this method is.
433      *
434      */

435
436     public ConnectionConsumer createConnectionConsumer(Destination destination,
437             String JavaDoc messageSelector, ServerSessionPool sessionPool,
438             int maxMessages) throws JMSException {
439
440         checkLegalOperation();
441         if(destination instanceof Queue){
442             authorize(SecurityActionTypes.ACTION_CREATE_PRODUCER_FOR_QUEUE,destination.toString());
443             
444         }else{
445             authorize(SecurityActionTypes.ACTION_CREATE_PRODUCER_FOR_TOPIC,destination.toString());
446         }
447
448         MantaConnectionConsumer corCc = new MantaConnectionConsumer(
449                 this, destination, messageSelector, sessionPool, maxMessages);
450         return corCc;
451
452     } //createConnectionConsumer
453

454     /**
455      * A package method, used for acking a message on behalf of a session.
456      */

457     void ack(MantaBusMessage msg) throws JMSException {
458         checkLegalOperation();
459         this.messageChannel.ack(msg);
460     }
461
462     /**
463      * Create a durable connection consumer for this connection (optional
464      * operation). This is an expert facility not used by regular JMS clients.
465      *
466      * A connection consumer is usually used by application servers to connect an
467      * MDB to the MantaRay layer.
468      *
469      */

470     public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
471             String JavaDoc subscriptionName, String JavaDoc messageSelector,
472             ServerSessionPool sessionPool, int maxMessages) throws JMSException {
473         checkLegalOperation();
474         return createConnectionConsumer(topic, messageSelector, sessionPool,
475                 maxMessages);
476     } //createDurableConnectionConsumer
477

478     /**
479      * This method creates a temporary queue, owned by the connection. The name
480      * of the temp queue is &&TMPQ{ <mantaAddress>}[<connectionId>]## the mantaAddress is
481      * the name for the specific agent. this is done so the other agents on the
482      * network may be a able to send messages to it. the ## is the temp queue
483      * number inside the connection.
484      *
485      */

486     protected TemporaryQueue addTempQueue() throws JMSException {
487         checkLegalOperation();
488         String JavaDoc name = MantaConnection.TMP_QUEUE_PREFIX + "{"
489                 + getChannel().getAgentName() + "}" + "[" + this.getClientID()
490                 + "]" + tempQueuesNum++;
491         MantaTemporaryQueue result = new MantaTemporaryQueue(name,this);
492         this.tempQueues.put(name, result);
493         return result;
494     } //addTempQueue
495

496     /**
497      * This method creates a temporary topic, owned by the connection. The name
498      * of the temp topic is &&TMPT{ <mantaAddress> }[<connectionId>]## the mantaAddress is
499      * the name for the specific agent. this is done so the other agents on the
500      * network may be a able to send messages to it. the ## is the temp topic
501      * number inside the connection.
502      *
503      */

504     protected TemporaryTopic addTempTopic() throws JMSException {
505         checkLegalOperation();
506         String JavaDoc topicName = MantaConnection.TMP_TOPIC_PREFIX + "{"
507                 + getChannel().getAgentName() + "}" + "[" + this.getClientID()
508                 + "]" + tempTopicsNum++;
509         
510         MantaTemporaryTopic result = new MantaTemporaryTopic(topicName, this);
511         tempTopics.put(topicName, result);
512         return result;
513     } //addTempTopic
514

515     protected void deleteTempTopic(String JavaDoc serviceName) {
516         tempTopics.remove(serviceName);
517     }
518
519     protected void deleteTempQueue(String JavaDoc serviceName) throws MantaException {
520         tempQueues.remove(serviceName);
521         VirtualQueuesManager queuesManager = MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager();
522         queuesManager.closeQueue(serviceName);
523     }
524
525     
526
527     
528     /**
529      * This method returns true if the connection is started, false otherwise.
530      */

531     boolean isStarted() {
532         return isStarted; //allowed because it's not an object.
533
} //isStarted
534

535     /**
536      * This method is only for an application server use, when it wants to register
537      * an MDB on a queue. This is not for a regular end-user's use.
538      *
539      * @param queue - the queue to consume
540      * @param messageSelector - the message selector
541      * @param sessionPool - the session pool (this is an application server class)
542      * @param maxMessages - maximum messages to be consumed by a session
543      *
544      * @throws JMSException
545      */

546     public ConnectionConsumer createConnectionConsumer(Queue queue,
547             String JavaDoc messageSelector, ServerSessionPool sessionPool,
548             int maxMessages) throws JMSException {
549         checkLegalOperation();
550         return createConnectionConsumer((Destination) queue, messageSelector,
551                 sessionPool, maxMessages);
552     }
553
554     /**
555      * This method is only for an application server use, when it wants to register
556      * an MDB on a topic. This is not for a regular end-user's use.
557      *
558      * @param topic - the topic to consume
559      * @param messageSelector - the message selector
560      * @param sessionPool - the session pool (this is an application server class)
561      * @param maxMessages - maximum messages to be consumed by a session
562      *
563      * @throws JMSException
564      */

565     public ConnectionConsumer createConnectionConsumer(Topic topic,
566             String JavaDoc messageSelector, ServerSessionPool sessionPool,
567             int maxMessages) throws JMSException {
568
569         checkLegalOperation();
570         return createConnectionConsumer((Destination) topic, messageSelector,
571                 sessionPool, maxMessages);
572     }
573
574     /**
575      * this checks that the operation we are trying to do is legal - that means -
576      * that the connection is started. it is protected, so it can be inherited
577      * by extending classes.
578      *
579      * @throws JMSException
580      */

581     protected void checkLegalOperation() throws JMSException {
582         if (!isOpened)
583             throw new IllegalStateException JavaDoc("MNJMS0002D : ILLEGAL OPERATION. CONNECTION IS CLOSED.");
584     }
585
586     /**
587      * add a session to the internal sessions repository
588      * @param session - the session to add.
589      */

590     void addSession(Session session) {
591         synchronized (createdSessions) {
592             createdSessions.add(session);
593         }
594     }
595
596     /*
597      * remove a session from the internal sessions repository
598      * @param session - the session to remove,
599      */

600     void deleteSession(Session session) {
601         synchronized (createdSessions) {
602             createdSessions.remove(session);
603         }
604     }
605
606     
607     void authorize(int actionType, Object JavaDoc param) throws JMSSecurityException{
608         if(authorizator!=null){
609             authorizator.authorize(securitySessionId, actionType,param );
610         }
611     }
612     void authorize(int actionType) throws JMSSecurityException{
613         if(authorizator!=null){
614             authorizator.authorize(securitySessionId, actionType);
615         }
616     }
617
618     /*
619      * this gets the MantaAgent instance of this machine.
620      *
621      * @return the MantaAgent instance.
622      */

623     public MantaAgent getChannel() //only package access
624
{
625         return messageChannel;
626     }
627
628     
629     public String JavaDoc getUserName() {
630         return userName;
631     }
632     
633     //FIELDS ------------------------------------------------------------------
634
protected ExceptionListener abnormalExceptionListener = null;
635
636     protected String JavaDoc clientId;
637
638     protected String JavaDoc userName = null;
639
640     protected String JavaDoc password = null;
641
642     protected ArrayList JavaDoc createdSessions;
643
644     protected Hashtable JavaDoc tempTopics;
645
646     protected Hashtable JavaDoc tempQueues;
647
648     protected MantaConnectionFactory theFactory = null;
649
650     protected ConnectionMetaData mantaConnectionMetaData = null;
651
652     protected MantaAgent messageChannel = null;
653
654     SelectorManager msgSelectorManager;
655
656     protected int sessionsNumber = 0;
657
658     protected int tempTopicsNum = 0;
659
660     protected int tempQueuesNum = 0;
661
662     protected boolean isOpened;
663
664     protected boolean isStarted;
665     
666     // security
667
protected SessionID securitySessionId;
668     protected MantaAuthorization authorizator;
669
670     //Constants for the JMS adapter package.
671

672     final static String JavaDoc DEF_USER = "unknown";
673
674     final static String JavaDoc DEF_PASSWORD = "unknown";
675
676     public final static String JavaDoc TMP_DESTINATION_PREFIX = "&&TMP";
677
678     public final static String JavaDoc TMP_QUEUE_PREFIX = "&&TMPQ";
679
680     final static String JavaDoc TMP_TOPIC_PREFIX = "&&TMPT";
681
682     final static String JavaDoc JMS_AGENT_NAME = "MANTAJMS";
683
684     
685 }
686
Popular Tags