package com.ubermq.jms.server.cluster;

import java.rmi.RemoteException;
import java.util.*;

import javax.jms.*;

import org.apache.log4j.Logger;

import com.ubermq.jms.client.URLConnectionFactory;
import com.ubermq.jms.client.impl.AbstractProducer;
import com.ubermq.jms.client.unicast.UnicastConnection;
import com.ubermq.jms.server.ServerConfig;
import com.ubermq.kernel.Configurator;

/**
 * A {@link ClusterMembership} implementation that propagates messages over
 * point-to-point (unicast) connections.
 * <p>
 * On {@link #join(ConnectionFactory)} one connection is opened to every URL
 * listed in the {@link #CLUSTER_FORWARD_TO} property, and messages are
 * forwarded in both directions between the local connection and each of
 * those foreign connections.
 */
public class UnicastClusterPropagation implements ClusterMembership
{
    private static final Logger log = Logger.getLogger(UnicastClusterPropagation.class);

    /**
     * Configuration property naming the servers to forward to. The value is
     * a list of connection URLs separated by commas, semicolons, spaces
     * and/or tabs. An empty or missing value means no forward connections.
     */
    public static final String CLUSTER_FORWARD_TO = "cluster.unicast.forward";

    /**
     * Configuration property (and its default value) for the heartbeat
     * interval handed to every foreign connection.
     * NOTE(review): the default of 30000 is presumably milliseconds --
     * confirm against UnicastConnection.startHeartbeat.
     */
    public static final String CONFIG_UNICAST_HEARTBEAT_INTERVAL = "cluster.unicast.heartbeat",
        CONFIG_UNICAST_HEARTBEAT_INTERVAL_DEFAULT = "30000";

    /**
     * The foreign {@link UnicastConnection}s opened by the last call to
     * {@link #join(ConnectionFactory)}; <code>null</code> until then.
     */
    private List forwardConnections;

    /**
     * The connection to the local server; <code>null</code> before a join
     * and after a successful {@link #leave()}.
     */
    private Connection local;

    public UnicastClusterPropagation()
    {
        log.info("Using UnicastClusterPropagation.");
    }

    /**
     * Joins the cluster: opens the local connection, opens one foreign
     * connection per configured URL, wires up bidirectional forwarding for
     * each and starts all connections.
     * <p>
     * If this method fails part-way, the connections opened so far remain
     * registered with this object, so calling {@link #leave()} afterwards
     * closes them.
     *
     * @param localFactory factory used to create the local connection
     * @throws RemoteException if any JMS operation fails; the original
     *         {@link JMSException} is attached as the cause
     * @throws NumberFormatException if the configured heartbeat interval
     *         is not a valid integer
     */
    public void join(ConnectionFactory localFactory) throws RemoteException
    {
        this.forwardConnections = new LinkedList();
        int heartbeat = Integer.parseInt(
            Configurator.getProperty(CONFIG_UNICAST_HEARTBEAT_INTERVAL,
                                     CONFIG_UNICAST_HEARTBEAT_INTERVAL_DEFAULT));

        try
        {
            this.local = localFactory.createConnection();

            // one foreign connection per URL in the forward list
            String sz = Configurator.getProperty(CLUSTER_FORWARD_TO, "");
            StringTokenizer st = new StringTokenizer(sz, ",; \t");
            while (st.hasMoreTokens())
            {
                UnicastConnection foreign =
                    (UnicastConnection)new URLConnectionFactory(st.nextToken()).createConnection();

                // register immediately so leave() can close it even if the
                // remaining setup steps below fail
                forwardConnections.add(foreign);

                foreign.startHeartbeat(heartbeat);

                // forward in both directions
                forward(local, foreign);
                forward(foreign, local);

                foreign.start();
                log.debug("Created cluster propagation to " + foreign.toString());
            }

            // start local delivery only after all forwarders are installed
            local.start();
        }
        catch (JMSException e)
        {
            // keep the JMSException as the cause so its stack trace survives
            throw new RemoteException(e.getMessage(), e);
        }
        log.debug("Joined cluster, created " + this.forwardConnections.size() + " forward connection(s).");
    }

    /**
     * Leaves the cluster: closes and forgets every foreign connection, then
     * closes the local connection. Safe to call when
     * {@link #join(ConnectionFactory)} was never called or failed early;
     * whatever was not created is simply skipped.
     *
     * @throws RemoteException if closing the local connection fails; the
     *         original {@link JMSException} is attached as the cause
     */
    public void leave() throws RemoteException
    {
        if (this.forwardConnections != null)
        {
            Iterator i = this.forwardConnections.iterator();
            while (i.hasNext())
            {
                UnicastConnection f = (UnicastConnection)i.next();
                f.close();
            }
            this.forwardConnections.clear();
        }

        if (this.local != null)
        {
            try
            {
                this.local.close();
            }
            catch (JMSException e)
            {
                log.error("Unable to close local connection", e);
                throw new RemoteException(e.getMessage(), e);
            }
            this.local = null;
        }
    }

    /**
     * Installs a one-way forwarder: every message received from
     * <code>get</code> on the clustering subscription topic is re-published
     * on <code>put</code> under the message's own JMS destination.
     * <p>
     * The subscription topic name is read from
     * {@link ServerConfig#CONFIG_CLUSTERING_SUBSCRIPTION} and defaults to
     * <code>"#"</code>. Both sessions are non-transacted and use
     * {@link Session#AUTO_ACKNOWLEDGE}.
     *
     * @param get the connection to consume messages from
     * @param put the connection to publish the consumed messages to
     * @throws JMSException if a session, consumer or producer cannot be
     *         created, or the listener cannot be installed
     */
    private void forward(Connection get, Connection put) throws JMSException
    {
        // consuming side
        Session s = get.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = s.createConsumer(
            s.createTopic(Configurator.getProperty(ServerConfig.CONFIG_CLUSTERING_SUBSCRIPTION, "#")));

        // producing side: created with a null destination because the
        // destination is supplied per message when forwarding
        final AbstractProducer producer =
            (AbstractProducer)put.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(null);

        consumer.setMessageListener(new MessageListener()
            {
                /**
                 * Forwards the message to the producer under its original
                 * destination. Failures are logged and not rethrown.
                 */
                public void onMessage(Message message)
                {
                    try
                    {
                        producer.forward(message.getJMSDestination(), message);
                    }
                    catch (JMSException e)
                    {
                        log.error("could not forward message", e);
                    }
                }
            });
    }
}