1 17 package org.apache.excalibur.event.test; 18 19 import junit.framework.TestCase; 20 21 import org.apache.excalibur.event.Queue; 22 import org.apache.excalibur.event.Sink; 23 import org.apache.excalibur.event.SinkException; 24 import org.apache.excalibur.event.Source; 25 import org.apache.excalibur.event.impl.DefaultQueue; 26 27 33 public class ThreadedQueueTestCase 34 extends TestCase 35 { 36 private QueueStart start; 37 private QueueEnd end; 38 39 private Queue queue; 40 41 private Thread [] stages; 42 43 public ThreadedQueueTestCase( String name ) 44 { 45 super( name ); 46 } 47 48 public void testThreaded() throws Exception 49 { 50 initialize( 10000, 1 ); 51 start(); 52 53 initialize( 10000, 1000 ); 54 start(); 55 56 initialize( 20000, 1000 ); 57 start(); 58 59 initialize( 30000, 1000 ); 60 start(); 61 } 62 63 public void initialize( int count, long timeout ) throws Exception 64 { 65 this.stages = new Thread [ 2 ]; 66 67 this.queue = new DefaultQueue(); 68 this.queue.setTimeout( timeout ); 69 70 this.start = new QueueStart( count ); 71 this.start.setSink( this.queue ); 72 this.stages[ 0 ] = new Thread ( this.start ); 73 74 this.end = new QueueEnd(); 75 this.end.setSource( this.queue ); 76 this.end.setTimeout( timeout ); 77 this.stages[ 1 ] = new Thread ( this.end ); 78 } 79 80 public void start() throws Exception 81 { 82 87 88 for( int i = 0; i < this.stages.length; i++ ) 89 { 90 this.stages[ i ].start(); 91 } 92 93 stop(); 94 } 95 96 public void stop() throws Exception 97 { 98 for( int i = 0; i < this.stages.length; i++ ) 99 { 100 try 101 { 102 this.stages[ i ].join(); 103 } 104 catch( InterruptedException e ) 105 { 106 throw new RuntimeException ( "Stage unexpectedly interrupted: " + e ); 107 } 108 } 109 110 121 122 assertEquals( this.start.getCount(), this.end.getCount() ); 123 assertEquals( this.start.getSum(), this.end.getSum() ); 124 } 125 126 private class QueueInteger 127 { 128 private int integer; 129 130 public QueueInteger( int integer ) 131 { 132 this.integer = integer; 133 } 134 135 public int getInteger() 136 { 137 return integer; 138 } 139 } 140 141 private class QueueStart implements Runnable 142 { 143 private Sink sink; 144 private int queueCount; 145 private int count; 146 private long sum = 0; 147 148 public QueueStart( int queueCount ) 149 { 150 this.queueCount = queueCount; 151 } 152 153 protected void setSink( Sink sink ) 154 { 155 this.sink = sink; 156 } 157 158 public int getCount() 159 { 160 return count; 161 } 162 163 public long getSum() 164 { 165 return sum; 166 } 167 168 public void run() 169 { 170 for( int i = 0; i < this.queueCount; i++ ) 171 { 172 try 173 { 174 this.sink.enqueue( new QueueInteger( i ) ); 175 this.count++; 176 sum = sum * 127 + i; 177 } 178 catch( SinkException e ) 179 { 180 System.out.println( "Unable to queue: " + e.getMessage() ); 181 } 182 } 183 184 try 185 { 186 this.sink.enqueue( new QueueInteger( -1 ) ); 187 } 188 catch( SinkException e ) 189 { 190 System.out.println( "Unable to queue stop" ); 191 } 192 } 193 } 194 195 private class QueueEnd implements Runnable 196 { 197 private Source source; 198 private int count; 199 private long timeout = 0; 200 private long sum = 0; 201 202 protected void setTimeout( long timeout ) 203 { 204 this.timeout = timeout; 205 } 206 207 protected void setSource( Source source ) 208 { 209 this.source = source; 210 } 211 212 public int getCount() 213 { 214 return count; 215 } 216 217 public long getSum() 218 { 219 return sum; 220 } 221 222 public void run() 223 { 224 while( true ) 225 { 226 Object qe = this.source.dequeue(); 227 228 if( qe == null ) 229 { 230 if( timeout > 0 ) 231 { 232 try 233 { 234 Thread.sleep( timeout ); 235 } 236 catch( InterruptedException ie ) 237 { 238 break; 239 } 240 } 241 } 242 else if( qe instanceof QueueInteger ) 243 { 244 QueueInteger qi = (QueueInteger)qe; 245 246 if( qi.getInteger() == -1 ) 247 { 248 break; 249 } 250 else 251 { 252 this.count++; 253 sum = sum * 127 + qi.getInteger(); 254 } 255 } 256 } 257 } 258 } 259 } 260 261 | Popular Tags |