KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mmbase > clustering > unicast > ChangesReceiver


/*

This software is OSI Certified Open Source Software.
OSI Certified is a certification mark of the Open Source Initiative.

The license (Mozilla version 1.0) can be read at the MMBase site.
See http://www.MMBase.org/license

*/

10 package org.mmbase.clustering.unicast;
11
12 import java.io.*;
13 import java.net.*;
14
15 import org.mmbase.core.util.DaemonThread;
16 import org.mmbase.module.core.MMBase;
17 import org.mmbase.util.Queue;
18 import org.mmbase.util.logging.Logger;
19 import org.mmbase.util.logging.Logging;
20
21
22 /**
23  * ChangesReceiver is a thread object that builds a Unicast Thread
24  * to receive changes from other MMBase Servers.
25  *
26  * @author Nico Klasens
27  * @version $Id: ChangesReceiver.java,v 1.7 2006/08/09 11:14:49 pierre Exp $
28  */

29 public class ChangesReceiver implements Runnable JavaDoc {
30
31     private static final Logger log = Logging.getLoggerInstance(ChangesReceiver.class);
32
33
34     /** Thread which sends the messages */
35     private Thread JavaDoc kicker = null;
36
37     /** Queue with messages received from other MMBase instances */
38     private final Queue nodesToSpawn;
39
40
41     private final ServerSocket serverSocket;
42
43     /**
44      * Construct UniCast Receiver
45      * @param unicastPort port of the unicast connections
46      * @param nodesToSpawn Queue of received messages
47      */

48     ChangesReceiver(int unicastPort, Queue nodesToSpawn) throws IOException {
49         this.nodesToSpawn = nodesToSpawn;
50         this.serverSocket = new ServerSocket();
51         SocketAddress address = new InetSocketAddress(MMBase.getMMBase().getHost(), unicastPort);
52         serverSocket.bind(address);
53         log.info("Listening to " + address);
54         this.start();
55     }
56
57     private void start() {
58         if (kicker == null) {
59             kicker = new DaemonThread(this, "UnicastReceiver");
60             kicker.start();
61             log.debug("UnicastReceiver started");
62         }
63     }
64
65     void stop() {
66         if (kicker != null) {
67             try {
68                 kicker.interrupt();
69                 kicker.setPriority(Thread.MIN_PRIORITY);
70                 kicker = null;
71             } catch (Throwable JavaDoc t) {
72             }
73             try {
74                 serverSocket.close();
75             } catch (IOException ioe) {
76                 log.warn(ioe);
77             }
78         } else {
79             log.service("Cannot stop thread, because it is null");
80         }
81     }
82
83     public void run() {
84         try {
85             while (kicker!=null) {
86                 Socket socket = null;
87                 InputStream reader = null;
88                 try {
89                     socket = serverSocket.accept();
90                     reader = new BufferedInputStream(socket.getInputStream());
91                     ByteArrayOutputStream writer = new ByteArrayOutputStream();
92                     int size = 0;
93                     //this buffer has nothing to do with the OS buffer
94
byte[] buffer = new byte[1024];
95
96                     while ((size = reader.read(buffer)) != -1) {
97                         if (writer != null) {
98                           writer.write(buffer, 0, size);
99                           writer.flush();
100                        }
101                     }
102                     // maybe we should use encoding here?
103
byte[] message = writer.toByteArray();
104                     if (log.isDebugEnabled()) {
105                         log.debug("RECEIVED=>" + message);
106                     }
107                     nodesToSpawn.append(message);
108                 } catch (SocketException e) {
109                     log.warn(e);
110                     continue;
111                 } catch (Exception JavaDoc e) {
112                     log.error(e);
113                 } finally {
114                     if (reader != null) {
115                         try {
116                             reader.close();
117                         } catch (IOException e) {
118                         }
119                     }
120                     if (socket != null) {
121                         try {
122                             socket.close();
123                         } catch (IOException e) {
124                         }
125                     }
126                 }
127             }
128         } catch (Exception JavaDoc e) {
129             log.error(e);
130         } finally {
131             if (serverSocket != null) {
132                 try {
133                     serverSocket.close();
134                 } catch (IOException e) {
135                 }
136             }
137         }
138     }
139
140 }
141
Popular Tags