1 10 package org.mmbase.clustering.jgroups; 11 12 import org.mmbase.clustering.Statistics; 13 14 import org.jgroups.ChannelException; 15 import org.jgroups.JChannel; 16 import org.jgroups.Message; 17 import org.mmbase.core.util.DaemonThread; 18 import org.mmbase.util.Queue; 19 import org.mmbase.util.logging.Logger; 20 import org.mmbase.util.logging.Logging; 21 22 37 public class ChangesSender implements Runnable { 38 39 private static final Logger log = Logging.getLoggerInstance(ChangesSender.class); 40 41 42 private Thread kicker = null; 43 44 45 private final Queue nodesToSend; 46 47 48 private final JChannel channel; 49 50 private final Statistics send; 51 52 56 ChangesSender(JChannel channel, Queue nodesToSend, Statistics send) { 57 this.send = send; 58 this.channel = channel; 59 this.nodesToSend = nodesToSend; 60 this.start(); 61 } 62 63 private void start() { 64 if (kicker == null) { 65 kicker = new DaemonThread(this, "MulticastSender"); 66 kicker.start(); 67 log.debug("MulticastSender started"); 68 } 69 } 70 71 void stop() { 72 if (kicker != null) { 73 kicker.interrupt(); 74 kicker.setPriority(Thread.MIN_PRIORITY); 75 kicker = null; 76 } else { 77 log.service("Cannot stop thread, because it is null"); 78 } 79 } 80 81 82 89 public void run() { 90 while(kicker != null) { 91 try { 92 if (channel == null || (! channel.isConnected())) { 93 log.warn("Channel " + channel + " not connected. Sleeping for 5 s."); 94 try { 95 Thread.sleep(5000); 96 } catch (InterruptedException ie) { 97 } 98 continue; 99 } 100 101 byte[] message = (byte[]) nodesToSend.get(); 102 long startTime = System.currentTimeMillis(); 103 Message msg = new Message(null, null, message); 104 try { 105 if (log.isDebugEnabled()) { 106 log.debug("SEND=>" + message); 107 } 108 channel.send(msg); 109 } catch (ChannelException e) { 110 log.error("Can't send message" + message + ": " + e.getMessage(), e); 111 } 112 send.count++; 113 send.bytes += message.length; 114 send.cost += (System.currentTimeMillis() - startTime); 115 } catch (InterruptedException e) { 116 log.debug(Thread.currentThread().getName() +" was interruped."); 117 break; 118 } catch (Exception e) { 119 log.error(e); 120 } 121 } 122 } 123 } 124
| Popular Tags
|