1 10 package org.mmbase.clustering.unicast; 11 12 import java.io.*; 13 import java.net.*; 14 15 import org.mmbase.core.util.DaemonThread; 16 import org.mmbase.module.core.MMBase; 17 import org.mmbase.util.Queue; 18 import org.mmbase.util.logging.Logger; 19 import org.mmbase.util.logging.Logging; 20 21 22 29 public class ChangesReceiver implements Runnable { 30 31 private static final Logger log = Logging.getLoggerInstance(ChangesReceiver.class); 32 33 34 35 private Thread kicker = null; 36 37 38 private final Queue nodesToSpawn; 39 40 41 private final ServerSocket serverSocket; 42 43 48 ChangesReceiver(int unicastPort, Queue nodesToSpawn) throws IOException { 49 this.nodesToSpawn = nodesToSpawn; 50 this.serverSocket = new ServerSocket(); 51 SocketAddress address = new InetSocketAddress(MMBase.getMMBase().getHost(), unicastPort); 52 serverSocket.bind(address); 53 log.info("Listening to " + address); 54 this.start(); 55 } 56 57 private void start() { 58 if (kicker == null) { 59 kicker = new DaemonThread(this, "UnicastReceiver"); 60 kicker.start(); 61 log.debug("UnicastReceiver started"); 62 } 63 } 64 65 void stop() { 66 if (kicker != null) { 67 try { 68 kicker.interrupt(); 69 kicker.setPriority(Thread.MIN_PRIORITY); 70 kicker = null; 71 } catch (Throwable t) { 72 } 73 try { 74 serverSocket.close(); 75 } catch (IOException ioe) { 76 log.warn(ioe); 77 } 78 } else { 79 log.service("Cannot stop thread, because it is null"); 80 } 81 } 82 83 public void run() { 84 try { 85 while (kicker!=null) { 86 Socket socket = null; 87 InputStream reader = null; 88 try { 89 socket = serverSocket.accept(); 90 reader = new BufferedInputStream(socket.getInputStream()); 91 ByteArrayOutputStream writer = new ByteArrayOutputStream(); 92 int size = 0; 93 byte[] buffer = new byte[1024]; 95 96 while ((size = reader.read(buffer)) != -1) { 97 if (writer != null) { 98 writer.write(buffer, 0, size); 99 writer.flush(); 100 } 101 } 102 byte[] message = writer.toByteArray(); 104 if (log.isDebugEnabled()) { 105 log.debug("RECEIVED=>" + message); 106 } 107 nodesToSpawn.append(message); 108 } catch (SocketException e) { 109 log.warn(e); 110 continue; 111 } catch (Exception e) { 112 log.error(e); 113 } finally { 114 if (reader != null) { 115 try { 116 reader.close(); 117 } catch (IOException e) { 118 } 119 } 120 if (socket != null) { 121 try { 122 socket.close(); 123 } catch (IOException e) { 124 } 125 } 126 } 127 } 128 } catch (Exception e) { 129 log.error(e); 130 } finally { 131 if (serverSocket != null) { 132 try { 133 serverSocket.close(); 134 } catch (IOException e) { 135 } 136 } 137 } 138 } 139 140 } 141
| Popular Tags
|