KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mmbase > clustering > multicast > ChangesReceiver


/*

This software is OSI Certified Open Source Software.
OSI Certified is a certification mark of the Open Source Initiative.

The license (Mozilla version 1.0) can be read at the MMBase site.
See http://www.MMBase.org/license

*/

10 package org.mmbase.clustering.multicast;
11
12 import java.net.*;
13
14 import org.mmbase.util.Queue;
15 import org.mmbase.module.core.MMBaseContext;
16 import org.mmbase.util.logging.Logger;
17 import org.mmbase.util.logging.Logging;
18
19
20 /**
21  * ChangesReceiver is a thread object that builds a MultiCast Thread
22  * to receive changes from other MMBase Servers.
23  *
24  * @author Daniel Ockeloen
25  * @author Rico Jansen
26  * @author Nico Klasens
27  * @version $Id: ChangesReceiver.java,v 1.13 2006/07/06 11:27:27 michiel Exp $
28  */

29 public class ChangesReceiver implements Runnable JavaDoc {
30
31     private static final Logger log = Logging.getLoggerInstance(ChangesReceiver.class);
32
33     /** Thread which sends the messages */
34     private Thread JavaDoc kicker = null;
35
36     /** Queue with messages received from other MMBase instances */
37     private final Queue nodesToSpawn;
38
39     /** address to send the messages to */
40     private final InetAddress ia;
41
42     /** Socket to send the multicast packets */
43     private MulticastSocket ms;
44
45     /** Port for sending datapackets send by Multicast */
46     private final int mport;
47
48     /** Datapacket receive size */
49     private final int dpsize;
50
51     /**
52      * Construct the MultiCast Receiver
53      * @param multicastHost 'channel' of the multicast
54      * @param mport port of the multicast
55      * @param dpsize datapacket receive size
56      * @param nodesToSpawn Queue of received messages
57      */

58     ChangesReceiver(String JavaDoc multicastHost, int mport, int dpsize, Queue nodesToSpawn) throws UnknownHostException {
59         this.mport = mport;
60         this.dpsize = dpsize;
61         this.nodesToSpawn = nodesToSpawn;
62         this.ia = InetAddress.getByName(multicastHost);
63         this.start();
64     }
65     private void start() {
66         if (kicker == null && ia != null) {
67             try {
68                 ms = new MulticastSocket(mport);
69                 ms.joinGroup(ia);
70             } catch(Exception JavaDoc e) {
71                 log.error(Logging.stackTrace(e));
72             }
73             if (ms != null) {
74                 kicker = MMBaseContext.startThread(this, "MulticastReceiver");
75                 log.debug("MulticastReceiver started");
76             }
77         }
78     }
79
80     void stop() {
81         MulticastSocket closingMS = ms; // for closing the socket while the thread stops
82
ms = null; // the criteria for stopping thread
83
try {
84             closingMS.leaveGroup(ia);
85             closingMS.close();
86         } catch (Exception JavaDoc e) {
87             // nothing
88
}
89         if (kicker != null) {
90             kicker.setPriority(Thread.MIN_PRIORITY);
91             kicker.interrupt();
92             kicker = null;
93         } else {
94             log.service("Cannot stop thread, because it is null");
95         }
96     }
97
98
99     public void run() {
100         // create a datapackage to receive all messages
101
byte[] buffer = new byte[dpsize];
102         DatagramPacket dp = new DatagramPacket(buffer, dpsize);
103         while (ms != null) {
104             try {
105                 // reset datapackage buffer size for re-use
106
dp.setLength(dpsize);
107                 ms.receive(dp);
108                 byte[] message = new byte[dp.getLength()];
109
110                 // the dp.getData array always has dpsize length.
111
// That's not what we want. Especially when falling back to legacy, this is translated to a String.
112
// which otherwise gets dpsize length (64k!)
113
System.arraycopy(dp.getData(), 0, message, 0, dp.getLength());
114                 if (log.isDebugEnabled()) {
115                     log.debug("RECEIVED=> " + dp.getLength() + " bytes from " + dp.getAddress());
116                 }
117                 nodesToSpawn.append(message);
118             } catch (java.net.SocketException JavaDoc se) {
119                 // generally happens on shutdown (ms==null)
120
// if not log it as an error
121
if (ms != null) log.error(se.getMessage());
122             } catch (Exception JavaDoc f) {
123                 log.error(f.getMessage(), f);
124             }
125         }
126     }
127
128 }
129
Popular Tags