KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > server > cluster > JGroupsClusterMembership


/*
 * Copyright (c) 2004 Rhombus Technologies, Inc.
 * All rights reserved.
 */

package com.ubermq.jms.server.cluster;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.rmi.RemoteException;

import javax.jms.*;
import javax.jms.Connection;
import javax.jms.Session;

import org.apache.log4j.Logger;
import org.jgroups.*;
import org.jgroups.Message;
import org.jgroups.blocks.PullPushAdapter;

import com.ubermq.jms.client.IAcknowledgeHandler;
import com.ubermq.jms.client.impl.*;
import com.ubermq.jms.common.datagram.IMessageDatagram;
import com.ubermq.jms.common.datagram.impl.DatagramFactory;
import com.ubermq.jms.server.ServerConfig;
import com.ubermq.kernel.*;
26
27 /**
28  * Uses JGroups to do clustering.
29  */

30 public class JGroupsClusterMembership
31     implements ClusterMembership, javax.jms.MessageListener JavaDoc, org.jgroups.MessageListener, IAcknowledgeHandler
32 {
33     private static final Logger log = Logger.getLogger(JGroupsClusterMembership.class);
34     public static final String JavaDoc CONFIG_JGROUPS_STACK = "cluster.jgroups.stack";
35
36     private Connection JavaDoc local;
37     private Session JavaDoc s;
38     private MessageConsumer consumer;
39     private AbstractProducer producer;
40     
41     private JChannel clusterChannel;
42     private PullPushAdapter adapter;
43     
44     private ByteBuffer JavaDoc scratch;
45
46     /**
47      *
48      */

49     public JGroupsClusterMembership() throws ChannelException
50     {
51         super();
52         this.clusterChannel = new JChannel(Configurator.getProperty(CONFIG_JGROUPS_STACK));
53         this.scratch = ByteBuffer.allocateDirect(Integer.valueOf(Configurator.getProperty(ConfigConstants.GENERAL_CONNECTION_BUFFER_SIZE)).intValue());
54     }
55     
56     public JChannel getChannel()
57     {
58         return clusterChannel;
59     }
60
61     /* (non-Javadoc)
62      * @see com.ubermq.jms.server.cluster.ClusterMembership#join(javax.jms.ConnectionFactory)
63      */

64     public void join(ConnectionFactory cf) throws RemoteException JavaDoc
65     {
66         try
67         {
68             // subscribe locally. any message that comes in locally on JMS, we toss out into the
69
// cluster via JGroups.
70
this.local = cf.createConnection();
71             this.s = local.createSession(false, Session.AUTO_ACKNOWLEDGE);
72             this.consumer =
73                 s.createConsumer(
74                     s.createTopic(
75                         Configurator.getProperty(ServerConfig.CONFIG_CLUSTERING_SUBSCRIPTION, "#")));
76             consumer.setMessageListener(this);
77             this.producer = (AbstractProducer)s.createProducer(null);
78
79             // join the multicast group.
80
clusterChannel.connect(Configurator.getProperty(ServerConfig.CLUSTER_NAME));
81             
82             // use the PushPullAdapter
83
this.adapter = new PullPushAdapter(clusterChannel);
84             adapter.setListener(this);
85             adapter.start();
86         }
87         catch (ChannelException e)
88         {
89             log.error("unable to join channel", e);
90             throw new RemoteException JavaDoc(e.getMessage());
91         }
92         catch (JMSException e)
93         {
94             log.error("JMS problems", e);
95             throw new RemoteException JavaDoc(e.getMessage());
96         }
97     }
98
99     /** Passes a message to the listener.
100      *
101      * @param message the message passed to the listener
102      */

103     public synchronized void onMessage(javax.jms.Message JavaDoc message)
104     {
105         forwardTo(message, null);
106     }
107     
108     /**
109      * Forwards a JMS message to another group member (peer).
110      */

111     protected void forwardTo(javax.jms.Message JavaDoc message, Address peer)
112     {
113         IDatagram d = ((LocalMessage)message).getDatagram();
114         
115         // save into the scratch buffer.
116
scratch.clear();
117         DatagramFactory.getInstance().outgoing(scratch, d);
118         
119         // read out bytes.
120
scratch.flip();
121         byte[] wireRepresentation = new byte[scratch.remaining()];
122         scratch.get(wireRepresentation);
123
124         // ok, we see a message that was published locally. now we toss it into the
125
// cluster.
126
Message m = new Message(peer, clusterChannel.getLocalAddress(), wireRepresentation);
127         try
128         {
129             clusterChannel.send(m);
130             log.debug("sent message to the cluster " + m);
131         }
132         catch (ChannelNotConnectedException e)
133         {
134             log.error("", e);
135         }
136         catch (ChannelClosedException e)
137         {
138             log.error("", e);
139         }
140     }
141
142     /* (non-Javadoc)
143      * @see org.jgroups.MessageListener#receive(org.jgroups.Message)
144      */

145     public synchronized void receive(Message arg0)
146     {
147         // ignore messages from me.
148
if (arg0.getSrc().equals(clusterChannel.getLocalAddress()))
149         {
150             log.debug("ignoring locally propagated message...");
151             return;
152         }
153         
154         // o/w, proceed and propagate it.
155
log.debug("got message from the cluster " + arg0);
156         scratch.clear();
157         scratch.put(arg0.getBuffer());
158         scratch.flip();
159         
160         IMessageDatagram d = (IMessageDatagram)DatagramFactory.getInstance().incoming(scratch);
161         javax.jms.Message JavaDoc m = LocalMessage.getMessage(d, this);
162         try
163         {
164             producer.forward(m.getJMSDestination(), m);
165         }
166         catch (JMSException e)
167         {
168             log.error("could not forward message", e);
169         }
170     }
171
172     /* (non-Javadoc)
173          * @see com.ubermq.jms.server.cluster.ClusterMembership#leave()
174      */

175     public void leave() throws RemoteException JavaDoc
176     {
177         // stop sending messages.
178
try
179         {
180             local.close();
181         }
182         catch (JMSException e)
183         {
184             throw new RemoteException JavaDoc(e.getMessage());
185         }
186
187         // leave the cluster.
188
clusterChannel.disconnect();
189     }
190     
191     public void addMembershipListener(MembershipListener ml)
192     {
193         adapter.addMembershipListener(ml);
194     }
195
196     public String JavaDoc toString()
197     {
198         return "JGroups clustering " + clusterChannel.getChannelName();
199     }
200
201     /* (non-Javadoc)
202      * @see org.jgroups.MessageListener#getState()
203      */

204     public byte[] getState()
205     {
206         throw new UnsupportedOperationException JavaDoc();
207     }
208
209     /* (non-Javadoc)
210      * @see org.jgroups.MessageListener#setState(byte[])
211      */

212     public void setState(byte[] arg0)
213     {
214         throw new UnsupportedOperationException JavaDoc();
215     }
216
217     /* (non-Javadoc)
218      * @see com.ubermq.jms.client.IAcknowledgeHandler#acknowledge(com.ubermq.jms.common.datagram.IMessageDatagram)
219      */

220     public void acknowledge(IMessageDatagram md)
221     {
222     }
223
224
225 }
226
Popular Tags