KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mmbase > clustering > jgroups > ChangesReceiver


1 /*
2
3 This software is OSI Certified Open Source Software.
4 OSI Certified is a certification mark of the Open Source Initiative.
5
6 The license (Mozilla version 1.0) can be read at the MMBase site.
7 See http://www.MMBase.org/license
8
9 */

10 package org.mmbase.clustering.jgroups;
11
12 import java.text.SimpleDateFormat JavaDoc;
13 import java.util.Date JavaDoc;
14 import java.util.Iterator JavaDoc;
15 import java.util.Set JavaDoc;
16 import java.util.Vector JavaDoc;
17
18 import org.jgroups.ChannelClosedException;
19 import org.jgroups.ChannelNotConnectedException;
20 import org.jgroups.ExitEvent;
21 import org.jgroups.JChannel;
22 import org.jgroups.Message;
23 import org.jgroups.SuspectEvent;
24 import org.jgroups.TimeoutException;
25 import org.jgroups.View;
26 import org.mmbase.core.util.DaemonThread;
27 import org.mmbase.util.Queue;
28 import org.mmbase.util.logging.Logger;
29 import org.mmbase.util.logging.Logging;
30
31 /**
32  * ChangesReceiver is a thread object that builds a MultiCast Thread
33  * to receive changes from other MMBase Servers.
34  *
35  * This is the JGroups variant.
36  *
37  * @see org.mmbase.clustering.jgroups.Multicast
38  * @see org.mmbase.clustering.jgroups.ChangesSender
39  *
40  * @author Daniel Ockeloen
41  * @author Rico Jansen
42  * @author Nico Klasens
43  * @author Costyn van Dongen
44  * @author Ronald Wildenberg
45  * @version $Id: ChangesReceiver.java,v 1.6 2006/08/09 11:52:33 pierre Exp $
46  */

47 public class ChangesReceiver implements Runnable JavaDoc {
48
49     private static final Logger log = Logging.getLoggerInstance(ChangesReceiver.class);
50
51     /** Thread which sends the messages */
52     private Thread JavaDoc kicker = null;
53
54     /** Queue with messages received from other MMBase instances */
55     private final Queue nodesToSpawn;
56
57     /** JChannel: the multicast communication channel */
58     private final JChannel channel;
59
60     /**
61      * Construct the MultiCast Receiver
62      * @param channel channel on which to listen for and recieve messages.
63      * @param nodesToSpawn Queue of received messages
64      */

65     ChangesReceiver(JChannel channel, Queue nodesToSpawn) {
66         this.channel = channel;
67         this.nodesToSpawn = nodesToSpawn;
68         this.start();
69     }
70
71     private void start() {
72         if (kicker == null) {
73             kicker = new DaemonThread(this, "MulticastReceiver");
74             kicker.start();
75             log.debug("MulticastReceiver started");
76         }
77     }
78
79     void stop() {
80         if (kicker != null) {
81             kicker.setPriority(Thread.MIN_PRIORITY);
82             kicker.interrupt();
83             kicker = null;
84         } else {
85             log.service("Cannot stop thread, because it is null");
86         }
87     }
88
89     public void run() {
90       while (kicker != null) {
91           if (channel == null || (! channel.isConnected())) {
92               log.warn("Channel " + channel + " not connected. Sleeping for 5 s.");
93               try {
94                   Thread.sleep(5000);
95               } catch (InterruptedException JavaDoc ie) {
96               }
97               continue;
98           }
99          /* Attempt to receive an object. */
100          Object JavaDoc receivedObject = null;
101          try {
102             receivedObject = channel.receive(0); /* wait forever */
103          } catch (ChannelNotConnectedException e) {
104              // This channel is not connected to a group.
105
// This should never happen, since we never call disconnect on the channel. */
106
log.error("Channel disconnected. This should never happen:" + e.getMessage(), e);
107              continue;
108          } catch (ChannelClosedException e) {
109              log.warn("Channel closed: " + e.getMessage(), e);
110              continue;
111          } catch (TimeoutException e) {
112              log.error("A timeout occurred while receiving a message. This should never happen, since we wait indefinitely: " + e.getMessage(), e);
113          }
114
115          try {
116              /* Handle the received object. */
117              if (receivedObject != null) {
118                  if (receivedObject instanceof Message) {
119                      Message message = (Message) receivedObject;
120                      if (log.isDebugEnabled()) {
121                          log.debug("Received Message from: " + message.getSrc());
122                          log.debug("Message content:");
123                          Set JavaDoc headerKeySet = message.getHeaders().keySet();
124                          final Iterator JavaDoc headers = headerKeySet.iterator();
125                          while(headers.hasNext()) {
126                              log.debug(new String JavaDoc(" " + message.getHeaders().get(headers.next())));
127                          }
128                          log.debug("message: " + message.getLength() + " bytes");
129                          if (log.isTraceEnabled()) {
130                              log.trace(" " + new String JavaDoc(message.getBuffer()));
131                          }
132                      }
133                      try {
134                          nodesToSpawn.append(message.getBuffer());
135                      } catch (Exception JavaDoc ex) {
136                          log.error(ex);
137                      }
138                  } else if (receivedObject instanceof View) {
139                      View view = (View) receivedObject;
140                      log.info("Received View from: " + view.getCreator());
141                      log.info("Current members of group:");
142
143                      Vector JavaDoc members = view.getMembers() ;
144                      for ( int i = 0 ; i < members.size() ; i++ ) {
145                          log.info(" " + members.elementAt(i) ) ;
146                      }
147                  } else if (receivedObject instanceof SuspectEvent) {
148                      log.warn("Received SuspectEvent for member: " + ((SuspectEvent) receivedObject).getMember());
149                  } else if (receivedObject instanceof ExitEvent) {
150                      /* If an ExitEvent occurs, this means the channel is no longer open.
151                       * Continuing to call JChannel.receive(0) inside this
152                       * loop will result in throwing an enormous amount of
153                       * ChannelClosedException's. Therefore, we wait until the channel is open again. */

154                      log.warn("Received an ExitEvent. Going to wait until we automatically reconnect to the channel.");
155                      log.info("Starting to wait at: " +
156                               new SimpleDateFormat JavaDoc("yyyyMMdd HH:mm:ss.SSS").format(new Date JavaDoc(System.currentTimeMillis())));
157                      while (!(channel.isOpen() && channel.isConnected())) {
158                          try {
159                              Thread.sleep(10);
160                          } catch (InterruptedException JavaDoc e) {
161                              if (log.isServiceEnabled()) {
162                                  log.service("Thread " + Thread.currentThread() + " ");
163                              }
164                          }
165                      }
166                      log.info("Finished waiting at: " +
167                               new SimpleDateFormat JavaDoc("yyyyMMdd HH:mm:ss.SSS").format(new Date JavaDoc(System.currentTimeMillis())));
168                      log.info("Channel open again. Current View:");
169                      View view = channel.getView() ;
170                      Vector JavaDoc members = view.getMembers() ;
171                      for ( int i = 0 ; i < members.size() ; i++ ) {
172                          log.info(" " + members.elementAt(i) ) ;
173                      }
174                  } else {
175                      log.warn("Unkown object recieved: " + receivedObject.toString());
176                  }
177              }
178          } catch (Exception JavaDoc e) {
179              log.error(e);
180          }
181       }
182    }
183 }
184
Popular Tags