1 10 package org.mmbase.clustering.jgroups; 11 12 import java.text.SimpleDateFormat ; 13 import java.util.Date ; 14 import java.util.Iterator ; 15 import java.util.Set ; 16 import java.util.Vector ; 17 18 import org.jgroups.ChannelClosedException; 19 import org.jgroups.ChannelNotConnectedException; 20 import org.jgroups.ExitEvent; 21 import org.jgroups.JChannel; 22 import org.jgroups.Message; 23 import org.jgroups.SuspectEvent; 24 import org.jgroups.TimeoutException; 25 import org.jgroups.View; 26 import org.mmbase.core.util.DaemonThread; 27 import org.mmbase.util.Queue; 28 import org.mmbase.util.logging.Logger; 29 import org.mmbase.util.logging.Logging; 30 31 47 public class ChangesReceiver implements Runnable { 48 49 private static final Logger log = Logging.getLoggerInstance(ChangesReceiver.class); 50 51 52 private Thread kicker = null; 53 54 55 private final Queue nodesToSpawn; 56 57 58 private final JChannel channel; 59 60 65 ChangesReceiver(JChannel channel, Queue nodesToSpawn) { 66 this.channel = channel; 67 this.nodesToSpawn = nodesToSpawn; 68 this.start(); 69 } 70 71 private void start() { 72 if (kicker == null) { 73 kicker = new DaemonThread(this, "MulticastReceiver"); 74 kicker.start(); 75 log.debug("MulticastReceiver started"); 76 } 77 } 78 79 void stop() { 80 if (kicker != null) { 81 kicker.setPriority(Thread.MIN_PRIORITY); 82 kicker.interrupt(); 83 kicker = null; 84 } else { 85 log.service("Cannot stop thread, because it is null"); 86 } 87 } 88 89 public void run() { 90 while (kicker != null) { 91 if (channel == null || (! channel.isConnected())) { 92 log.warn("Channel " + channel + " not connected. Sleeping for 5 s."); 93 try { 94 Thread.sleep(5000); 95 } catch (InterruptedException ie) { 96 } 97 continue; 98 } 99 100 Object receivedObject = null; 101 try { 102 receivedObject = channel.receive(0); 103 } catch (ChannelNotConnectedException e) { 104 log.error("Channel disconnected. This should never happen:" + e.getMessage(), e); 107 continue; 108 } catch (ChannelClosedException e) { 109 log.warn("Channel closed: " + e.getMessage(), e); 110 continue; 111 } catch (TimeoutException e) { 112 log.error("A timeout occurred while receiving a message. This should never happen, since we wait indefinitely: " + e.getMessage(), e); 113 } 114 115 try { 116 117 if (receivedObject != null) { 118 if (receivedObject instanceof Message) { 119 Message message = (Message) receivedObject; 120 if (log.isDebugEnabled()) { 121 log.debug("Received Message from: " + message.getSrc()); 122 log.debug("Message content:"); 123 Set headerKeySet = message.getHeaders().keySet(); 124 final Iterator headers = headerKeySet.iterator(); 125 while(headers.hasNext()) { 126 log.debug(new String (" " + message.getHeaders().get(headers.next()))); 127 } 128 log.debug("message: " + message.getLength() + " bytes"); 129 if (log.isTraceEnabled()) { 130 log.trace(" " + new String (message.getBuffer())); 131 } 132 } 133 try { 134 nodesToSpawn.append(message.getBuffer()); 135 } catch (Exception ex) { 136 log.error(ex); 137 } 138 } else if (receivedObject instanceof View) { 139 View view = (View) receivedObject; 140 log.info("Received View from: " + view.getCreator()); 141 log.info("Current members of group:"); 142 143 Vector members = view.getMembers() ; 144 for ( int i = 0 ; i < members.size() ; i++ ) { 145 log.info(" " + members.elementAt(i) ) ; 146 } 147 } else if (receivedObject instanceof SuspectEvent) { 148 log.warn("Received SuspectEvent for member: " + ((SuspectEvent) receivedObject).getMember()); 149 } else if (receivedObject instanceof ExitEvent) { 150 154 log.warn("Received an ExitEvent. Going to wait until we automatically reconnect to the channel."); 155 log.info("Starting to wait at: " + 156 new SimpleDateFormat ("yyyyMMdd HH:mm:ss.SSS").format(new Date (System.currentTimeMillis()))); 157 while (!(channel.isOpen() && channel.isConnected())) { 158 try { 159 Thread.sleep(10); 160 } catch (InterruptedException e) { 161 if (log.isServiceEnabled()) { 162 log.service("Thread " + Thread.currentThread() + " "); 163 } 164 } 165 } 166 log.info("Finished waiting at: " + 167 new SimpleDateFormat ("yyyyMMdd HH:mm:ss.SSS").format(new Date (System.currentTimeMillis()))); 168 log.info("Channel open again. Current View:"); 169 View view = channel.getView() ; 170 Vector members = view.getMembers() ; 171 for ( int i = 0 ; i < members.size() ; i++ ) { 172 log.info(" " + members.elementAt(i) ) ; 173 } 174 } else { 175 log.warn("Unkown object recieved: " + receivedObject.toString()); 176 } 177 } 178 } catch (Exception e) { 179 log.error(e); 180 } 181 } 182 } 183 } 184
| Popular Tags
|