KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mmbase > clustering > jgroups > ChangesSender


1 /*
2
3 This software is OSI Certified Open Source Software.
4 OSI Certified is a certification mark of the Open Source Initiative.
5
6 The license (Mozilla version 1.0) can be read at the MMBase site.
7 See http://www.MMBase.org/license
8
9 */

10 package org.mmbase.clustering.jgroups;
11
12 import org.mmbase.clustering.Statistics;
13
14 import org.jgroups.ChannelException;
15 import org.jgroups.JChannel;
16 import org.jgroups.Message;
17 import org.mmbase.core.util.DaemonThread;
18 import org.mmbase.util.Queue;
19 import org.mmbase.util.logging.Logger;
20 import org.mmbase.util.logging.Logging;
21
22 /**
23  * ChangesSender is a thread object sending the nodes found in the
24  * sending queue over the multicast 'channel'.
25  *
26  * This is the JGroups variant.
27  *
28  * @see org.mmbase.clustering.jgroups.Multicast
29  * @see org.mmbase.clustering.jgroups.ChangesReceiver
30  *
31  * @author Daniel Ockeloen
32  * @author Rico Jansen
33  * @author Nico Klasens
34  * @author Costyn van Dongen
35  * @version $Id: ChangesSender.java,v 1.7 2006/08/09 11:52:33 pierre Exp $
36  */

37 public class ChangesSender implements Runnable JavaDoc {
38
39     private static final Logger log = Logging.getLoggerInstance(ChangesSender.class);
40
41     /** Thread which sends the messages */
42     private Thread JavaDoc kicker = null;
43
44     /** Queue with messages to send to other MMBase instances */
45     private final Queue nodesToSend;
46
47     /** Channel to send messages on */
48     private final JChannel channel;
49
50     private final Statistics send;
51
52     /** Construct MultiCast Sender
53      * @param channel Channel on which to send messages
54      * @param nodesToSend Queue of messages to send
55      */

56     ChangesSender(JChannel channel, Queue nodesToSend, Statistics send) {
57         this.send = send;
58         this.channel = channel;
59         this.nodesToSend = nodesToSend;
60         this.start();
61     }
62
63     private void start() {
64         if (kicker == null) {
65             kicker = new DaemonThread(this, "MulticastSender");
66             kicker.start();
67             log.debug("MulticastSender started");
68         }
69     }
70
71     void stop() {
72         if (kicker != null) {
73             kicker.interrupt();
74             kicker.setPriority(Thread.MIN_PRIORITY);
75             kicker = null;
76         } else {
77             log.service("Cannot stop thread, because it is null");
78         }
79     }
80
81
82     /**
83      * Take messages fromt the queeu nodesToSend and send them
84      * on the JChannel. send() will throw an exception in the
85      * cases that the channel is closed or that no channel
86      * has been joined.
87      *
88      */

89     public void run() {
90         while(kicker != null) {
91             try {
92                 if (channel == null || (! channel.isConnected())) {
93                     log.warn("Channel " + channel + " not connected. Sleeping for 5 s.");
94                     try {
95                         Thread.sleep(5000);
96                     } catch (InterruptedException JavaDoc ie) {
97                     }
98                     continue;
99                 }
100
101                 byte[] message = (byte[]) nodesToSend.get();
102                 long startTime = System.currentTimeMillis();
103                 Message msg = new Message(null, null, message);
104                 try {
105                     if (log.isDebugEnabled()) {
106                         log.debug("SEND=>" + message);
107                     }
108                     channel.send(msg);
109                 } catch (ChannelException e) {
110                     log.error("Can't send message" + message + ": " + e.getMessage(), e);
111                 }
112                 send.count++;
113                 send.bytes += message.length;
114                 send.cost += (System.currentTimeMillis() - startTime);
115             } catch (InterruptedException JavaDoc e) {
116                 log.debug(Thread.currentThread().getName() +" was interruped.");
117                 break;
118             } catch (Exception JavaDoc e) {
119                 log.error(e);
120             }
121         }
122     }
123 }
124
Popular Tags