KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > tcp > InactivityMonitorTest


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.activemq.transport.tcp;
18
19 import java.io.IOException JavaDoc;
20 import java.net.URI JavaDoc;
21 import java.net.URISyntaxException JavaDoc;
22
23 import org.apache.activemq.CombinationTestSupport;
24 import org.apache.activemq.command.WireFormatInfo;
25 import org.apache.activemq.openwire.OpenWireFormat;
26 import org.apache.activemq.transport.Transport;
27 import org.apache.activemq.transport.TransportAcceptListener;
28 import org.apache.activemq.transport.TransportFactory;
29 import org.apache.activemq.transport.TransportListener;
30 import org.apache.activemq.transport.TransportServer;
31
32 import javax.net.SocketFactory;
33
34 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
35 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
36
37
38 public class InactivityMonitorTest extends CombinationTestSupport implements TransportAcceptListener {
39     
40     private TransportServer server;
41     private Transport clientTransport;
42     private Transport serverTransport;
43     
44     private final AtomicInteger JavaDoc clientReceiveCount = new AtomicInteger JavaDoc(0);
45     private final AtomicInteger JavaDoc clientErrorCount = new AtomicInteger JavaDoc(0);
46     private final AtomicInteger JavaDoc serverReceiveCount = new AtomicInteger JavaDoc(0);
47     private final AtomicInteger JavaDoc serverErrorCount = new AtomicInteger JavaDoc(0);
48     
49     private final AtomicBoolean JavaDoc ignoreClientError = new AtomicBoolean JavaDoc(false);
50     private final AtomicBoolean JavaDoc ignoreServerError = new AtomicBoolean JavaDoc(false);
51     
52     public Runnable JavaDoc serverRunOnCommand;
53     public Runnable JavaDoc clientRunOnCommand;
54     
55     protected void setUp() throws Exception JavaDoc {
56         super.setUp();
57         startTransportServer();
58     }
59
60     /**
61      * @throws Exception
62      * @throws URISyntaxException
63      */

64     private void startClient() throws Exception JavaDoc, URISyntaxException JavaDoc {
65         clientTransport = TransportFactory.connect(new URI JavaDoc("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000"));
66         clientTransport.setTransportListener(new TransportListener() {
67             public void onCommand(Object JavaDoc command) {
68                 clientReceiveCount.incrementAndGet();
69                 if( clientRunOnCommand !=null ) {
70                     clientRunOnCommand.run();
71                 }
72             }
73             public void onException(IOException JavaDoc error) {
74                 if( !ignoreClientError.get() ) {
75                     log.info("Client transport error:");
76                     error.printStackTrace();
77                     clientErrorCount.incrementAndGet();
78                 }
79             }
80             public void transportInterupted() {
81             }
82             public void transportResumed() {
83             }});
84         clientTransport.start();
85     }
86
87     /**
88      * @throws IOException
89      * @throws URISyntaxException
90      * @throws Exception
91      */

92     private void startTransportServer() throws IOException JavaDoc, URISyntaxException JavaDoc, Exception JavaDoc {
93         server = TransportFactory.bind("localhost", new URI JavaDoc("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000"));
94         server.setAcceptListener(this);
95         server.start();
96     }
97     
98     protected void tearDown() throws Exception JavaDoc {
99         ignoreClientError.set(true);
100         ignoreServerError.set(true);
101         try {
102             if( clientTransport!=null )
103                 clientTransport.stop();
104             if( serverTransport!=null )
105                 serverTransport.stop();
106             if( server!=null )
107                 server.stop();
108         } catch (Throwable JavaDoc e) {
109             e.printStackTrace();
110         }
111         super.tearDown();
112     }
113     
114     public void onAccept(Transport transport) {
115         try {
116             log.info("["+getName()+"] Server Accepted a Connection");
117             serverTransport = transport;
118             serverTransport.setTransportListener(new TransportListener() {
119                 public void onCommand(Object JavaDoc command) {
120                     serverReceiveCount.incrementAndGet();
121                     if( serverRunOnCommand !=null ) {
122                         serverRunOnCommand.run();
123                     }
124                 }
125                 public void onException(IOException JavaDoc error) {
126                     if( !ignoreClientError.get() ) {
127                         log.info("Server transport error:");
128                         error.printStackTrace();
129                         serverErrorCount.incrementAndGet();
130                     }
131                 }
132                 public void transportInterupted() {
133                 }
134                 public void transportResumed() {
135                 }});
136             serverTransport.start();
137         } catch (Exception JavaDoc e) {
138             e.printStackTrace();
139         }
140     }
141
142     public void onAcceptError(Exception JavaDoc error) {
143         error.printStackTrace();
144     }
145
146     public void testClientHang() throws Exception JavaDoc {
147         
148         //
149
// Manually create a client transport so that it does not send KeepAlive packets.
150
// this should simulate a client hang.
151
clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI JavaDoc("tcp://localhost:61616"), null);
152         clientTransport.setTransportListener(new TransportListener() {
153             public void onCommand(Object JavaDoc command) {
154                 clientReceiveCount.incrementAndGet();
155                 if( clientRunOnCommand !=null ) {
156                     clientRunOnCommand.run();
157                 }
158             }
159             public void onException(IOException JavaDoc error) {
160                 if( !ignoreClientError.get() ) {
161                     log.info("Client transport error:");
162                     error.printStackTrace();
163                     clientErrorCount.incrementAndGet();
164                 }
165             }
166             public void transportInterupted() {
167             }
168             public void transportResumed() {
169             }});
170         clientTransport.start();
171         WireFormatInfo info = new WireFormatInfo();
172         info.seMaxInactivityDuration(1000);
173         clientTransport.oneway(info);
174         
175         assertEquals(0, serverErrorCount.get());
176         assertEquals(0, clientErrorCount.get());
177         
178         // Server should consider the client timed out right away since the client is not hart beating fast enough.
179
Thread.sleep(3000);
180         
181         assertEquals(0, clientErrorCount.get());
182         assertTrue(serverErrorCount.get()>0);
183     }
184     
185     public void testNoClientHang() throws Exception JavaDoc {
186         startClient();
187         
188         assertEquals(0, serverErrorCount.get());
189         assertEquals(0, clientErrorCount.get());
190         
191         Thread.sleep(4000);
192         
193         assertEquals(0, clientErrorCount.get());
194         assertEquals(0, serverErrorCount.get());
195     }
196
197     /**
198      * Used to test when a operation blocks. This should
199      * not cause transport to get disconnected.
200      * @throws Exception
201      * @throws URISyntaxException
202      */

203     public void initCombosForTestNoClientHangWithServerBlock() throws Exception JavaDoc {
204         
205         startClient();
206
207         addCombinationValues("clientInactivityLimit", new Object JavaDoc[] { new Long JavaDoc(1000)});
208         addCombinationValues("serverInactivityLimit", new Object JavaDoc[] { new Long JavaDoc(1000)});
209         addCombinationValues("serverRunOnCommand", new Object JavaDoc[] { new Runnable JavaDoc() {
210                 public void run() {
211                     try {
212                         log.info("Sleeping");
213                         Thread.sleep(4000);
214                     } catch (InterruptedException JavaDoc e) {
215                     }
216                 }
217             }});
218     }
219     
220     public void testNoClientHangWithServerBlock() throws Exception JavaDoc {
221         
222         startClient();
223
224         assertEquals(0, serverErrorCount.get());
225         assertEquals(0, clientErrorCount.get());
226         
227         Thread.sleep(4000);
228         
229         assertEquals(0, clientErrorCount.get());
230         assertEquals(0, serverErrorCount.get());
231     }
232
233 }
234
Popular Tags