KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > TransportStatusDetector


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker;
19
20 import java.util.concurrent.CopyOnWriteArraySet JavaDoc;
21 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
22
23 import org.apache.activemq.Service;
24 import org.apache.activemq.ThreadPriorities;
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27
28 import java.util.Iterator JavaDoc;
29 import java.util.Set JavaDoc;
30 /**
31  * Used to provide information on the status of the Connection
32  *
33  * @version $Revision: 1.5 $
34  */

35 public class TransportStatusDetector implements Service,Runnable JavaDoc{
36     private static final Log log=LogFactory.getLog(TransportStatusDetector.class);
37     private TransportConnector connector;
38     private Set JavaDoc collectionCandidates=new CopyOnWriteArraySet JavaDoc();
39     private AtomicBoolean JavaDoc started=new AtomicBoolean JavaDoc(false);
40     private Thread JavaDoc runner;
41     private int sweepInterval=5000;
42     
43     TransportStatusDetector(TransportConnector connector){
44         this.connector=connector;
45     }
46     /**
47      * @return Returns the sweepInterval.
48      */

49     public int getSweepInterval(){
50         return sweepInterval;
51     }
52     
53     /**
54      * The sweepInterval to set.
55      * @param sweepInterval
56      *
57      */

58     public void setSweepInterval(int sweepInterval){
59         this.sweepInterval=sweepInterval;
60     }
61     
62     protected void doCollection(){
63         for(Iterator JavaDoc i=collectionCandidates.iterator();i.hasNext();){
64             TransportConnection tc=(TransportConnection) i.next();
65             if(tc.isMarkedCandidate()){
66                 if(tc.isBlockedCandidate()){
67                     collectionCandidates.remove(tc);
68                     doCollection(tc);
69                 }else{
70                     tc.doMark();
71                 }
72             }else{
73                 collectionCandidates.remove(tc);
74             }
75         }
76     }
77     protected void doSweep(){
78         for(Iterator JavaDoc i=connector.getConnections().iterator();i.hasNext();){
79             TransportConnection connection=(TransportConnection) i.next();
80             if(connection.isMarkedCandidate()){
81                 connection.doMark();
82                 collectionCandidates.add(connection);
83             }
84         }
85     }
86     protected void doCollection(TransportConnection tc){
87         log.warn("found a blocked client - stopping: "+tc);
88         try{
89             tc.stop();
90         }catch(Exception JavaDoc e){
91             log.error("Error stopping "+tc,e);
92         }
93     }
94     public void run(){
95         while(started.get()){
96             try{
97                 doCollection();
98                 doSweep();
99                 Thread.sleep(sweepInterval);
100             }catch(Throwable JavaDoc e){
101                 log.error("failed to complete a sweep for blocked clients",e);
102             }
103         }
104     }
105     public void start() throws Exception JavaDoc{
106         if(started.compareAndSet(false,true)){
107             runner=new Thread JavaDoc(this,"ActiveMQ Transport Status Monitor: "+connector);
108             runner.setDaemon(true);
109             runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
110             runner.start();
111         }
112     }
113     public void stop() throws Exception JavaDoc{
114         started.set(false);
115         if (runner != null) {
116             runner.join(getSweepInterval() * 5);
117         }
118     }
119 }
120
Popular Tags