KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > jms > serverless > SessionManager


1 /*
2  * JBoss, the OpenSource J2EE webOS
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7 package org.jboss.jms.serverless;
8
9 import org.jboss.logging.Logger;
10 import javax.jms.Session JavaDoc;
11 import java.util.List JavaDoc;
12 import java.util.ArrayList JavaDoc;
13 import javax.jms.JMSException JavaDoc;
14 import java.util.Iterator JavaDoc;
15 import javax.jms.Message JavaDoc;
16
17 /**
18  * The main reason for this class to exist is to insure synchronized access to the connection's
19  * session list. It also handles message delivery from the group to sessions and vice-versa.
20  *
21  * @author Ovidiu Feodorov <ovidiu@jboss.org>
22  * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $
23  *
24  **/

25 class SessionManager implements Runnable JavaDoc {
26
27     private static final Logger log = Logger.getLogger(SessionManager.class);
28
29     private GroupConnection connection;
30     private org.jgroups.util.Queue deliveryQueue;
31     private Thread JavaDoc deliveryThread;
32     private List JavaDoc sessions;
33     private int sessionCounter = 0;
34
35     SessionManager(GroupConnection connection, org.jgroups.util.Queue deliveryQueue) {
36
37         this.connection = connection;
38         this.deliveryQueue = deliveryQueue;
39         sessions = new ArrayList JavaDoc();
40         deliveryThread = new Thread JavaDoc(this, "Session Delivery Thread");
41         deliveryThread.start();
42     }
43
44     public Session JavaDoc createSession(boolean transacted, int acknowledgeMode) throws JMSException JavaDoc {
45
46         Session JavaDoc s = new SessionImpl(this, generateSessionID(), transacted, acknowledgeMode);
47         synchronized(sessions) {
48             sessions.add(s);
49         }
50         return s;
51     }
52
53     /**
54      * The only way for the managed Session to access the Connection instance. If a session
55      * needs to access the connection directly, that's the way it gets the instance.
56      **/

57     GroupConnection getConnection() {
58         return connection;
59     }
60
61     // TO_DO: acknowledgement, deal with failed deliveries
62
private void deliver(Message JavaDoc m) {
63
64         // TO_DO: single threaded access for sessions
65
// So far, the only thread that accesses dispatch() is the connection's puller thread and
66
// this will be the unique thread that accesses the Sessions. This may not be sufficient
67
// for high load, consider the possiblity to (dynamically) add new threads to handle
68
// delivery, possibly a thread per session.
69

70         synchronized(sessions) {
71             for(Iterator JavaDoc i = sessions.iterator(); i.hasNext(); ) {
72                 ((SessionImpl)i.next()).deliver(m);
73             }
74         }
75     }
76
77
78     // TO_DO: acknowledgement, deal with failed deliveries
79
private void deliver(Message JavaDoc m, String JavaDoc sessionID, String JavaDoc queueReceiverID) {
80
81
82         // TO_DO: single threaded access for sessions
83
// So far, the only thread that accesses dispatch() is the connection's puller thread and
84
// this will be the unique thread that accesses the Sessions. This may not be sufficient
85
// for high load, consider the possiblity to (dynamically) add new threads to handle
86
// delivery, possibly a thread per session.
87

88         SessionImpl session = null;
89         synchronized(sessions) {
90             for(Iterator JavaDoc i = sessions.iterator(); i.hasNext(); ) {
91                 SessionImpl crts = (SessionImpl)i.next();
92                 if (crts.getID().equals(sessionID)) {
93                     session = crts;
94                     break;
95                 }
96             }
97         }
98         if (session == null) {
99             log.error("No such session: "+sessionID+". Delivery failed!");
100         }
101         else {
102             session.deliver(m, queueReceiverID);
103         }
104     }
105
106     /**
107      * Method called by a managed sessions when a new queue receiver is created or removed.
108      * The queue receiver has to be advertised to the group, to update the queue section of the
109      * group state.
110      **/

111     void advertiseQueueReceiver(String JavaDoc sessionID, QueueReceiverImpl qr, boolean isOn)
112         throws JMSException JavaDoc {
113         try {
114             connection.
115                 advertiseQueueReceiver(qr.getQueue().getQueueName(), sessionID, qr.getID(), isOn);
116         }
117         catch(ProviderException e) {
118             // the multicast failed, the queue receiver is invalid
119
String JavaDoc msg = "Cannot advertise queue receiver";
120             JMSException JavaDoc jmse = new JMSException JavaDoc(msg);
121             jmse.setLinkedException(e);
122             throw jmse;
123         }
124     }
125
126     //
127
//
128
//
129

130     /**
131      * Generate a session ID that is quaranteed to be unique for the life time of a SessionManager
132      * instance.
133      **/

134     private synchronized String JavaDoc generateSessionID() {
135         return Integer.toString(sessionCounter++);
136     }
137
138     //
139
// Runnable INTERFACE IMPLEMENTATION
140
//
141

142     public void run() {
143
144         while(true) {
145
146             try {
147                 Object JavaDoc o = deliveryQueue.remove();
148                 if (o instanceof javax.jms.Message JavaDoc) {
149                     deliver((javax.jms.Message JavaDoc)o);
150                 }
151                 else if (o instanceof QueueCarrier) {
152                     QueueCarrier qc = (QueueCarrier)o;
153                     deliver(qc.getJMSMessage(), qc.getSessionID(), qc.getReceiverID());
154                 }
155                 else {
156                     log.warn("Unknown delivery object: " +
157                              (o == null ? "null" : o.getClass().getName()));
158                 }
159             }
160             catch(Exception JavaDoc e) {
161                 log.warn("Failed to remove element from the delivery queue", e);
162             }
163         }
164     }
165
166     //
167
// END Runnable INTERFACE IMPLEMENTATION
168
//
169

170 }
171
Popular Tags