KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > server > cluster > UnicastClusterPropagation


package com.ubermq.jms.server.cluster;

import java.rmi.RemoteException;
import java.util.*;

import javax.jms.*;

import org.apache.log4j.Logger;

import com.ubermq.jms.client.URLConnectionFactory;
import com.ubermq.jms.client.impl.AbstractProducer;
import com.ubermq.jms.client.unicast.UnicastConnection;
import com.ubermq.jms.server.ServerConfig;
import com.ubermq.kernel.Configurator;
15
16 /**
17  * Represents a directional propagation arc for a message. This class
18  * opens cluster connections to a local server and a foreign server, and
19  * moves messages between the two.<P>
20  *
21  * Currently, this implementation has some problems when creating clusters
22  * of more than 2 servers. It is recommended to use the new clustering
23  * mechanism that leverages JGroups for more reliable clustered operation.<P>
24  *
25  * @since 3.0
26  */

27 public class UnicastClusterPropagation implements ClusterMembership
28 {
29     private static final Logger log = Logger.getLogger(UnicastClusterPropagation.class);
30     
31     /**
32      * Who we connect to as a clustering peer. This is comma delimited.
33      */

34     public static final String JavaDoc CLUSTER_FORWARD_TO = "cluster.unicast.forward";
35     
36     /**
37      * The interval (in ms) to check remote connectivity.
38      */

39     public static final String JavaDoc CONFIG_UNICAST_HEARTBEAT_INTERVAL = "cluster.unicast.heartbeat",
40         CONFIG_UNICAST_HEARTBEAT_INTERVAL_DEFAULT = "30000";
41
42     /**
43      * contains a List of all connections that we forward messages to.
44      */

45     private List forwardConnections;
46
47     /**
48      * The connection our local message server.
49      */

50     private Connection local;
51     
52     public UnicastClusterPropagation()
53     {
54         log.info("Using UnicastClusterPropagation.");
55     }
56
57     public void join(ConnectionFactory localFactory) throws RemoteException JavaDoc
58     {
59         this.forwardConnections = new LinkedList();
60         int heartbeat = Integer.valueOf(Configurator.getProperty(CONFIG_UNICAST_HEARTBEAT_INTERVAL, CONFIG_UNICAST_HEARTBEAT_INTERVAL_DEFAULT)).intValue();
61
62         try
63         {
64             this.local = localFactory.createConnection();
65
66             // parse the configuration to find out what servers to connect to.
67
String JavaDoc sz = Configurator.getProperty(CLUSTER_FORWARD_TO, "");
68             StringTokenizer st = new StringTokenizer(sz, ",; \t");
69             while (st.hasMoreTokens())
70             {
71                 UnicastConnection foreign =
72                     (UnicastConnection)new URLConnectionFactory(st.nextToken()).createConnection();
73                 forwardConnections.add(foreign);
74                 
75                 // start the heartbeat so we can reconnect in case of
76
// remote failure.
77
foreign.startHeartbeat(heartbeat);
78
79                 // begin by forwarding in the appropriate direction.
80
forward(local, foreign);
81                 forward(foreign, local);
82
83                 foreign.start();
84                 log.debug("Created cluster propagation to " + foreign.toString());
85             }
86
87             // start propagating.
88
local.start();
89         }
90         catch (JMSException e)
91         {
92             throw new RemoteException JavaDoc(e.getMessage());
93         }
94         log.debug("Joined cluster, created " + this.forwardConnections.size() + " forward connection(s).");
95     }
96
97     public void leave() throws RemoteException JavaDoc
98     {
99         Iterator i = this.forwardConnections.iterator();
100         while(i.hasNext())
101         {
102             UnicastConnection f = (UnicastConnection)i.next();
103             f.close();
104         }
105         this.forwardConnections.clear();
106         
107         try
108         {
109             this.local.close();
110         }
111         catch (JMSException e)
112         {
113             log.error("Unable to close local connection", e);
114             throw new RemoteException JavaDoc(e.getMessage());
115         }
116         this.local = null;
117     }
118
119     /**
120      * Creates JMS subscribers that will forward an incoming message
121      * on the <code>get</code> connection to the <code>put</code> connection.
122      *
123      * @param get the connection to monitor
124      * @param put the connection to propagate messages to.
125      * @throws JMSException if something failed!
126      */

127     private void forward(Connection get, Connection put) throws JMSException
128     {
129         // subscribe at the get location
130
Session s = get.createSession(false, Session.AUTO_ACKNOWLEDGE);
131         MessageConsumer consumer = s.createConsumer(s.createTopic(Configurator.getProperty(ServerConfig.CONFIG_CLUSTERING_SUBSCRIPTION, "#")));
132
133         // publish at put
134
final AbstractProducer producer =
135             (AbstractProducer)put.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(null);
136
137         consumer.setMessageListener(new MessageListener()
138         {
139
140             /** Passes a message to the listener.
141              *
142              * @param message the message passed to the listener
143              */

144             public void onMessage(Message message)
145             {
146                 try
147                 {
148                     producer.forward(message.getJMSDestination(), message);
149                 }
150                 catch (JMSException e)
151                 {
152                     log.error("could not forward message", e);
153                 }
154             }
155         });
156     }
157 }
158
Popular Tags