1 46 package org.mr.core.stats; 47 48 import javax.jms.Connection ; 49 import javax.jms.ConnectionFactory ; 50 import javax.jms.JMSException ; 51 import javax.jms.MessageProducer ; 52 import javax.jms.Session ; 53 import javax.jms.TextMessage ; 54 import javax.jms.Topic ; 55 56 import org.apache.commons.logging.Log; 57 import org.apache.commons.logging.LogFactory; 58 import org.mr.MantaAgent; 59 import org.mr.MantaAgentConstants; 60 import org.mr.api.jms.MantaConnectionFactory; 61 import org.mr.core.configuration.ConfigManager; 62 import org.mr.core.configuration.ConfigurationChangeEvent; 63 import org.mr.core.configuration.ConfigurationChangeListener; 64 import org.mr.core.stats.cmc.CMCAgentStats; 65 import org.mr.core.util.SystemTime; 66 import org.mr.core.util.TimeoutTimer; 67 import org.mr.core.util.Timeoutable; 68 69 78 public class StatManager { 79 private String serviceName; 80 private long updateInterval; private boolean sendUpdates; 82 private boolean collectStats; 83 private TimeoutTimer timer; 84 private ConfigManager config; 85 private Log log; 86 private Timeoutable topicPublisher; 87 88 private AggregateCounter totalBytes; 90 private AggregateCounter totalMessages; 91 private TemporalCounter temporalBytes; 92 private TemporalCounter temporalMessages; 93 private MaxCounter maxMem; 94 private CurrentCounter currentMem; 95 private CurrentCounter freeMem; 96 97 private ConnectionFactory factory; 99 private Connection con; 100 private Session ses; 101 private MessageProducer prod; 102 private Topic topic; 103 104 105 public StatManager() { 107 this.totalBytes = new AggregateCounter(); 109 this.totalMessages = new AggregateCounter(); 110 this.temporalBytes = new TemporalCounter(5000, 60); this.temporalMessages = new TemporalCounter(5000, 60); 112 this.maxMem = new MaxCounter(); 113 this.currentMem = new CurrentCounter(); 114 this.freeMem = new CurrentCounter(); 115 116 this.config = MantaAgent.getInstance().getSingletonRepository().getConfigManager(); 118 this.serviceName = config.getStringProperty("statistics.topic", "StatsTopic"); 119 if (this.serviceName.equals("")) { 120 this.serviceName = null; 121 } 122 this.updateInterval = config.getLongProperty("statistics.update_interval", 3600L) * 1000; 123 this.sendUpdates = config.getBooleanProperty("statistics.send_updates", false); 124 this.collectStats = config.getBooleanProperty("statistics.collect_stats", false); 125 config.registerAsConfigChangeListener(new ConfigurationChangeListener() { 126 public void refresh(ConfigurationChangeEvent e) { 127 if (e.getKey().equals("statistics.send_updates")) { 128 StatManager.this.sendUpdates = StatManager.this.config. 129 getBooleanProperty("statistics.send_updates", false); 130 } 131 if (e.getKey().equals("statistics.collect_stats")) { 132 StatManager.this.collectStats = StatManager.this.config. 133 getBooleanProperty("statistics.collect_stats", false); 134 } 135 if (StatManager.this.collectStats && StatManager.this.sendUpdates) { 136 startUpdates(); 137 } else { 138 stopUpdates(); 139 } 140 } 141 }); 142 143 this.timer = new TimeoutTimer("Stat_Timer"); 145 this.topicPublisher = new Timeoutable() { 146 public void timeout(Object o) { 147 publishUpdate(); 148 timer.addTimeout(this, this, StatManager.this.updateInterval); 149 } 150 }; 151 Timeoutable memSampler = new Timeoutable() { 152 public void timeout(Object event) { 153 long mem = Runtime.getRuntime().totalMemory(); 154 maxMem.addSample(mem); 155 currentMem.addSample(mem); 156 freeMem.addSample(Runtime.getRuntime().freeMemory()); 157 timer.addTimeout(this, this, 60000); 158 } 159 }; 160 timer.addTimeout(memSampler, memSampler, 1000); 161 162 this.log = LogFactory.getLog("StatManager"); 163 164 if (this.collectStats && this.sendUpdates) { 165 startUpdates(); 166 } 167 } 169 170 private void initJMS() throws JMSException { 172 factory = new MantaConnectionFactory(); 173 con = factory.createConnection(); 174 ses = con.createSession(false,Session.AUTO_ACKNOWLEDGE); 175 topic = ses.createTopic(serviceName); 176 prod = ses.createProducer(topic); 177 } 178 179 180 private void closeJMS() throws JMSException { 182 con.close(); 183 ses = null; 184 topic = null; 185 prod = null; 186 con = null; 187 factory = null; 188 } 189 190 195 public void addMessageSample(long byteCount) { 196 if (this.collectStats) { 197 totalBytes.addSample(byteCount); 198 totalMessages.addSample(1); 199 temporalBytes.addSample(byteCount); 200 temporalMessages.addSample(1); 201 } 202 } 204 205 209 210 public CMCAgentStats getCurrentCMCStats() { 211 CMCAgentStats stats = new CMCAgentStats(); 212 stats.setTotalMem(currentMem.getValue()); 213 stats.setFreeMem(freeMem.getValue()); 214 stats.setTotalMessages(totalMessages.getValue()); 215 stats.setTotalBytes(totalBytes.getValue()); 216 stats.setFiveMinMessages(temporalMessages.getValue()); 217 stats.setFiveMinBytes(temporalBytes.getValue()); 218 return stats; 219 } 220 221 222 private void startUpdates() { 224 this.timer.addTimeout(this.topicPublisher, this.topicPublisher, 225 this.updateInterval); 226 } 227 228 229 private void stopUpdates() { 231 this.timer.removeTimeout(this.topicPublisher); 232 } 233 234 235 private String getCurrentStats() { 237 StringBuffer sb = new StringBuffer (); 238 sb.append(Long.toString(this.totalMessages.getValue())).append("|"); 239 sb.append(Long.toString(this.totalBytes.getValue())).append("|"); 240 sb.append(Long.toString(this.temporalMessages.getValue())).append("|"); 241 sb.append(Long.toString(this.temporalBytes.getValue())).append("|"); 242 sb.append(Long.toString(this.maxMem.getValue())).append("|"); 243 sb.append(Long.toString(this.currentMem.getValue())).append("|"); 244 return sb.toString(); 245 } 246 247 private void publishUpdate() { 249 if (this.prod == null) { 250 try { 251 initJMS(); 252 } catch (JMSException e) { 253 if (log.isErrorEnabled()) { 254 log.error("Failed initializing Statistics topic service."); 255 } 256 try { 257 closeJMS(); 258 } catch (JMSException e1) {} 259 return; 260 } 261 } 262 if (this.sendUpdates) { 263 try { 264 TextMessage textMessage = ses.createTextMessage(getCurrentStats()); 265 if(log.isDebugEnabled()){ 266 log.debug("Publishing statistics"); 267 } 268 prod.send(textMessage, 269 MantaAgentConstants.NON_PERSISTENT, 270 MantaAgentConstants.NORMAL, 271 SystemTime.gmtCurrentTimeMillis()+60000); 272 } catch (JMSException e) { 273 if(log.isErrorEnabled()){ 274 log.error("Exception during publishing statstics.", e); 275 } 276 } 277 } 278 } 279 } | Popular Tags |