KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > mina > management > StatCollector


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  *
19  */

20 package org.apache.mina.management;
21
22 import java.net.SocketAddress JavaDoc;
23 import java.util.Iterator JavaDoc;
24 import java.util.Queue JavaDoc;
25 import java.util.concurrent.ConcurrentLinkedQueue JavaDoc;
26 import java.util.concurrent.atomic.AtomicLong JavaDoc;
27
28 import org.apache.mina.common.IoHandler;
29 import org.apache.mina.common.IoService;
30 import org.apache.mina.common.IoServiceConfig;
31 import org.apache.mina.common.IoServiceListener;
32 import org.apache.mina.common.IoSession;
33
34 /**
35  * Collects statistics of an {@link IoService}. It's polling all the sessions of a given
36  * IoService. It's attaching a {@link IoSessionStat} object to all the sessions polled
37  * and filling the throughput values.
38  *
39  * Usage :
40  * <pre>
41  * IoService service = ...
42  * StatCollector collector = new StatCollector( service );
43  * collector.start();
44  * </pre>
45  *
46  * By default the {@link StatCollector} is polling the sessions every 5 seconds. You can
47  * give a different polling time using a second constructor.
48  *
49  * @author The Apache Directory Project (mina-dev@directory.apache.org)
50  * @version $Rev: 555855 $, $Date: 2007-07-13 12:19:00 +0900 (금, 13 7월 2007) $
51  */

52 public class StatCollector {
53     /**
54      * The session attribute key for {@link IoSessionStat}.
55      */

56     public static final String JavaDoc KEY = StatCollector.class.getName() + ".stat";
57
58     /**
59      * @noinspection StaticNonFinalField
60      */

61     private static volatile int nextId = 0;
62
63     private final int id = nextId++;
64
65     private final Object JavaDoc calcLock = new Object JavaDoc();
66
67     private final IoService service;
68
69     private Worker worker;
70
71     private int pollingInterval = 5000;
72
73     private Queue JavaDoc<IoSession> polledSessions;
74
75     // resume of session stats, for simplifying acces to the statistics
76
private AtomicLong JavaDoc totalProcessedSessions = new AtomicLong JavaDoc();
77
78     private float msgWrittenThroughput = 0f;
79
80     private float msgReadThroughput = 0f;
81
82     private float bytesWrittenThroughput = 0f;
83
84     private float bytesReadThroughput = 0f;
85
86     private final IoServiceListener serviceListener = new IoServiceListener() {
87         public void serviceActivated(IoService service,
88                 SocketAddress JavaDoc serviceAddress, IoHandler handler,
89                 IoServiceConfig config) {
90         }
91
92         public void serviceDeactivated(IoService service,
93                 SocketAddress JavaDoc serviceAddress, IoHandler handler,
94                 IoServiceConfig config) {
95         }
96
97         public void sessionCreated(IoSession session) {
98             addSession(session);
99         }
100
101         public void sessionDestroyed(IoSession session) {
102             removeSession(session);
103         }
104     };
105
106     /**
107      * Create a stat collector for the given service with a default polling time of 5 seconds.
108      * @param service the IoService to inspect
109      */

110     public StatCollector(IoService service) {
111         this(service, 5000);
112     }
113
114     /**
115      * create a stat collector for the given given service
116      * @param service the IoService to inspect
117      * @param pollingInterval milliseconds
118      */

119     public StatCollector(IoService service, int pollingInterval) {
120         this.service = service;
121         this.pollingInterval = pollingInterval;
122     }
123
124     /**
125      * Start collecting stats for the {@link IoSession} of the service.
126      * New sessions or destroyed will be automaticly added or removed.
127      */

128     public void start() {
129         synchronized (this) {
130             if (worker != null && worker.isAlive())
131                 throw new RuntimeException JavaDoc("Stat collecting already started");
132
133             // add all current sessions
134

135             polledSessions = new ConcurrentLinkedQueue JavaDoc<IoSession>();
136
137             for (Iterator JavaDoc<SocketAddress JavaDoc> iter = service
138                     .getManagedServiceAddresses().iterator(); iter.hasNext();) {
139                 SocketAddress JavaDoc element = iter.next();
140
141                 for (Iterator JavaDoc<IoSession> iter2 = service.getManagedSessions(
142                         element).iterator(); iter2.hasNext();) {
143                     addSession(iter2.next());
144
145                 }
146             }
147
148             // listen for new ones
149
service.addListener(serviceListener);
150
151             // start polling
152
worker = new Worker();
153             worker.start();
154
155         }
156
157     }
158
159     /**
160      * Stop collecting stats. all the {@link IoSessionStat} object will be removed of the
161      * polled session attachements.
162      */

163     public void stop() {
164         synchronized (this) {
165             if (worker == null) {
166                 return;
167             }
168
169             service.removeListener(serviceListener);
170
171             // stop worker
172
worker.stop = true;
173             worker.interrupt();
174             while (worker.isAlive()) {
175                 try {
176                     worker.join();
177                 } catch (InterruptedException JavaDoc e) {
178                     //ignore since this is shutdown time
179
}
180             }
181
182             for (Iterator JavaDoc iter = polledSessions.iterator(); iter.hasNext();) {
183                 IoSession session = (IoSession) iter.next();
184                 session.removeAttribute(KEY);
185             }
186             polledSessions.clear();
187
188             worker = null;
189         }
190     }
191
192     /**
193      * is the stat collector started and polling the {@link IoSession} of the {@link IoService}
194      * @return true if started
195      */

196     public boolean isRunning() {
197         synchronized (this) {
198             return worker != null && worker.stop != true;
199         }
200     }
201
202     private void addSession(IoSession session) {
203         IoSessionStat sessionStats = new IoSessionStat();
204         session.setAttribute(KEY, sessionStats);
205         totalProcessedSessions.incrementAndGet();
206         polledSessions.add(session);
207     }
208
209     private void removeSession(IoSession session) {
210         // remove the session from the list of polled sessions
211
polledSessions.remove(session);
212
213         // add the bytes processed between last polling and session closing
214
// prevent non seen byte with non-connected protocols like HTTP and datagrams
215
IoSessionStat sessStat = (IoSessionStat) session.getAttribute(KEY);
216
217         // computing with time between polling and closing
218
long currentTime = System.currentTimeMillis();
219         synchronized (calcLock) {
220             bytesReadThroughput += (session.getReadBytes() - sessStat.lastByteRead)
221                     / ((currentTime - sessStat.lastPollingTime) / 1000f);
222             bytesWrittenThroughput += (session.getWrittenBytes() - sessStat.lastByteWrite)
223                     / ((currentTime - sessStat.lastPollingTime) / 1000f);
224             msgReadThroughput += (session.getReadMessages() - sessStat.lastMessageRead)
225                     / ((currentTime - sessStat.lastPollingTime) / 1000f);
226             msgWrittenThroughput += (session.getWrittenMessages() - sessStat.lastMessageWrite)
227                     / ((currentTime - sessStat.lastPollingTime) / 1000f);
228         }
229
230         session.removeAttribute(KEY);
231     }
232
233     /**
234      * total number of sessions processed by the stat collector
235      * @return number of sessions
236      */

237     public long getTotalProcessedSessions() {
238         return totalProcessedSessions.get();
239     }
240
241     public float getBytesReadThroughput() {
242         return bytesReadThroughput;
243     }
244
245     public float getBytesWrittenThroughput() {
246         return bytesWrittenThroughput;
247     }
248
249     public float getMsgReadThroughput() {
250         return msgReadThroughput;
251     }
252
253     public float getMsgWrittenThroughput() {
254         return msgWrittenThroughput;
255     }
256
257     public long getSessionCount() {
258         return polledSessions.size();
259     }
260
261     private class Worker extends Thread JavaDoc {
262
263         boolean stop = false;
264
265         private Worker() {
266             super("StatCollectorWorker-" + id);
267         }
268
269         public void run() {
270             while (!stop) {
271                 for (Iterator JavaDoc iter = polledSessions.iterator(); iter.hasNext();) {
272                     IoSession session = (IoSession) iter.next();
273                     IoSessionStat sessStat = (IoSessionStat) session
274                             .getAttribute(KEY);
275
276                     sessStat.lastByteRead = session.getReadBytes();
277                     sessStat.lastByteWrite = session.getWrittenBytes();
278                     sessStat.lastMessageRead = session.getReadMessages();
279                     sessStat.lastMessageWrite = session.getWrittenMessages();
280                 }
281
282                 // wait polling time
283
try {
284                     Thread.sleep(pollingInterval);
285                 } catch (InterruptedException JavaDoc e) {
286                 }
287
288                 float tmpMsgWrittenThroughput = 0f;
289                 float tmpMsgReadThroughput = 0f;
290                 float tmpBytesWrittenThroughput = 0f;
291                 float tmpBytesReadThroughput = 0f;
292
293                 for (Iterator JavaDoc iter = polledSessions.iterator(); iter.hasNext();) {
294
295                     // upadating individual session statistics
296
IoSession session = (IoSession) iter.next();
297                     IoSessionStat sessStat = (IoSessionStat) session
298                             .getAttribute(KEY);
299
300                     sessStat.byteReadThroughput = (session.getReadBytes() - sessStat.lastByteRead)
301                             / (pollingInterval / 1000f);
302                     tmpBytesReadThroughput += sessStat.byteReadThroughput;
303
304                     sessStat.byteWrittenThroughput = (session.getWrittenBytes() - sessStat.lastByteWrite)
305                             / (pollingInterval / 1000f);
306                     tmpBytesWrittenThroughput += sessStat.byteWrittenThroughput;
307
308                     sessStat.messageReadThroughput = (session.getReadMessages() - sessStat.lastMessageRead)
309                             / (pollingInterval / 1000f);
310                     tmpMsgReadThroughput += sessStat.messageReadThroughput;
311
312                     sessStat.messageWrittenThroughput = (session
313                             .getWrittenMessages() - sessStat.lastMessageWrite)
314                             / (pollingInterval / 1000f);
315                     tmpMsgWrittenThroughput += sessStat.messageWrittenThroughput;
316
317                     synchronized (calcLock) {
318                         msgWrittenThroughput = tmpMsgWrittenThroughput;
319                         msgReadThroughput = tmpMsgReadThroughput;
320                         bytesWrittenThroughput = tmpBytesWrittenThroughput;
321                         bytesReadThroughput = tmpBytesReadThroughput;
322                         sessStat.lastPollingTime = System.currentTimeMillis();
323                     }
324                 }
325             }
326         }
327     }
328 }
Popular Tags