1 18 package org.apache.activemq.broker; 19 20 import java.util.concurrent.CopyOnWriteArraySet ; 21 import java.util.concurrent.atomic.AtomicBoolean ; 22 23 import org.apache.activemq.Service; 24 import org.apache.activemq.ThreadPriorities; 25 import org.apache.commons.logging.Log; 26 import org.apache.commons.logging.LogFactory; 27 28 import java.util.Iterator ; 29 import java.util.Set ; 30 35 public class TransportStatusDetector implements Service,Runnable { 36 private static final Log log=LogFactory.getLog(TransportStatusDetector.class); 37 private TransportConnector connector; 38 private Set collectionCandidates=new CopyOnWriteArraySet (); 39 private AtomicBoolean started=new AtomicBoolean (false); 40 private Thread runner; 41 private int sweepInterval=5000; 42 43 TransportStatusDetector(TransportConnector connector){ 44 this.connector=connector; 45 } 46 49 public int getSweepInterval(){ 50 return sweepInterval; 51 } 52 53 58 public void setSweepInterval(int sweepInterval){ 59 this.sweepInterval=sweepInterval; 60 } 61 62 protected void doCollection(){ 63 for(Iterator i=collectionCandidates.iterator();i.hasNext();){ 64 TransportConnection tc=(TransportConnection) i.next(); 65 if(tc.isMarkedCandidate()){ 66 if(tc.isBlockedCandidate()){ 67 collectionCandidates.remove(tc); 68 doCollection(tc); 69 }else{ 70 tc.doMark(); 71 } 72 }else{ 73 collectionCandidates.remove(tc); 74 } 75 } 76 } 77 protected void doSweep(){ 78 for(Iterator i=connector.getConnections().iterator();i.hasNext();){ 79 TransportConnection connection=(TransportConnection) i.next(); 80 if(connection.isMarkedCandidate()){ 81 connection.doMark(); 82 collectionCandidates.add(connection); 83 } 84 } 85 } 86 protected void doCollection(TransportConnection tc){ 87 log.warn("found a blocked client - stopping: "+tc); 88 try{ 89 tc.stop(); 90 }catch(Exception e){ 91 log.error("Error stopping "+tc,e); 92 } 93 } 94 public void run(){ 95 while(started.get()){ 96 try{ 97 doCollection(); 98 doSweep(); 99 Thread.sleep(sweepInterval); 100 }catch(Throwable e){ 101 log.error("failed to complete a sweep for blocked clients",e); 102 } 103 } 104 } 105 public void start() throws Exception { 106 if(started.compareAndSet(false,true)){ 107 runner=new Thread (this,"ActiveMQ Transport Status Monitor: "+connector); 108 runner.setDaemon(true); 109 runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT); 110 runner.start(); 111 } 112 } 113 public void stop() throws Exception { 114 started.set(false); 115 if (runner != null) { 116 runner.join(getSweepInterval() * 5); 117 } 118 } 119 } 120 | Popular Tags |