1 18 package org.apache.activemq.transport; 19 20 import org.apache.activemq.command.KeepAliveInfo; 21 import org.apache.activemq.command.WireFormatInfo; 22 import org.apache.activemq.thread.Scheduler; 23 import org.apache.commons.logging.Log; 24 import org.apache.commons.logging.LogFactory; 25 26 import java.io.IOException ; 27 import java.util.concurrent.atomic.AtomicBoolean ; 28 29 34 public class InactivityMonitor extends TransportFilter { 35 36 private final Log log = LogFactory.getLog(InactivityMonitor.class); 37 38 private WireFormatInfo localWireFormatInfo; 39 private WireFormatInfo remoteWireFormatInfo; 40 private final AtomicBoolean monitorStarted= new AtomicBoolean (false); 41 42 private final AtomicBoolean commandSent=new AtomicBoolean (false); 43 private final AtomicBoolean inSend=new AtomicBoolean (false); 44 45 private final AtomicBoolean commandReceived=new AtomicBoolean (true); 46 private final AtomicBoolean inReceive=new AtomicBoolean (false); 47 48 private final Runnable readChecker = new Runnable () { 49 public void run() { 50 readCheck(); 51 } 52 }; 53 54 private final Runnable writeChecker = new Runnable () { 55 public void run() { 56 writeCheck(); 57 } 58 }; 59 60 61 public InactivityMonitor(Transport next) { 62 super(next); 63 } 64 65 public void stop() throws Exception { 66 stopMonitorThreads(); 67 next.stop(); 68 } 69 70 71 private void writeCheck() { 72 if( inSend.get() ) { 73 log.trace("A send is in progress"); 74 return; 75 } 76 77 if( !commandSent.get() ) { 78 log.trace("No message sent since last write check, sending a KeepAliveInfo"); 79 try { 80 next.oneway(new KeepAliveInfo()); 81 } catch (IOException e) { 82 onException(e); 83 } 84 } else { 85 log.trace("Message sent since last write check, resetting flag"); 86 } 87 88 commandSent.set(false); 89 90 } 91 92 private void readCheck() { 93 if( inReceive.get() ) { 94 log.trace("A receive is in progress"); 95 return; 96 } 97 98 if( !commandReceived.get() ) { 99 log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); 100 onException(new InactivityIOException("Channel was inactive for too long.")); 101 } else { 102 log.trace("Message received since last read check, resetting flag: "); 103 } 104 105 commandReceived.set(false); 106 } 107 108 public void onCommand(Object command) { 109 inReceive.set(true); 110 try { 111 if( command.getClass() == WireFormatInfo.class ) { 112 synchronized( this ) { 113 remoteWireFormatInfo = (WireFormatInfo) command; 114 try { 115 startMonitorThreads(); 116 } catch (IOException e) { 117 onException(e); 118 } 119 } 120 } 121 transportListener.onCommand(command); 122 } finally { 123 inReceive.set(false); 124 commandReceived.set(true); 125 } 126 } 127 128 129 public void oneway(Object o) throws IOException { 130 inSend.set(true); 132 commandSent.set(true); 133 try { 134 if( o.getClass() == WireFormatInfo.class ) { 135 synchronized( this ) { 136 localWireFormatInfo = (WireFormatInfo) o; 137 startMonitorThreads(); 138 } 139 } 140 next.oneway(o); 141 } finally { 142 inSend.set(false); 143 } 144 } 145 146 public void onException(IOException error) { 147 if( monitorStarted.get() ) { 148 stopMonitorThreads(); 149 } 150 getTransportListener().onException(error); 151 } 152 153 154 synchronized private void startMonitorThreads() throws IOException { 155 if( monitorStarted.get() ) 156 return; 157 if( localWireFormatInfo == null ) 158 return; 159 if( remoteWireFormatInfo == null ) 160 return; 161 162 long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration()); 163 if( l > 0 ) { 164 monitorStarted.set(true); 165 Scheduler.executePeriodically(writeChecker, l/2); 166 Scheduler.executePeriodically(readChecker, l); 167 } 168 } 169 170 173 synchronized private void stopMonitorThreads() { 174 if( monitorStarted.compareAndSet(true, false) ) { 175 Scheduler.cancel(readChecker); 176 Scheduler.cancel(writeChecker); 177 } 178 } 179 180 181 } 182 | Popular Tags |