KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mmbase > clustering > jgroups > Multicast


1 /*
2
3 This software is OSI Certified Open Source Software.
4 OSI Certified is a certification mark of the Open Source Initiative.
5
6 The license (Mozilla version 1.0) can be read at the MMBase site.
7 See http://www.MMBase.org/license
8
9 */

10 package org.mmbase.clustering.jgroups;
11
12 import java.util.Map JavaDoc;
13 import java.util.regex.Pattern JavaDoc;
14 import java.util.regex.Matcher JavaDoc;
15
16 import org.mmbase.clustering.ClusterManager;
17 import org.mmbase.util.logging.Logger;
18 import org.mmbase.util.logging.Logging;
19 import org.mmbase.util.xml.UtilReader;
20
21 import org.jgroups.*;
22
23 /**
24  * Multicast is a thread object that reads the receive queue and spawns them to
25  * call the objects (listeners) who need to know. The Multicast start two
26  * threads to handle the sending and receiving of multicast messages. This is
27  * the version that uses JavaGroups (JGroups) to ensure reliable delivery of
28  * multicast messages.
29  *
30  * See <a HREF="http://www.jgroups.org/">http://www.jgroups.org/</a> for more
31  * information on JGroups.
32  *
33  * @see org.mmbase.clustering.jgroups.ChangesSender
34  * @see org.mmbase.clustering.jgroups.ChangesReceiver
35  *
36  * @author Daniel Ockeloen
37  * @author Rico Jansen
38  * @author Nico Klasens
39  * @author Costyn van Dongen
40  * @author Ronald Wildenberg
41  * @version $Id: Multicast.java,v 1.9 2006/07/06 11:40:48 michiel Exp $
42  */

43 public class Multicast extends ClusterManager {
44
45     private static final Logger log = Logging.getLoggerInstance(Multicast.class);
46
47     /**
48      * Field containing the configuration file with the various options that
49      * can be specified for configuring the JGroups channel
50      * */

51     public static final String JavaDoc CONFIG_FILE = "multicastJG.xml";
52
53     /**
54      * Sender which reads the nodesToSend Queue amd puts the message on the
55      * line
56      * */

57     private ChangesSender mcs;
58
59     /**
60      * Receiver which reads the message from the line and puts message in the
61      * nodesToSpawn Queue
62      * */

63     private ChangesReceiver mcr;
64
65     /**
66      * JChannel which the ChangesReceiver and ChangesSender
67      * use to communicate with other instances
68      * */

69     private JChannel channel;
70
71     /**
72      * channelproperties A string specifying the properties of the JChannel
73      * protocol stack.
74      * */

75     private String JavaDoc channelProperties;
76
77     /**
78      * Name which the various MMBase instances use to communicate with
79      * each other. If there are different clouds in a network which should not
80      * communicate, this name should be different for each group of clouds
81      * communicating with each other.
82      * */

83     private String JavaDoc channelName;
84
85     /**
86      * @since MMBase-1.8.1
87      */

88     private final UtilReader reader = new UtilReader(CONFIG_FILE,
89                                                      new Runnable JavaDoc() {
90                                                          public void run() {
91                                                              synchronized(Multicast.this) {
92                                                                  stopCommunicationThreads();
93                                                                  readConfiguration(reader.getProperties());
94                                                                  startCommunicationThreads();
95                                                              }
96                                                          }
97                                                      });
98
99     /**
100      */

101     public Multicast() {
102         readConfiguration(reader.getProperties());
103         start();
104     }
105
106     /**
107      * @since MMBase-1.8.1
108      */

109     protected synchronized void readConfiguration(Map JavaDoc configuration) {
110         super.readConfiguration(configuration);
111
112         String JavaDoc tmp = (String JavaDoc) configuration.get("channelproperties");
113         if (tmp != null && !tmp.equals("")) {
114             channelProperties = tmp;
115         } else {
116             log.error("No channel properties found");
117         }
118
119         tmp = (String JavaDoc) configuration.get("channelname");
120         if (tmp != null && !tmp.equals("")) {
121             channelName = tmp;
122         }
123
124         /**
125          * We need to strip out white space characters from the
126          * channelproperties string before we pass it onto new JChannel().
127          */

128         Pattern JavaDoc p = Pattern.compile("\\s");
129         Matcher JavaDoc m = p.matcher(channelProperties);
130         channelProperties = m.replaceAll("");
131
132         try {
133             if (channel != null) channel.disconnect();
134             channel = new JChannel(channelProperties);
135             channel.connect(channelName);
136         } catch (ChannelException createChannelException) {
137             log.error("JChannel: Unable to create or join multicast channel: " + createChannelException.getMessage(), createChannelException);
138             if (channel == null) {
139                 return;
140             }
141         }
142
143         if (channel.isConnected()) {
144             log.info("Joining channel: " + channel.toString(true));
145         } else {
146             log.warn("Could not connect channel: " + channel.toString(true));
147         }
148     }
149
150     /**
151      * Starts the ChangesSender and MulticastChangerReciever threads,
152      * which handle the sending and recieving of messages on the channel in
153      * seaparate threads.
154      */

155     protected synchronized void startCommunicationThreads() {
156         mcs = new ChangesSender(channel, nodesToSend, send);
157         log.service("Started communication sender " + mcs);
158         mcr = new ChangesReceiver(channel, nodesToSpawn);
159         log.service("Started communication receiver " + mcr);
160     }
161
162
163     protected synchronized void stopCommunicationThreads() {
164         if (mcs != null) {
165             mcs.stop();
166             log.service("Stopped communication sender " + mcs);
167             mcs = null;
168         }
169         if (mcr != null) {
170             mcr.stop();
171             log.service("Stopped communication receiver " + mcr);
172             mcr = null;
173         }
174         if (channel != null) {
175             log.service("Disconnecting jgroup channel " + channel.toString(true));
176             channel.disconnect();
177             channel = null;
178         }
179     }
180 }
181
Popular Tags