1 20 package org.apache.mina.management; 21 22 import java.net.SocketAddress ; 23 import java.util.Iterator ; 24 import java.util.Queue ; 25 import java.util.concurrent.ConcurrentLinkedQueue ; 26 import java.util.concurrent.atomic.AtomicLong ; 27 28 import org.apache.mina.common.IoHandler; 29 import org.apache.mina.common.IoService; 30 import org.apache.mina.common.IoServiceConfig; 31 import org.apache.mina.common.IoServiceListener; 32 import org.apache.mina.common.IoSession; 33 34 52 public class StatCollector { 53 56 public static final String KEY = StatCollector.class.getName() + ".stat"; 57 58 61 private static volatile int nextId = 0; 62 63 private final int id = nextId++; 64 65 private final Object calcLock = new Object (); 66 67 private final IoService service; 68 69 private Worker worker; 70 71 private int pollingInterval = 5000; 72 73 private Queue <IoSession> polledSessions; 74 75 private AtomicLong totalProcessedSessions = new AtomicLong (); 77 78 private float msgWrittenThroughput = 0f; 79 80 private float msgReadThroughput = 0f; 81 82 private float bytesWrittenThroughput = 0f; 83 84 private float bytesReadThroughput = 0f; 85 86 private final IoServiceListener serviceListener = new IoServiceListener() { 87 public void serviceActivated(IoService service, 88 SocketAddress serviceAddress, IoHandler handler, 89 IoServiceConfig config) { 90 } 91 92 public void serviceDeactivated(IoService service, 93 SocketAddress serviceAddress, IoHandler handler, 94 IoServiceConfig config) { 95 } 96 97 public void sessionCreated(IoSession session) { 98 addSession(session); 99 } 100 101 public void sessionDestroyed(IoSession session) { 102 removeSession(session); 103 } 104 }; 105 106 110 public StatCollector(IoService service) { 111 this(service, 5000); 112 } 113 114 119 public StatCollector(IoService service, int pollingInterval) { 120 this.service = service; 121 this.pollingInterval = pollingInterval; 122 } 123 124 128 public void start() { 129 synchronized (this) { 130 if (worker != null && worker.isAlive()) 131 throw new RuntimeException ("Stat collecting already started"); 132 133 135 polledSessions = new ConcurrentLinkedQueue <IoSession>(); 136 137 for (Iterator <SocketAddress > iter = service 138 .getManagedServiceAddresses().iterator(); iter.hasNext();) { 139 SocketAddress element = iter.next(); 140 141 for (Iterator <IoSession> iter2 = service.getManagedSessions( 142 element).iterator(); iter2.hasNext();) { 143 addSession(iter2.next()); 144 145 } 146 } 147 148 service.addListener(serviceListener); 150 151 worker = new Worker(); 153 worker.start(); 154 155 } 156 157 } 158 159 163 public void stop() { 164 synchronized (this) { 165 if (worker == null) { 166 return; 167 } 168 169 service.removeListener(serviceListener); 170 171 worker.stop = true; 173 worker.interrupt(); 174 while (worker.isAlive()) { 175 try { 176 worker.join(); 177 } catch (InterruptedException e) { 178 } 180 } 181 182 for (Iterator iter = polledSessions.iterator(); iter.hasNext();) { 183 IoSession session = (IoSession) iter.next(); 184 session.removeAttribute(KEY); 185 } 186 polledSessions.clear(); 187 188 worker = null; 189 } 190 } 191 192 196 public boolean isRunning() { 197 synchronized (this) { 198 return worker != null && worker.stop != true; 199 } 200 } 201 202 private void addSession(IoSession session) { 203 IoSessionStat sessionStats = new IoSessionStat(); 204 session.setAttribute(KEY, sessionStats); 205 totalProcessedSessions.incrementAndGet(); 206 polledSessions.add(session); 207 } 208 209 private void removeSession(IoSession session) { 210 polledSessions.remove(session); 212 213 IoSessionStat sessStat = (IoSessionStat) session.getAttribute(KEY); 216 217 long currentTime = System.currentTimeMillis(); 219 synchronized (calcLock) { 220 bytesReadThroughput += (session.getReadBytes() - sessStat.lastByteRead) 221 / ((currentTime - sessStat.lastPollingTime) / 1000f); 222 bytesWrittenThroughput += (session.getWrittenBytes() - sessStat.lastByteWrite) 223 / ((currentTime - sessStat.lastPollingTime) / 1000f); 224 msgReadThroughput += (session.getReadMessages() - sessStat.lastMessageRead) 225 / ((currentTime - sessStat.lastPollingTime) / 1000f); 226 msgWrittenThroughput += (session.getWrittenMessages() - sessStat.lastMessageWrite) 227 / ((currentTime - sessStat.lastPollingTime) / 1000f); 228 } 229 230 session.removeAttribute(KEY); 231 } 232 233 237 public long getTotalProcessedSessions() { 238 return totalProcessedSessions.get(); 239 } 240 241 public float getBytesReadThroughput() { 242 return bytesReadThroughput; 243 } 244 245 public float getBytesWrittenThroughput() { 246 return bytesWrittenThroughput; 247 } 248 249 public float getMsgReadThroughput() { 250 return msgReadThroughput; 251 } 252 253 public float getMsgWrittenThroughput() { 254 return msgWrittenThroughput; 255 } 256 257 public long getSessionCount() { 258 return polledSessions.size(); 259 } 260 261 private class Worker extends Thread { 262 263 boolean stop = false; 264 265 private Worker() { 266 super("StatCollectorWorker-" + id); 267 } 268 269 public void run() { 270 while (!stop) { 271 for (Iterator iter = polledSessions.iterator(); iter.hasNext();) { 272 IoSession session = (IoSession) iter.next(); 273 IoSessionStat sessStat = (IoSessionStat) session 274 .getAttribute(KEY); 275 276 sessStat.lastByteRead = session.getReadBytes(); 277 sessStat.lastByteWrite = session.getWrittenBytes(); 278 sessStat.lastMessageRead = session.getReadMessages(); 279 sessStat.lastMessageWrite = session.getWrittenMessages(); 280 } 281 282 try { 284 Thread.sleep(pollingInterval); 285 } catch (InterruptedException e) { 286 } 287 288 float tmpMsgWrittenThroughput = 0f; 289 float tmpMsgReadThroughput = 0f; 290 float tmpBytesWrittenThroughput = 0f; 291 float tmpBytesReadThroughput = 0f; 292 293 for (Iterator iter = polledSessions.iterator(); iter.hasNext();) { 294 295 IoSession session = (IoSession) iter.next(); 297 IoSessionStat sessStat = (IoSessionStat) session 298 .getAttribute(KEY); 299 300 sessStat.byteReadThroughput = (session.getReadBytes() - sessStat.lastByteRead) 301 / (pollingInterval / 1000f); 302 tmpBytesReadThroughput += sessStat.byteReadThroughput; 303 304 sessStat.byteWrittenThroughput = (session.getWrittenBytes() - sessStat.lastByteWrite) 305 / (pollingInterval / 1000f); 306 tmpBytesWrittenThroughput += sessStat.byteWrittenThroughput; 307 308 sessStat.messageReadThroughput = (session.getReadMessages() - sessStat.lastMessageRead) 309 / (pollingInterval / 1000f); 310 tmpMsgReadThroughput += sessStat.messageReadThroughput; 311 312 sessStat.messageWrittenThroughput = (session 313 .getWrittenMessages() - sessStat.lastMessageWrite) 314 / (pollingInterval / 1000f); 315 tmpMsgWrittenThroughput += sessStat.messageWrittenThroughput; 316 317 synchronized (calcLock) { 318 msgWrittenThroughput = tmpMsgWrittenThroughput; 319 msgReadThroughput = tmpMsgReadThroughput; 320 bytesWrittenThroughput = tmpBytesWrittenThroughput; 321 bytesReadThroughput = tmpBytesReadThroughput; 322 sessStat.lastPollingTime = System.currentTimeMillis(); 323 } 324 } 325 } 326 } 327 } 328 } | Popular Tags |