KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > jms > serverless > GroupConnection


1 /*
2  * JBoss, the OpenSource J2EE webOS
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7 package org.jboss.jms.serverless;
8
9 import org.jboss.logging.Logger;
10 import javax.jms.Connection JavaDoc;
11 import javax.jms.JMSException JavaDoc;
12 import javax.jms.ConnectionMetaData JavaDoc;
13 import javax.jms.ExceptionListener JavaDoc;
14 import javax.jms.ConnectionConsumer JavaDoc;
15 import javax.jms.ServerSessionPool JavaDoc;
16 import javax.jms.Destination JavaDoc;
17 import javax.jms.Topic JavaDoc;
18 import javax.jms.Session JavaDoc;
19 import org.jgroups.JChannel;
20 import org.jgroups.ChannelListener;
21 import org.jgroups.Channel;
22 import org.jgroups.Address;
23 import org.jgroups.ChannelException;
24 import java.io.Serializable JavaDoc;
25 import java.net.URL JavaDoc;
26 import javax.jms.Queue JavaDoc;
27 import org.jgroups.SetStateEvent;
28 import org.jgroups.util.Util;
29 import org.jgroups.GetStateEvent;
30 import org.jgroups.View;
31 import org.jgroups.SuspectEvent;
32 import org.jgroups.ChannelClosedException;
33 import org.jgroups.ChannelNotConnectedException;
34
35 /**
36  * The main piece of the JMS client runtime. Sits in top of a JChannel and mainains the "server
37  * group" state. Delegates the session management to the SessionManager instance. Deals with
38  * message delivery to and from sessions. Implements the Connection interface.
39  *
40  * @author Ovidiu Feodorov <ovidiu@jboss.org>
41  * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $
42  *
43  **/

44 class GroupConnection implements Connection JavaDoc, Runnable JavaDoc {
45
46     private static final Logger log = Logger.getLogger(GroupConnection.class);
47
48     private static final String JavaDoc DEFAULT_SERVER_GROUP_NAME = "serverGroup";
49
50     private URL JavaDoc serverChannelConfigURL;
51
52     private SessionManager sessionManager;
53     private org.jgroups.util.Queue deliveryQueue;
54     private ConnectionState connState;
55
56 // private ChannelState channelState;
57
private GroupState groupState;
58     private Thread JavaDoc connManagementThread;
59     private JChannel serverChannel;
60
61     /**
62      * The constructor leaves the Connection in a DISCONNECTED state.
63      *
64      * @param serverChannelConfigURL the URL of the XML file containing channel configuration.
65      **/

66     GroupConnection(URL JavaDoc serverChannelConfigURL) {
67
68         this.serverChannelConfigURL = serverChannelConfigURL;
69
70         deliveryQueue = new org.jgroups.util.Queue();
71         sessionManager = new SessionManager(this, deliveryQueue);
72         groupState = new GroupState();
73         connManagementThread = new Thread JavaDoc(this, "Connection Management Thread");
74         connState = new ConnectionState();
75
76     }
77
78     /**
79      * Initalizes the connection, by connecting the channel to the server group. Should be called
80      * only once, when the Connection instance is created.
81      **/

82     void connect() throws JMSException JavaDoc {
83
84         // TO_DO: if is already connected (stopped), just return
85

86         try {
87
88             serverChannel = new JChannel(serverChannelConfigURL);
89             serverChannel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
90             serverChannel.setChannelListener(new ChannelListener() {
91                     
92                     public void channelClosed(Channel channel) {
93                         log.debug("channelClosed("+channel+")");
94                     }
95            
96                     public void channelConnected(Channel channel) {
97                         log.debug("channelConnected() to group ["+
98                                  channel.getChannelName()+"]");
99                     }
100                     
101                     public void channelDisconnected(Channel channel) {
102                         log.debug("channelDisconnected("+channel+")");
103                     }
104            
105                     public void channelReconnected(Address addr) {
106                         log.debug("channelReconnected("+addr+")");
107                     }
108            
109                     public void channelShunned() {
110                         log.debug("channelShunned()");
111                     }
112                 });
113             
114             log.debug("channel created");
115             serverChannel.connect(DEFAULT_SERVER_GROUP_NAME);
116             log.debug("channel connected");
117             connState.setStopped();
118             connManagementThread.start();
119             log.debug("Connection Management Thread started");
120             boolean getStateOK = serverChannel.getState(null, 0);
121             log.debug("getState(): "+getStateOK);
122         }
123         catch(ChannelException e) {
124             String JavaDoc msg = "Failed to create an active connection";
125             log.error(msg, e);
126             JMSException JavaDoc jmse = new JMSException JavaDoc(msg);
127             jmse.setLinkedException(e);
128             throw jmse;
129         }
130     }
131
132
133     // TO_DO: deal with situation when this method is accessed concurrently from different threads
134
void send(javax.jms.Message JavaDoc m) throws JMSException JavaDoc {
135
136         try {
137             // the Destination is already set for the message
138
if (m.getJMSDestination() instanceof Topic JavaDoc) {
139                 // for topics, multicast
140
serverChannel.send(null, null, (Serializable JavaDoc)m);
141             }
142             else {
143                 // for queues, unicast to the coordinator
144

145                 // TO_DO: optimization, if I am the only on in group, don't send the messages
146
// down the stack anymore
147
org.jgroups.Message jgmsg =
148                     new org.jgroups.Message((Address)serverChannel.getView().getMembers().get(0),
149                                             null, new QueueCarrier(m));
150                 serverChannel.send(jgmsg);
151             }
152         }
153         catch(Exception JavaDoc e) {
154             String JavaDoc msg = "Failed to send message";
155             log.error(msg, e);
156             JMSException JavaDoc jmse = new JMSException JavaDoc(msg);
157             jmse.setLinkedException(e);
158             throw jmse;
159         }
160
161     }
162
163     //
164
// Runnable INTERFACE IMPLEMENTATION
165
//
166

167     /**
168      * Code executed on the Connection Management Thread thread. It synchronously pulls JG
169      * message and events from the channel.
170      **/

171     public void run() {
172
173         Object JavaDoc incoming = null;
174
175         while(true) {
176
177             try {
178                 incoming = serverChannel.receive(0);
179             }
180             catch(ChannelClosedException e) {
181                 log.debug("Channel closed, exiting");
182                 break;
183             }
184             catch(ChannelNotConnectedException e) {
185                 log.warn("TO_DO: Channel not connected, I should block the thread ...");
186                 continue;
187             }
188             catch(Exception JavaDoc e) {
189                 // TO_DO: use a JMS ExceptionListener and do some other things as well ....
190
log.error("Failed to synchronously read from the channel", e);
191             }
192
193             try {
194                 dispatch(incoming);
195             }
196             catch(Exception JavaDoc e) {
197                 // TO_DO: I don't want that poorly written client code (dispatch() ends running
198
// MessageListener code) to throw RuntimeException and terminate this thread
199
// use the ExceptionListener and do some other things as well ....
200
log.error("Dispatching failed", e);
201             }
202         }
203     }
204
205     //
206
//
207
//
208

209     private void dispatch(Object JavaDoc o) throws Exception JavaDoc {
210
211         log.debug("dispatching "+o);
212
213         if (o instanceof SetStateEvent) {
214             byte[] buffer = ((SetStateEvent)o).getArg();
215             if (buffer == null) {
216                 // that's ok if I am the coordinator, just ignore it
217
log.debug("null group state, ignoring ...");
218             }
219             else {
220                 // update my group state
221
groupState.fromByteBuffer(buffer);
222             }
223             return;
224         }
225         else if (o instanceof GetStateEvent) {
226             // somebody is requesting the group state
227
serverChannel.returnState(groupState.toByteBuffer());
228             return;
229         }
230         else if (o instanceof View) {
231             // no use for it for the time being
232
return;
233         }
234         else if (o instanceof SuspectEvent) {
235             // no use for it for the time being
236
return;
237         }
238         else if (!(o instanceof org.jgroups.Message)) {
239             // ignore it for the time being
240
log.warn("Ignoring "+o);
241             return;
242         }
243
244         org.jgroups.Message jgmsg = (org.jgroups.Message)o;
245         Object JavaDoc payload = jgmsg.getObject();
246         if (payload instanceof ServerAdminCommand) {
247             // ADD_QUEUE_RECEIVER, aso
248
handleServerAdminCommand(jgmsg.getSrc(), (ServerAdminCommand)payload);
249         }
250         else if (payload instanceof QueueCarrier) {
251             QueueCarrier qc = (QueueCarrier)payload;
252             String JavaDoc sessionID = qc.getSessionID();
253             // this is either an initial queue carrier that forwards the message from its
254
// source to the coordinator, or a final queue carrier that forwards the message
255
// from the coordinator to its final destination.
256
if (sessionID == null) {
257                 queueForward(qc);
258             }
259             else {
260                 deliveryQueue.add(qc);
261             }
262         }
263         else if (payload instanceof javax.jms.Message JavaDoc) {
264             // deliver only if the connection is started, discard otherwise
265
if (connState.isStarted()) {
266                 deliveryQueue.add((javax.jms.Message JavaDoc)payload);
267             }
268         }
269         else {
270             log.warn("JG Message with a payload something else than a JMS Message: "+
271                      (payload == null ? "null" : payload.getClass().getName()));
272         }
273     }
274
275     private void handleServerAdminCommand(Address src, ServerAdminCommand c) {
276         //log.debug("Handling "+c.getCommand());
277
String JavaDoc comm = c.getCommand();
278         if (ServerAdminCommand.ADD_QUEUE_RECEIVER.equals(comm)) {
279             String JavaDoc queueName = (String JavaDoc)c.get(0);
280             String JavaDoc sessionID = (String JavaDoc)c.get(1);
281             String JavaDoc queueReceiverID = (String JavaDoc)c.get(2);
282             groupState.addQueueReceiver(queueName, src, sessionID, queueReceiverID);
283         }
284         else if (ServerAdminCommand.REMOVE_QUEUE_RECEIVER.equals(comm)) {
285             String JavaDoc queueName = (String JavaDoc)c.get(0);
286             String JavaDoc sessionID = (String JavaDoc)c.get(1);
287             String JavaDoc queueReceiverID = (String JavaDoc)c.get(2);
288             groupState.removeQueueReceiver(queueName, src, sessionID, queueReceiverID);
289         }
290         else {
291             log.error("Unknown server administration command: "+comm);
292         }
293     }
294
295     void advertiseQueueReceiver(String JavaDoc queueName, String JavaDoc sessionID,
296                                 String JavaDoc queueReceiverID, boolean isOn) throws ProviderException {
297
298         try {
299             // multicast the change, this will update my own state as well
300
String JavaDoc cs = isOn ?
301                 ServerAdminCommand.ADD_QUEUE_RECEIVER :
302                 ServerAdminCommand.REMOVE_QUEUE_RECEIVER;
303             ServerAdminCommand comm =
304                 new ServerAdminCommand(cs, queueName, sessionID, queueReceiverID);
305             serverChannel.send(null, null, comm);
306         }
307         catch(ChannelException e) {
308             throw new ProviderException("Failed to advertise the queue receiver", e);
309         }
310     }
311
312     private void queueForward(QueueCarrier qc) throws Exception JavaDoc {
313
314         Queue JavaDoc destQueue = (Queue JavaDoc)qc.getJMSMessage().getJMSDestination();
315         QueueReceiverAddress ra = groupState.selectReceiver(destQueue.getQueueName());
316         if (ra == null) {
317             // TO_DO: no receivers for this queue, discard it for the time being
318
log.warn("Discarding message for queue "+destQueue.getQueueName()+"!");
319             return;
320         }
321         Address destAddress = ra.getAddress();
322         qc.setSessionID(ra.getSessionID());
323         qc.setReceiverID(ra.getReceiverID());
324         
325         // forward it to the final destination
326
serverChannel.send(destAddress, null, qc);
327         
328     }
329
330     //
331
// Connection INTERFACE IMPLEMENTATION
332
//
333

334     public void start() throws JMSException JavaDoc {
335
336         // makes sense to call it only a connection that is stopped. If called on a started
337
// connection, the call is ignored. If called on a closed connection: TO_DO
338
// TO_DO: throw apropriate exceptions for illegal transitions
339
if (connState.isStarted()) {
340             return;
341         }
342         synchronized(connState) {
343             connState.setStarted();
344             connState.notify();
345         }
346
347     }
348
349     public void stop() throws JMSException JavaDoc {
350
351         // TO_DO: throw apropriate exceptions for illegal transitions
352
connState.setStopped();
353     }
354
355     public void close() throws JMSException JavaDoc {
356
357         // TO_DO: throw apropriate exceptions for illegal transitions
358
// TO_DO: read the rest of specs and make sure I comply; tests
359
if (connState.isClosed()) {
360             return;
361         }
362         connState.setClosed();
363         serverChannel.close();
364
365     }
366
367     public Session JavaDoc createSession(boolean transacted, int acknowledgeMode) throws JMSException JavaDoc {
368
369         return sessionManager.createSession(transacted, acknowledgeMode);
370
371     }
372     
373     public String JavaDoc getClientID() throws JMSException JavaDoc {
374         throw new NotImplementedException();
375     }
376
377     public void setClientID(String JavaDoc clientID) throws JMSException JavaDoc {
378
379         // Once the connection has been initialized, the runtime provides a ClientID, that cannot
380
// be changed by the user; according to JMS1.1 specs, the method should throw
381
// IllegalStateException
382
String JavaDoc msg = "ClientID ("+""+") cannot be modified";
383         throw new IllegalStateException JavaDoc(msg);
384     }
385
386     public ConnectionMetaData JavaDoc getMetaData() throws JMSException JavaDoc {
387         throw new NotImplementedException();
388     }
389
390     public ExceptionListener JavaDoc getExceptionListener() throws JMSException JavaDoc {
391         throw new NotImplementedException();
392     }
393
394     public void setExceptionListener(ExceptionListener JavaDoc listener) throws JMSException JavaDoc {
395         throw new NotImplementedException();
396     }
397
398     public ConnectionConsumer JavaDoc createConnectionConsumer(Destination JavaDoc destination,
399                                                        String JavaDoc messageSelector,
400                                                        ServerSessionPool JavaDoc sessionPool,
401                                                        int maxMessages)
402         throws JMSException JavaDoc {
403         throw new NotImplementedException();
404     }
405
406
407     public ConnectionConsumer JavaDoc createDurableConnectionConsumer(Topic JavaDoc topic,
408                                                               String JavaDoc subscriptionName,
409                                                               String JavaDoc messageSelector,
410                                                               ServerSessionPool JavaDoc sessionPool,
411                                                               int maxMessages)
412         throws JMSException JavaDoc {
413         throw new NotImplementedException();
414     }
415
416     //
417
// END OF Connection INTERFACE IMPLEMENTATION
418
//
419

420
421     /**
422      * Debugging only
423      **/

424     public static void main(String JavaDoc[] args) throws Exception JavaDoc {
425
426         GroupConnection c = new GroupConnection(new URL JavaDoc(args[0]));
427         c.connect();
428     }
429
430
431
432 }
433
Popular Tags