KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mmbase > clustering > unicast > ChangesSender


/*

This software is OSI Certified Open Source Software.
OSI Certified is a certification mark of the Open Source Initiative.

The license (Mozilla version 1.0) can be read at the MMBase site.
See http://www.MMBase.org/license

*/

10 package org.mmbase.clustering.unicast;
11
12 import org.mmbase.clustering.Statistics;
13 import java.io.DataOutputStream JavaDoc;
14 import java.io.IOException JavaDoc;
15 import java.net.*;
16 import java.util.*;
17
18 import org.mmbase.core.util.DaemonThread;
19 import org.mmbase.module.builders.MMServers;
20 import org.mmbase.module.core.*;
21
22 import org.mmbase.util.Queue;
23 import org.mmbase.util.logging.Logger;
24 import org.mmbase.util.logging.Logging;
25
26
27 /**
28  * ChangesSender is a thread object sending the nodes found in the
29  * sending queue over unicast connections
30  *
31  * @author Nico Klasens
32  * @version $Id: ChangesSender.java,v 1.13 2006/08/09 11:14:49 pierre Exp $
33  */

34 public class ChangesSender implements Runnable JavaDoc {
35
36     private static final Logger log = Logging.getLoggerInstance(ChangesSender.class);
37
38
39     private final Statistics send;
40
41     /** Thread which sends the messages */
42     private Thread JavaDoc kicker = null;
43
44     /** Queue with messages to send to other MMBase instances */
45     private final Queue nodesToSend;
46
47     /** For the port on which the talking between nodes take place.*/
48     private final Map configuration;
49     private final int defaultUnicastPort;
50
51     /** Timeout of the connection.*/
52     private final int unicastTimeout;
53
54     /** last time the mmservers table was checked for active servers */
55     private long lastServerChecked = -1;
56     private List activeServers = new ArrayList();
57
58     /** Interval of servers change their state */
59     private long serverInterval;
60
61     /**
62      * Construct UniCast Sender
63      * @param unicastPort port of the unicast connections
64      * @param unicastTimeout timeout on the connections
65      * @param nodesToSend Queue of messages to send
66      * @param mmbase MMBase instance
67      */

68     ChangesSender(Map configuration, int unicastPort, int unicastTimeout, Queue nodesToSend, Statistics send) {
69         this.nodesToSend = nodesToSend;
70         this.configuration = configuration;
71         this.defaultUnicastPort = unicastPort;
72         this.unicastTimeout = unicastTimeout;
73         this.send = send;
74         this.start();
75     }
76
77     private void start() {
78         if (kicker == null) {
79             kicker = new DaemonThread(this, "UnicastSender");
80             kicker.start();
81             log.debug("UnicastSender started");
82         }
83     }
84     void stop() {
85         if (kicker != null) {
86             kicker.interrupt();
87             kicker.setPriority(Thread.MIN_PRIORITY);
88             kicker = null;
89         } else {
90             log.service("Cannot stop thread, because it is null");
91         }
92     }
93
94
95     // javadoc inherited
96
public void run() {
97         while(kicker != null) {
98             try {
99                 byte[] data = (byte[]) nodesToSend.get();
100                 long startTime = System.currentTimeMillis();
101                 List servers = getActiveServers();
102                 for (int i = 0; i < servers.size(); i++) {
103                     MMObjectNode node = (MMObjectNode) servers.get(i);
104                     if (node != null) {
105                         String JavaDoc hostname = node.getStringValue("host");
106                         String JavaDoc machinename = node.getStringValue("name");
107
108                         int unicastPort = defaultUnicastPort;
109                         String JavaDoc specificPort = (String JavaDoc) configuration.get(machinename + ".unicastport");
110                         if (specificPort != null) {
111                             unicastPort = Integer.parseInt(specificPort);
112                         }
113                         Socket socket = null;
114                         DataOutputStream JavaDoc os = null;
115                         try {
116                             socket = new Socket();
117                             socket.connect(new InetSocketAddress(hostname, unicastPort), unicastTimeout);
118                             os = new DataOutputStream JavaDoc(socket.getOutputStream());
119                             os.write(data, 0, data.length);
120                             os.flush();
121                             if (log.isDebugEnabled()) {
122                                 log.debug("SEND=>" + hostname + ":" + unicastPort);
123                             }
124                         } catch(SocketTimeoutException ste) {
125                             servers.remove(i);
126                             log.warn("Server timeout: " + hostname + ":" + unicastPort + " " + ste + ". Removed " + node + " from active server list.");
127                         } catch (ConnectException ce) {
128                             log.warn("Connect exception: " + hostname + ":" + unicastPort + " " + ce + ".");
129                         } catch (IOException JavaDoc e) {
130                             log.error("can't send message to " + hostname + ":" + unicastPort + " " + e.getMessage() , e);
131                         } finally {
132                             if (os != null) {
133                                 try {
134                                     os.close();
135                                 } catch (IOException JavaDoc e1) {
136                                 }
137                             }
138                             if (socket != null) {
139                                 try {
140                                     socket.close();
141                                 } catch (IOException JavaDoc e1) {
142                                 }
143                             }
144                         }
145                     }
146                 }
147                 send.count++;
148                 send.bytes += data.length;
149                 send.cost += (System.currentTimeMillis() - startTime);
150
151             } catch (InterruptedException JavaDoc e) {
152                 log.debug(Thread.currentThread().getName() +" was interruped.");
153                 break;
154             } catch (Exception JavaDoc e) {
155                 log.error(e.getMessage(), e);
156             }
157         }
158     }
159
160     /**
161      * Get Active server list
162      * @return server list
163      */

164     private List getActiveServers() {
165         List prevActiveServers = activeServers;
166         if (serverInterval < 0) {
167             MMBase mmbase = MMBase.getMMBase();
168             MMServers mmservers = (MMServers) mmbase.getBuilder("mmservers");
169             serverInterval = mmservers.getIntervalTime();
170             activeServers = mmservers.getActiveServers();
171             lastServerChecked = System.currentTimeMillis();
172             log.info("Active servers: " + activeServers );
173         } else {
174             if (lastServerChecked + serverInterval < System.currentTimeMillis()) {
175                 MMBase mmbase = MMBase.getMMBase();
176                 MMServers mmservers = (MMServers) mmbase.getBuilder("mmservers");
177                 activeServers = mmservers.getActiveServers();
178                 lastServerChecked = System.currentTimeMillis();
179                 if (! activeServers.equals(prevActiveServers)) {
180                     log.info("Active servers: " + activeServers + " " + prevActiveServers.size() + " -> " + activeServers.size());
181                 } else {
182                     log.debug("Active servers: " + activeServers);
183                 }
184             }
185         }
186         return activeServers;
187     }
188
189
190 }
191
Popular Tags