1 10 package org.mmbase.clustering.multicast; 11 12 import java.net.*; 13 14 import org.mmbase.util.Queue; 15 import org.mmbase.module.core.MMBaseContext; 16 import org.mmbase.util.logging.Logger; 17 import org.mmbase.util.logging.Logging; 18 19 20 29 public class ChangesReceiver implements Runnable { 30 31 private static final Logger log = Logging.getLoggerInstance(ChangesReceiver.class); 32 33 34 private Thread kicker = null; 35 36 37 private final Queue nodesToSpawn; 38 39 40 private final InetAddress ia; 41 42 43 private MulticastSocket ms; 44 45 46 private final int mport; 47 48 49 private final int dpsize; 50 51 58 ChangesReceiver(String multicastHost, int mport, int dpsize, Queue nodesToSpawn) throws UnknownHostException { 59 this.mport = mport; 60 this.dpsize = dpsize; 61 this.nodesToSpawn = nodesToSpawn; 62 this.ia = InetAddress.getByName(multicastHost); 63 this.start(); 64 } 65 private void start() { 66 if (kicker == null && ia != null) { 67 try { 68 ms = new MulticastSocket(mport); 69 ms.joinGroup(ia); 70 } catch(Exception e) { 71 log.error(Logging.stackTrace(e)); 72 } 73 if (ms != null) { 74 kicker = MMBaseContext.startThread(this, "MulticastReceiver"); 75 log.debug("MulticastReceiver started"); 76 } 77 } 78 } 79 80 void stop() { 81 MulticastSocket closingMS = ms; ms = null; try { 84 closingMS.leaveGroup(ia); 85 closingMS.close(); 86 } catch (Exception e) { 87 } 89 if (kicker != null) { 90 kicker.setPriority(Thread.MIN_PRIORITY); 91 kicker.interrupt(); 92 kicker = null; 93 } else { 94 log.service("Cannot stop thread, because it is null"); 95 } 96 } 97 98 99 public void run() { 100 byte[] buffer = new byte[dpsize]; 102 DatagramPacket dp = new DatagramPacket(buffer, dpsize); 103 while (ms != null) { 104 try { 105 dp.setLength(dpsize); 107 ms.receive(dp); 108 byte[] message = new byte[dp.getLength()]; 109 110 System.arraycopy(dp.getData(), 0, message, 0, dp.getLength()); 114 if (log.isDebugEnabled()) { 115 log.debug("RECEIVED=> " + dp.getLength() + " bytes from " + dp.getAddress()); 116 } 117 nodesToSpawn.append(message); 118 } catch (java.net.SocketException se) { 119 if (ms != null) log.error(se.getMessage()); 122 } catch (Exception f) { 123 log.error(f.getMessage(), f); 124 } 125 } 126 } 127 128 } 129 | Popular Tags |