KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > InactivityMonitor


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport;
19
20 import org.apache.activemq.command.KeepAliveInfo;
21 import org.apache.activemq.command.WireFormatInfo;
22 import org.apache.activemq.thread.Scheduler;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25
26 import java.io.IOException JavaDoc;
27 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
28
29 /**
30  * Used to make sure that commands are arriving periodically from the peer of the transport.
31  *
32  * @version $Revision$
33  */

34 public class InactivityMonitor extends TransportFilter {
35
36     private final Log log = LogFactory.getLog(InactivityMonitor.class);
37     
38     private WireFormatInfo localWireFormatInfo;
39     private WireFormatInfo remoteWireFormatInfo;
40     private final AtomicBoolean JavaDoc monitorStarted= new AtomicBoolean JavaDoc(false);
41
42     private final AtomicBoolean JavaDoc commandSent=new AtomicBoolean JavaDoc(false);
43     private final AtomicBoolean JavaDoc inSend=new AtomicBoolean JavaDoc(false);
44
45     private final AtomicBoolean JavaDoc commandReceived=new AtomicBoolean JavaDoc(true);
46     private final AtomicBoolean JavaDoc inReceive=new AtomicBoolean JavaDoc(false);
47     
48     private final Runnable JavaDoc readChecker = new Runnable JavaDoc() {
49         public void run() {
50             readCheck();
51         }
52     };
53     
54     private final Runnable JavaDoc writeChecker = new Runnable JavaDoc() {
55         public void run() {
56             writeCheck();
57         }
58     };
59     
60     
61     public InactivityMonitor(Transport next) {
62         super(next);
63     }
64
65     public void stop() throws Exception JavaDoc {
66         stopMonitorThreads();
67         next.stop();
68     }
69
70         
71     private void writeCheck() {
72         if( inSend.get() ) {
73             log.trace("A send is in progress");
74             return;
75         }
76         
77         if( !commandSent.get() ) {
78             log.trace("No message sent since last write check, sending a KeepAliveInfo");
79             try {
80                 next.oneway(new KeepAliveInfo());
81             } catch (IOException JavaDoc e) {
82                 onException(e);
83             }
84         } else {
85             log.trace("Message sent since last write check, resetting flag");
86         }
87         
88         commandSent.set(false);
89         
90     }
91
92     private void readCheck() {
93         if( inReceive.get() ) {
94             log.trace("A receive is in progress");
95             return;
96         }
97         
98         if( !commandReceived.get() ) {
99             log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
100             onException(new InactivityIOException("Channel was inactive for too long."));
101         } else {
102             log.trace("Message received since last read check, resetting flag: ");
103         }
104         
105         commandReceived.set(false);
106     }
107
108     public void onCommand(Object JavaDoc command) {
109         inReceive.set(true);
110         try {
111             if( command.getClass() == WireFormatInfo.class ) {
112                 synchronized( this ) {
113                     remoteWireFormatInfo = (WireFormatInfo) command;
114                     try {
115                         startMonitorThreads();
116                     } catch (IOException JavaDoc e) {
117                         onException(e);
118                     }
119                 }
120             }
121             transportListener.onCommand(command);
122         } finally {
123             inReceive.set(false);
124             commandReceived.set(true);
125         }
126     }
127
128     
129     public void oneway(Object JavaDoc o) throws IOException JavaDoc {
130         // Disable inactivity monitoring while processing a command.
131
inSend.set(true);
132         commandSent.set(true);
133         try {
134             if( o.getClass() == WireFormatInfo.class ) {
135                 synchronized( this ) {
136                     localWireFormatInfo = (WireFormatInfo) o;
137                     startMonitorThreads();
138                 }
139             }
140             next.oneway(o);
141         } finally {
142             inSend.set(false);
143         }
144     }
145     
146     public void onException(IOException JavaDoc error) {
147         if( monitorStarted.get() ) {
148             stopMonitorThreads();
149         }
150         getTransportListener().onException(error);
151     }
152     
153     
154     synchronized private void startMonitorThreads() throws IOException JavaDoc {
155         if( monitorStarted.get() )
156             return;
157         if( localWireFormatInfo == null )
158             return;
159         if( remoteWireFormatInfo == null )
160             return;
161         
162         long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
163         if( l > 0 ) {
164             monitorStarted.set(true);
165             Scheduler.executePeriodically(writeChecker, l/2);
166             Scheduler.executePeriodically(readChecker, l);
167         }
168     }
169     
170     /**
171      *
172      */

173     synchronized private void stopMonitorThreads() {
174         if( monitorStarted.compareAndSet(true, false) ) {
175             Scheduler.cancel(readChecker);
176             Scheduler.cancel(writeChecker);
177         }
178     }
179     
180
181 }
182
Popular Tags