KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mmbase > clustering > multicast > ChangesSender


1 /*
2
3 This software is OSI Certified Open Source Software.
4 OSI Certified is a certification mark of the Open Source Initiative.
5
6 The license (Mozilla version 1.0) can be read at the MMBase site.
7 See http://www.MMBase.org/license
8
9 */

10 package org.mmbase.clustering.multicast;
11
12 import org.mmbase.clustering.Statistics;
13
14 import java.net.*;
15 import java.io.*;
16
17 import org.mmbase.module.core.MMBaseContext;
18 import org.mmbase.util.*;
19 import org.mmbase.util.logging.Logger;
20 import org.mmbase.util.logging.Logging;
21
22 /**
23  * ChangesSender is a thread object sending the nodes found in the
24  * sending queue over the multicast 'channel'
25  *
26  * @author Daniel Ockeloen
27  * @author Rico Jansen
28  * @author Nico Klasens
29  * @version $Id: ChangesSender.java,v 1.11 2006/07/06 11:27:27 michiel Exp $
30  */

31 public class ChangesSender implements Runnable JavaDoc {
32
33     private static final Logger log = Logging.getLoggerInstance(ChangesSender.class);
34
35     private final Statistics send;
36
37     /** Thread which sends the messages */
38     private Thread JavaDoc kicker = null;
39
40     /** Queue with messages to send to other MMBase instances */
41     private final Queue nodesToSend;
42
43     /** address to send the messages to */
44     private final InetAddress ia;
45
46     /** Socket to send the multicast packets */
47     private MulticastSocket ms;
48
49     /** Port for sending datapackets send by Multicast */
50     private final int mport;
51     /** Time To Live for datapackets send by Multicast */
52     private final int mTTL;
53
54     /**
55      * Construct MultiCast Sender
56      * @param multicastHost 'channel' of the multicast
57      * @param mport port of the multicast
58      * @param mTTL time-to-live of the multicast packet (0-255)
59      * @param nodesToSend Queue of messages to send
60      * @param send Statistics object in which to administer duration costs
61      */

62     ChangesSender(String JavaDoc multicastHost, int mport, int mTTL, Queue nodesToSend, Statistics send) throws UnknownHostException {
63         this.mport = mport;
64         this.mTTL = mTTL;
65         this.nodesToSend = nodesToSend;
66         this.ia = InetAddress.getByName(multicastHost);
67         this.send = send;
68         this.start();
69     }
70
71     private void start() {
72         if (kicker == null && ia != null) {
73             try {
74                 ms = new MulticastSocket();
75                 ms.joinGroup(ia);
76                 ms.setTimeToLive(mTTL);
77             } catch(Exception JavaDoc e) {
78                 log.error(Logging.stackTrace(e));
79             }
80
81             kicker = MMBaseContext.startThread(this, "MulticastSender");
82             log.debug("MulticastSender started");
83         }
84     }
85
86     void stop() {
87         try {
88             ms.leaveGroup(ia);
89             ms.close();
90         } catch (Exception JavaDoc e) {
91             // nothing
92
}
93         ms = null;
94         if (kicker != null) {
95             kicker.interrupt();
96             kicker.setPriority(Thread.MIN_PRIORITY);
97             kicker = null;
98         } else {
99             log.service("Cannot stop thread, because it is null");
100         }
101     }
102     public void run() {
103         log.debug("Started sending");
104         while(ms != null) {
105             try {
106                 byte[] data = (byte[]) nodesToSend.get();
107                 long startTime = System.currentTimeMillis();
108                 DatagramPacket dp = new DatagramPacket(data, data.length, ia, mport);
109                 try {
110                     if (log.isDebugEnabled()) {
111                         log.debug("SEND=> " + dp.getLength() + " bytes to " + dp.getAddress());
112                     }
113                     ms.send(dp);
114                 } catch (IOException e) {
115                     log.error("can't send message" + dp + " to " + ia + ":" + mport);
116                     log.error(e.getMessage(), e);
117                 }
118                 send.count++;
119                 send.bytes += data.length;
120                 send.cost += (System.currentTimeMillis() - startTime);
121             } catch (InterruptedException JavaDoc e) {
122                 log.debug(Thread.currentThread().getName() +" was interruped.");
123                 break;
124             } catch (Exception JavaDoc e) {
125                 log.error(e.getMessage(), e);
126             }
127         }
128         log.debug("Finished sending");
129     }
130 }
131
Popular Tags