KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > sun > enterprise > admin > monitor > callflow > AsyncHandler


1 /*
2  * The contents of this file are subject to the terms
3  * of the Common Development and Distribution License
4  * (the License). You may not use this file except in
5  * compliance with the License.
6  *
7  * You can obtain a copy of the license at
8  * https://glassfish.dev.java.net/public/CDDLv1.0.html or
9  * glassfish/bootstrap/legal/CDDLv1.0.txt.
10  * See the License for the specific language governing
11  * permissions and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL
14  * Header Notice in each file and include the License file
15  * at glassfish/bootstrap/legal/CDDLv1.0.txt.
16  * If applicable, add the following below the CDDL Header,
17  * with the fields enclosed by brackets [] replaced by
18  * you own identifying information:
19  * "Portions Copyrighted [year] [name of copyright owner]"
20  *
21  * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
22  */

23
24 /*
25  * AsyncHandler.java
26  * $Id: AsyncHandler.java,v 1.10 2005/12/25 03:43:19 tcfujii Exp $
27  * $Date: 2005/12/25 03:43:19 $
28  * $Revision: 1.10 $
29  */

30
31 package com.sun.enterprise.admin.monitor.callflow;
32
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.sun.enterprise.admin.common.constant.AdminConstants;
42
43 /**
44  * This class asynchronously writes supplied input into a data base. That is,
45  * a separate asynchronous thread is used to write data to the data base.
46  *
47  * The implementation uses a typical producer - consumer model: The data
48  * produced by application threads is collected in unbounded queue objects;
49  * an asynchronous thread consumes the data from the queues and writes it out
50  * to the data base.
51  *
52  * @author Ram Jeyaraman, Harpreet Singh
53  * @date March 21, 2005
54  */

55 class AsyncHandler {
56     
57     /** Static fields */
58     
59     private static final Logger JavaDoc logger =
60             Logger.getLogger(AdminConstants.kLoggerName);
61     private static final int WAIT_INTERVALS = 100;
62     private static final int MAX_BULK_SIZE = 10000;
63     private static final int BUFFER_COUNT = 6;
64     private static final String JavaDoc THREAD_NAME = "Callflow AsyncThread";
65         
66     /** Private fields */
67     
68     private LinkedBlockingQueue JavaDoc<RequestStartTO> requestStartQ;
69     private LinkedBlockingQueue JavaDoc<RequestEndTO> requestEndQ;
70     private LinkedBlockingQueue JavaDoc<MethodStartTO> methodStartQ;
71     private LinkedBlockingQueue JavaDoc<MethodEndTO> methodEndQ;
72     private LinkedBlockingQueue JavaDoc<StartTimeTO> startTimeQ;
73     private LinkedBlockingQueue JavaDoc<EndTimeTO> endTimeQ;
74     
75     private ConcurrentLinkedQueue JavaDoc<RequestStartTO> requestStartFreeQ;
76     private ConcurrentLinkedQueue JavaDoc<RequestEndTO> requestEndFreeQ;
77     private ConcurrentLinkedQueue JavaDoc<MethodStartTO> methodStartFreeQ;
78     private ConcurrentLinkedQueue JavaDoc<MethodEndTO> methodEndFreeQ;
79     private ConcurrentLinkedQueue JavaDoc<StartTimeTO> startTimeFreeQ;
80     private ConcurrentLinkedQueue JavaDoc<EndTimeTO> endTimeFreeQ;
81     
82     private AsyncThread asyncThread;
83     
84     private class AsyncThread extends Thread JavaDoc {
85         
86         private boolean shutdown;
87         private int emptyBufferCount;
88         private DbAccessObject dbAccessObject;
89         
90         AsyncThread() {
91             setDaemon(true);
92             setName(THREAD_NAME);
93             dbAccessObject = DbAccessObjectImpl.getInstance();
94         }
95         
96         void shutdown() {
97             shutdown = true;
98             while (emptyBufferCount < BUFFER_COUNT) {
99                 try {
100                     Thread.sleep(WAIT_INTERVALS);
101                 } catch (InterruptedException JavaDoc e) {}
102             }
103         }
104         
105         public void run() {
106             
107             List JavaDoc<RequestStartTO> rsTransferObjects =
108                     new ArrayList JavaDoc<RequestStartTO>();
109             List JavaDoc<RequestEndTO> reTransferObjects =
110                     new ArrayList JavaDoc<RequestEndTO>();
111             List JavaDoc<MethodStartTO> msTransferObjects =
112                     new ArrayList JavaDoc<MethodStartTO>();
113             List JavaDoc<MethodEndTO> meTransferObjects =
114                     new ArrayList JavaDoc<MethodEndTO>();
115             List JavaDoc<StartTimeTO> stTransferObjects =
116                     new ArrayList JavaDoc<StartTimeTO>();
117             List JavaDoc<EndTimeTO> etTransferObjects =
118                     new ArrayList JavaDoc<EndTimeTO>();
119             
120             while (emptyBufferCount < BUFFER_COUNT) {
121                 
122                 // Handle RequestStart
123

124                 for (int i=0; i<MAX_BULK_SIZE; i++) {
125                     try {
126                         RequestStartTO rsto =
127                                 requestStartQ.poll(
128                                     WAIT_INTERVALS, TimeUnit.MILLISECONDS);
129                         if (rsto == null) {
130                             break;
131                         }
132                         rsTransferObjects.add(rsto);
133                     } catch (InterruptedException JavaDoc e) {
134                         logger.log(
135                                 Level.FINE,
136                                 "callflow.async_thread_interrupted",
137                                 e);
138                         break;
139                     }
140                 }
141                 try {
142                     if (rsTransferObjects.isEmpty()) {
143                         if (shutdown) {
144                             emptyBufferCount++;
145                         }
146                     } else {
147                         dbAccessObject.insert(
148                             rsTransferObjects.toArray(new TransferObject[0]));
149                     }
150                 } catch (Exception JavaDoc e) {
151                     logger.log(
152                             Level.WARNING,
153                             "callflow.async_db_write_failed",
154                             e);
155                 }
156                 requestStartFreeQ.addAll(rsTransferObjects);
157                 rsTransferObjects.clear();
158                 
159                 // Handle RequestEnd
160

161                 for (int i=0; i<MAX_BULK_SIZE; i++) {
162                     try {
163                         RequestEndTO reto =
164                                 requestEndQ.poll(
165                                     WAIT_INTERVALS, TimeUnit.MILLISECONDS);
166                         if (reto == null) {
167                             break;
168                         }
169                         reTransferObjects.add(reto);
170                     } catch (InterruptedException JavaDoc e) {
171                         logger.log(
172                                 Level.FINE,
173                                 "callflow.async_thread_interrupted",
174                                 e);
175                         break;
176                     }
177                 }
178                 try {
179                     if (reTransferObjects.isEmpty()) {
180                         if (shutdown) {
181                             emptyBufferCount++;
182                         }
183                     } else {
184                         dbAccessObject.insert(
185                             reTransferObjects.toArray(new TransferObject[0]));
186                     }
187                 } catch (Exception JavaDoc e) {
188                     logger.log(
189                             Level.WARNING,
190                             "callflow.async_db_write_failed",
191                             e);
192                 }
193                 requestEndFreeQ.addAll(reTransferObjects);
194                 reTransferObjects.clear();
195                 
196                 // Handle MethodStart
197

198                 for (int i=0; i<MAX_BULK_SIZE; i++) {
199                     try {
200                         MethodStartTO msto =
201                                 methodStartQ.poll(
202                                     WAIT_INTERVALS, TimeUnit.MILLISECONDS);
203                         if (msto == null) {
204                             break;
205                         }
206                         msTransferObjects.add(msto);
207                     } catch (InterruptedException JavaDoc e) {
208                         logger.log(
209                                 Level.FINE,
210                                 "callflow.async_thread_interrupted",
211                                 e);
212                         break;
213                     }
214                 }
215                 try {
216                     if (msTransferObjects.isEmpty()) {
217                         if (shutdown) {
218                             emptyBufferCount++;
219                         }
220                     } else {
221                         dbAccessObject.insert(
222                             msTransferObjects.toArray(new TransferObject[0]));
223                     }
224                 } catch (Exception JavaDoc e) {
225                     logger.log(
226                             Level.WARNING,
227                             "callflow.async_db_write_failed",
228                             e);
229                 }
230                 methodStartFreeQ.addAll(msTransferObjects);
231                 msTransferObjects.clear();
232                 
233                 // Handle MethodEnd
234

235                 for (int i=0; i<MAX_BULK_SIZE; i++) {
236                     try {
237                         MethodEndTO meto =
238                                 methodEndQ.poll(
239                                     WAIT_INTERVALS, TimeUnit.MILLISECONDS);
240                         if (meto == null) {
241                             break;
242                         }
243                         meTransferObjects.add(meto);
244                     } catch (InterruptedException JavaDoc e) {
245                         logger.log(
246                                 Level.FINE,
247                                 "callflow.async_thread_interrupted",
248                                 e);
249                         break;
250                     }
251                 }
252                 try {
253                     if (meTransferObjects.isEmpty()) {
254                         if (shutdown) {
255                             emptyBufferCount++;
256                         }
257                     } else {
258                         dbAccessObject.insert(
259                             meTransferObjects.toArray(new TransferObject[0]));
260                     }
261                 } catch (Exception JavaDoc e) {
262                     logger.log(
263                             Level.WARNING,
264                             "callflow.async_db_write_failed",
265                             e);
266                 }
267                 methodEndFreeQ.addAll(meTransferObjects);
268                 meTransferObjects.clear();
269                 
270                 // Handle StartTime
271

272                 for (int i=0; i<MAX_BULK_SIZE; i++) {
273                     try {
274                         StartTimeTO stto =
275                                 startTimeQ.poll(
276                                     WAIT_INTERVALS, TimeUnit.MILLISECONDS);
277                         if (stto == null) {
278                             break;
279                         }
280                         stTransferObjects.add(stto);
281                     } catch (InterruptedException JavaDoc e) {
282                         logger.log(
283                                 Level.FINE,
284                                 "callflow.async_thread_interrupted",
285                                 e);
286                         break;
287                     }
288                 }
289                 try {
290                     if (stTransferObjects.isEmpty()) {
291                         if (shutdown) {
292                             emptyBufferCount++;
293                         }
294                     } else {
295                         dbAccessObject.insert(
296                             stTransferObjects.toArray(new TransferObject[0]));
297                     }
298                 } catch (Exception JavaDoc e) {
299                     logger.log(
300                             Level.WARNING,
301                             "callflow.async_db_write_failed",
302                             e);
303                 }
304                 startTimeFreeQ.addAll(stTransferObjects);
305                 stTransferObjects.clear();
306                 
307                 // Handle EndTime
308

309                 for (int i=0; i<MAX_BULK_SIZE; i++) {
310                     try {
311                         EndTimeTO etto =
312                                 endTimeQ.poll(
313                                     WAIT_INTERVALS, TimeUnit.MILLISECONDS);
314                         if (etto == null) {
315                             break;
316                         }
317                         etTransferObjects.add(etto);
318                     } catch (InterruptedException JavaDoc e) {
319                         logger.log(
320                                 Level.FINE,
321                                 "callflow.async_thread_interrupted",
322                                 e);
323                         break;
324                     }
325                 }
326                 try {
327                     if (etTransferObjects.isEmpty()) {
328                         if (shutdown) {
329                             emptyBufferCount++;
330                         }
331                     } else {
332                         dbAccessObject.insert(
333                             etTransferObjects.toArray(new TransferObject[0]));
334                     }
335                 } catch (Exception JavaDoc e) {
336                     logger.log(
337                             Level.WARNING,
338                             "callflow.async_db_write_failed",
339                             e);
340                 }
341                 endTimeFreeQ.addAll(etTransferObjects);
342                 etTransferObjects.clear();
343             }
344         }
345     }
346     
347     AsyncHandler() {
348         
349         requestStartQ = new LinkedBlockingQueue JavaDoc<RequestStartTO>();
350         requestEndQ = new LinkedBlockingQueue JavaDoc<RequestEndTO>();
351         methodStartQ = new LinkedBlockingQueue JavaDoc<MethodStartTO>();
352         methodEndQ = new LinkedBlockingQueue JavaDoc<MethodEndTO>();
353         startTimeQ = new LinkedBlockingQueue JavaDoc<StartTimeTO>();
354         endTimeQ = new LinkedBlockingQueue JavaDoc<EndTimeTO>();
355         
356         requestStartFreeQ = new ConcurrentLinkedQueue JavaDoc<RequestStartTO>();
357         requestEndFreeQ = new ConcurrentLinkedQueue JavaDoc<RequestEndTO>();
358         methodStartFreeQ = new ConcurrentLinkedQueue JavaDoc<MethodStartTO>();
359         methodEndFreeQ = new ConcurrentLinkedQueue JavaDoc<MethodEndTO>();
360         startTimeFreeQ = new ConcurrentLinkedQueue JavaDoc<StartTimeTO>();
361         endTimeFreeQ = new ConcurrentLinkedQueue JavaDoc<EndTimeTO>();
362     }
363     
364     synchronized void enable() {
365         asyncThread = new AsyncThread();
366         asyncThread.start();
367     }
368     
369     synchronized void disable() {
370         asyncThread.shutdown();
371     }
372     
373     void handleRequestStart(
374             String JavaDoc requestId, long timeStamp, long timeStampMillis,
375             RequestType requestType, String JavaDoc callerIPAddress,
376             String JavaDoc remoteUser) {
377         RequestStartTO rsto = requestStartFreeQ.poll();
378         if (rsto == null) {
379             rsto = new RequestStartTO();
380         }
381         rsto.setRequestId(requestId);
382         rsto.setTimeStamp(timeStamp);
383         rsto.setTimeStampMillis(timeStampMillis);
384         rsto.setRequestType(requestType);
385         rsto.setIpAddress(callerIPAddress);
386         //rsto.setRemoteUser(remoteUser); // not currently in db schema.
387
boolean success = false;
388         while (!success) {
389             try {
390                 requestStartQ.put(rsto);
391                 success = true;
392             } catch (InterruptedException JavaDoc e) {
393                 logger.log(
394                         Level.FINE,
395                         "callflow.transfer_to_async_thread_interrupted", e);
396             }
397         }
398     }
399     
400     void handleRequestEnd(String JavaDoc requestId, long timeStamp) {
401         RequestEndTO reto = requestEndFreeQ.poll();
402         if (reto == null) {
403             reto = new RequestEndTO();
404         }
405         reto.setRequestId(requestId);
406         reto.setTimeStamp(timeStamp);
407
408         boolean success = false;
409         while (!success) {
410             try {
411                 requestEndQ.put(reto);
412                 success = true;
413             } catch (InterruptedException JavaDoc e) {
414                 logger.log(
415                         Level.FINE,
416                         "callflow.transfer_to_async_thread_interrupted", e);
417             }
418         }
419     }
420     
421     void handleMethodStart(
422             String JavaDoc requestId, long timeStamp, String JavaDoc methodName,
423             ComponentType componentType, String JavaDoc applicationName,
424             String JavaDoc moduleName, String JavaDoc componentName, String JavaDoc threadId,
425             String JavaDoc transactionId, String JavaDoc securityId) {
426         MethodStartTO msto = methodStartFreeQ.poll();
427         if (msto == null) {
428             msto = new MethodStartTO();
429         }
430         msto.setRequestId(requestId);
431         msto.setTimeStamp(timeStamp);
432         msto.setMethodName(methodName);
433         msto.setComponentType(componentType);
434         msto.setAppName(applicationName);
435         msto.setModuleName(moduleName);
436         msto.setComponentName(componentName);
437         msto.setThreadId(threadId);
438         msto.setTransactionId(transactionId);
439         msto.setSecurityId(securityId);
440         
441         boolean success = false;
442         while (!success) {
443             try {
444                 methodStartQ.put(msto);
445                 success = true;
446             } catch (InterruptedException JavaDoc e) {
447                 logger.log(
448                         Level.FINE,
449                         "callflow.transfer_to_async_thread_interrupted", e);
450             }
451         }
452     }
453     
454     void handleMethodEnd(
455             String JavaDoc requestId, long timeStamp, Throwable JavaDoc exception) {
456         MethodEndTO meto = methodEndFreeQ.poll();
457         if (meto == null) {
458             meto = new MethodEndTO();
459         }
460         meto.setRequestId(requestId);
461         meto.setTimeStamp(timeStamp);
462         meto.setException(((exception == null) ? null : exception.toString()));
463         
464         boolean success = false;
465         while (!success) {
466             try {
467                 methodEndQ.put(meto);
468                 success = true;
469             } catch (InterruptedException JavaDoc e) {
470                 logger.log(
471                         Level.FINE,
472                         "callflow.transfer_to_async_thread_interrupted", e);
473             }
474         }
475     }
476     
477     void handleStartTime(
478             String JavaDoc requestId, long timeStamp,
479             ContainerTypeOrApplicationType type) {
480         StartTimeTO stto = startTimeFreeQ.poll();
481         if (stto == null) {
482             stto = new StartTimeTO();
483         }
484         stto.setRequestId(requestId);
485         stto.setTimeStamp(timeStamp);
486         stto.setContainerTypeOrApplicationType(type);
487         boolean success = false;
488         while (!success) {
489             try {
490                 startTimeQ.put(stto);
491                 success = true;
492             } catch (InterruptedException JavaDoc e) {
493                 logger.log(
494                         Level.FINE,
495                         "callflow.transfer_to_async_thread_interrupted", e);
496             }
497         }
498     }
499     
500     void handleEndTime(
501             String JavaDoc requestId, long timeStamp,
502             ContainerTypeOrApplicationType type) {
503         EndTimeTO etto = endTimeFreeQ.poll();
504         if (etto == null) {
505             etto = new EndTimeTO();
506         }
507         etto.setRequestId(requestId);
508         etto.setTimeStamp(timeStamp);
509         etto.setContainerTypeOrApplicationType(type);
510         boolean success = false;
511         while (!success) {
512             try {
513                 endTimeQ.put(etto);
514                 success = true;
515             } catch (InterruptedException JavaDoc e) {
516                 logger.log(
517                         Level.FINE,
518                         "callflow.transfer_to_async_thread_interrupted", e);
519             }
520         }
521     }
522 }
523
Popular Tags