1 10 package org.mmbase.clustering.multicast; 11 12 import org.mmbase.clustering.Statistics; 13 14 import java.net.*; 15 import java.io.*; 16 17 import org.mmbase.module.core.MMBaseContext; 18 import org.mmbase.util.*; 19 import org.mmbase.util.logging.Logger; 20 import org.mmbase.util.logging.Logging; 21 22 31 public class ChangesSender implements Runnable { 32 33 private static final Logger log = Logging.getLoggerInstance(ChangesSender.class); 34 35 private final Statistics send; 36 37 38 private Thread kicker = null; 39 40 41 private final Queue nodesToSend; 42 43 44 private final InetAddress ia; 45 46 47 private MulticastSocket ms; 48 49 50 private final int mport; 51 52 private final int mTTL; 53 54 62 ChangesSender(String multicastHost, int mport, int mTTL, Queue nodesToSend, Statistics send) throws UnknownHostException { 63 this.mport = mport; 64 this.mTTL = mTTL; 65 this.nodesToSend = nodesToSend; 66 this.ia = InetAddress.getByName(multicastHost); 67 this.send = send; 68 this.start(); 69 } 70 71 private void start() { 72 if (kicker == null && ia != null) { 73 try { 74 ms = new MulticastSocket(); 75 ms.joinGroup(ia); 76 ms.setTimeToLive(mTTL); 77 } catch(Exception e) { 78 log.error(Logging.stackTrace(e)); 79 } 80 81 kicker = MMBaseContext.startThread(this, "MulticastSender"); 82 log.debug("MulticastSender started"); 83 } 84 } 85 86 void stop() { 87 try { 88 ms.leaveGroup(ia); 89 ms.close(); 90 } catch (Exception e) { 91 } 93 ms = null; 94 if (kicker != null) { 95 kicker.interrupt(); 96 kicker.setPriority(Thread.MIN_PRIORITY); 97 kicker = null; 98 } else { 99 log.service("Cannot stop thread, because it is null"); 100 } 101 } 102 public void run() { 103 log.debug("Started sending"); 104 while(ms != null) { 105 try { 106 byte[] data = (byte[]) nodesToSend.get(); 107 long startTime = System.currentTimeMillis(); 108 DatagramPacket dp = new DatagramPacket(data, data.length, ia, mport); 109 try { 110 if (log.isDebugEnabled()) { 111 log.debug("SEND=> " + dp.getLength() + " bytes to " + dp.getAddress()); 112 } 113 ms.send(dp); 114 } catch (IOException e) { 115 log.error("can't send message" + dp + " to " + ia + ":" + mport); 116 log.error(e.getMessage(), e); 117 } 118 send.count++; 119 send.bytes += data.length; 120 send.cost += (System.currentTimeMillis() - startTime); 121 } catch (InterruptedException e) { 122 log.debug(Thread.currentThread().getName() +" was interruped."); 123 break; 124 } catch (Exception e) { 125 log.error(e.getMessage(), e); 126 } 127 } 128 log.debug("Finished sending"); 129 } 130 } 131 | Popular Tags |