package com.ubermq.jms.server.cluster;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.rmi.RemoteException;

import javax.jms.*;
import javax.jms.Connection;
import javax.jms.Session;

import org.apache.log4j.Logger;
import org.jgroups.*;
import org.jgroups.Message;
import org.jgroups.blocks.PullPushAdapter;

import com.ubermq.jms.client.IAcknowledgeHandler;
import com.ubermq.jms.client.impl.*;
import com.ubermq.jms.common.datagram.IMessageDatagram;
import com.ubermq.jms.common.datagram.impl.DatagramFactory;
import com.ubermq.jms.server.ServerConfig;
import com.ubermq.kernel.*;

/**
 * Cluster membership implementation backed by a JGroups channel.
 * <p>
 * Messages delivered by a consumer on a local JMS connection are written
 * to their wire representation and sent over the JGroups channel. Messages
 * received from the channel (other than those this member sent itself) are
 * decoded and forwarded through a producer on the same local connection.
 * <p>
 * A single scratch buffer is shared by the send and receive paths.
 * {@link #onMessage(javax.jms.Message)} and {@link #receive(Message)} are
 * both synchronized on this instance, which serialises their use of that
 * buffer. {@link #forwardTo(javax.jms.Message, Address)} itself is not
 * synchronized, so callers other than {@code onMessage} must hold this
 * instance's monitor.
 */
public class JGroupsClusterMembership
    implements ClusterMembership, javax.jms.MessageListener, org.jgroups.MessageListener, IAcknowledgeHandler
{
    private static final Logger log = Logger.getLogger(JGroupsClusterMembership.class);

    /** Configuration key whose value is passed to the {@link JChannel} constructor. */
    public static final String CONFIG_JGROUPS_STACK = "cluster.jgroups.stack";

    // Local JMS side; populated by join().
    private Connection local;
    private Session s;
    private MessageConsumer consumer;
    private AbstractProducer producer;

    // Cluster side.
    private JChannel clusterChannel;
    private PullPushAdapter adapter;

    // Shared encode/decode buffer; see the class comment for the locking rule.
    private ByteBuffer scratch;

    /**
     * Creates the JGroups channel from the configured stack and allocates
     * the scratch buffer. The channel is not connected until
     * {@link #join(ConnectionFactory)} is called.
     *
     * @throws ChannelException if the JGroups channel cannot be created
     */
    public JGroupsClusterMembership() throws ChannelException
    {
        this.clusterChannel = new JChannel(Configurator.getProperty(CONFIG_JGROUPS_STACK));

        int bufferSize =
            Integer.parseInt(Configurator.getProperty(ConfigConstants.GENERAL_CONNECTION_BUFFER_SIZE));
        this.scratch = ByteBuffer.allocateDirect(bufferSize);
    }

    /**
     * @return the underlying JGroups channel
     */
    public JChannel getChannel()
    {
        return clusterChannel;
    }

    /**
     * Joins the cluster: opens a local JMS connection, subscribes to the
     * configured clustering topic, connects the JGroups channel under the
     * configured cluster name and starts listening for cluster messages.
     * <p>
     * If any step fails, the local JMS connection opened by this call is
     * closed again before the exception is propagated.
     *
     * @param cf factory used to create the local JMS connection
     * @throws RemoteException if the JMS setup or the channel connect fails;
     *         the original exception is attached as the cause
     */
    public void join(ConnectionFactory cf) throws RemoteException
    {
        try
        {
            this.local = cf.createConnection();
            this.s = local.createSession(false, Session.AUTO_ACKNOWLEDGE);
            this.consumer =
                s.createConsumer(
                    s.createTopic(
                        Configurator.getProperty(ServerConfig.CONFIG_CLUSTERING_SUBSCRIPTION, "#")));
            consumer.setMessageListener(this);
            this.producer = (AbstractProducer)s.createProducer(null);

            clusterChannel.connect(Configurator.getProperty(ServerConfig.CLUSTER_NAME));

            this.adapter = new PullPushAdapter(clusterChannel);
            adapter.setListener(this);
            adapter.start();
        }
        catch (ChannelException e)
        {
            log.error("unable to join channel", e);
            closeLocalQuietly();
            throw new RemoteException(e.getMessage(), e);
        }
        catch (JMSException e)
        {
            log.error("JMS problems", e);
            closeLocalQuietly();
            throw new RemoteException(e.getMessage(), e);
        }
    }

    /**
     * Closes the local JMS connection after a failed join so that it is not
     * leaked. A failure to close is logged rather than thrown, so it cannot
     * mask the exception that caused the join to fail.
     */
    private void closeLocalQuietly()
    {
        if (local == null)
        {
            return;
        }

        try
        {
            local.close();
        }
        catch (JMSException e)
        {
            log.warn("could not close local connection after failed join", e);
        }
        finally
        {
            local = null;
        }
    }

    /**
     * JMS listener callback: sends a locally consumed message to the whole
     * cluster (a {@code null} destination address is passed on).
     *
     * @param message the message delivered by the local consumer
     */
    public synchronized void onMessage(javax.jms.Message message)
    {
        forwardTo(message, null);
    }

    /**
     * Encodes a message into its wire representation and sends it over the
     * cluster channel. Send failures are logged, not thrown.
     * <p>
     * This method uses the shared scratch buffer and is not synchronized
     * itself; the caller must hold this instance's monitor.
     *
     * @param message the message to send; must be a {@link LocalMessage}
     * @param peer destination address handed to the JGroups message, or
     *        {@code null} as used by {@link #onMessage(javax.jms.Message)}
     */
    protected void forwardTo(javax.jms.Message message, Address peer)
    {
        IDatagram d = ((LocalMessage)message).getDatagram();

        // Encode into the scratch buffer, then copy out exactly the bytes written.
        scratch.clear();
        DatagramFactory.getInstance().outgoing(scratch, d);
        scratch.flip();

        byte[] wireRepresentation = new byte[scratch.remaining()];
        scratch.get(wireRepresentation);

        Message m = new Message(peer, clusterChannel.getLocalAddress(), wireRepresentation);
        try
        {
            clusterChannel.send(m);
            if (log.isDebugEnabled())
            {
                log.debug("sent message to the cluster " + m);
            }
        }
        catch (ChannelNotConnectedException e)
        {
            log.error("unable to send message to the cluster: channel not connected", e);
        }
        catch (ChannelClosedException e)
        {
            log.error("unable to send message to the cluster: channel closed", e);
        }
    }

    /**
     * JGroups listener callback: decodes a message received from the
     * cluster and forwards it through the local producer to the message's
     * own JMS destination. Messages whose source is this member's own
     * address are ignored.
     * <p>
     * A message with no payload, or with a payload larger than the scratch
     * buffer, is logged and dropped instead of failing on the buffer copy.
     *
     * @param arg0 the JGroups message received from the channel
     */
    public synchronized void receive(Message arg0)
    {
        if (arg0.getSrc().equals(clusterChannel.getLocalAddress()))
        {
            log.debug("ignoring locally propagated message...");
            return;
        }

        if (log.isDebugEnabled())
        {
            log.debug("got message from the cluster " + arg0);
        }

        byte[] payload = arg0.getBuffer();
        if (payload == null || payload.length > scratch.capacity())
        {
            log.error("dropping cluster message: payload is missing or larger than the connection buffer");
            return;
        }

        scratch.clear();
        scratch.put(payload);
        scratch.flip();

        IMessageDatagram d = (IMessageDatagram)DatagramFactory.getInstance().incoming(scratch);
        javax.jms.Message m = LocalMessage.getMessage(d, this);
        try
        {
            producer.forward(m.getJMSDestination(), m);
        }
        catch (JMSException e)
        {
            log.error("could not forward message", e);
        }
    }

    /**
     * Leaves the cluster: closes the local JMS connection (if one was
     * opened) and disconnects the JGroups channel. The channel is
     * disconnected even when closing the JMS connection fails.
     * <p>
     * NOTE(review): the PullPushAdapter started by join() is not stopped
     * here; confirm whether it should be.
     *
     * @throws RemoteException if closing the local JMS connection fails;
     *         the original exception is attached as the cause
     */
    public void leave() throws RemoteException
    {
        try
        {
            if (local != null)
            {
                local.close();
            }
        }
        catch (JMSException e)
        {
            throw new RemoteException(e.getMessage(), e);
        }
        finally
        {
            clusterChannel.disconnect();
        }
    }

    /**
     * Registers a membership listener with the adapter created by
     * {@link #join(ConnectionFactory)}.
     *
     * @param ml the listener to register
     * @throws IllegalStateException if called before a successful join
     */
    public void addMembershipListener(MembershipListener ml)
    {
        if (adapter == null)
        {
            throw new IllegalStateException("cannot add a membership listener before joining the cluster");
        }
        adapter.addMembershipListener(ml);
    }

    /**
     * @return a short description including the channel name
     */
    public String toString()
    {
        return "JGroups clustering " + clusterChannel.getChannelName();
    }

    /**
     * State transfer is not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public byte[] getState()
    {
        throw new UnsupportedOperationException();
    }

    /**
     * State transfer is not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public void setState(byte[] arg0)
    {
        throw new UnsupportedOperationException();
    }

    /**
     * Acknowledge callback for messages created in
     * {@link #receive(Message)}; intentionally does nothing.
     *
     * @param md the datagram being acknowledged (ignored)
     */
    public void acknowledge(IMessageDatagram md)
    {
    }
}