package org.jgroups.tests;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;

import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;

import org.jgroups.*;
import org.jgroups.util.Util;

/**
 * Exercises full and partial state transfer between several channel
 * applications, using both the stream-based and the byte[]-based state
 * callbacks.
 *
 * Each application sends {@link StreamingStateTransferApplication#COUNT}
 * messages, then requests the full state followed by a partial state whose
 * id is the application's own name. The test then checks how many
 * applications served and received state, and that every application ended
 * up with the messages of every other application.
 */
public class StreamingStateTransferTest extends ChannelTestBase
{

    /**
     * Runs the base-class setup and then selects the channel configuration
     * from the <code>channel.conf.flush</code> system property, defaulting to
     * <code>flush-udp.xml</code>.
     */
    public void setUp() throws Exception
    {
        super.setUp();
        CHANNEL_CONFIG = System.getProperty("channel.conf.flush", "flush-udp.xml");
    }

    /**
     * @return always <code>true</code>
     */
    public boolean useBlocking()
    {
        return true;
    }

    /**
     * Runs the transfer scenario without a dispatcher: with a single mux
     * service when mux channels are in use, otherwise with four plain
     * channels named A-D.
     */
    public void testTransfer()
    {
        String[] channelNames;
        if(isMuxChannelUsed())
        {
            channelNames = createMuxApplicationNames(1);
        }
        else
        {
            channelNames = new String[]{"A", "B", "C", "D"};
        }
        transferHelper(channelNames, false);
    }

    /**
     * Runs the transfer scenario with a dispatcher on four plain channels.
     * Does nothing when mux channels are in use.
     */
    public void testRpcChannelTransfer()
    {
        if(!isMuxChannelUsed())
        {
            String[] channelNames = new String[]{"A", "B", "C", "D"};
            transferHelper(channelNames, true);
        }
    }

    /**
     * Runs the transfer scenario with two mux services.
     * Does nothing when mux channels are not in use.
     */
    public void testMultipleServiceMuxChannel()
    {
        if(isMuxChannelUsed())
        {
            String[] channelNames = createMuxApplicationNames(2);
            transferHelper(channelNames, false);
        }
    }

    /**
     * Convenience overload: no coordinator crash, no large (slow) transfer.
     *
     * @param channelNames  names of the applications to start, in start order
     * @param useDispatcher passed through to the application constructor
     *                      (ignored when mux channels are in use)
     */
    public void transferHelper(String[] channelNames, boolean useDispatcher)
    {
        transferHelper(channelNames, false, false, useDispatcher);
    }

    /**
     * Starts one application per name, waits for the views and for all
     * applications to finish, then verifies the state-transfer callbacks and
     * the contents of every application's message map.
     *
     * @param channelNames  names of the applications to start, in start order
     * @param crash         if <code>true</code>, the first started application
     *                      is cleaned up once, after the fourth one has started
     * @param largeTransfer if <code>true</code>, every state callback sleeps
     *                      for 4 seconds before doing its work
     * @param useDispatcher passed through to the application constructor
     *                      (ignored when mux channels are in use)
     * @throws RuntimeException wrapping any exception raised while the
     *                          scenario runs, so that the test cannot pass
     *                          silently after an aborted run
     */
    public void transferHelper(String[] channelNames, boolean crash, boolean largeTransfer, boolean useDispatcher)
    {
        int channelCount = channelNames.length;
        // Kept as a raw list: it is handed to base-class helpers whose
        // parameter types are not visible from this file.
        ArrayList channels = new ArrayList(channelCount);

        // One permit per application.
        Semaphore semaphore = new Semaphore(channelCount);

        try
        {
            // Drain the semaphore up front; one permit is handed back just
            // before each application is started (see the loop below).
            takeAllPermits(semaphore, channelCount);
            boolean crashed = false;
            for (int i = 0; i < channelCount; i++)
            {
                StreamingStateTransferApplication channel = null;
                if(isMuxChannelUsed())
                {
                    // Spread the applications round-robin over the mux factories.
                    channel = new StreamingStateTransferApplication(channelNames[i], muxFactory[i % getMuxFactoryCount()], semaphore, largeTransfer);
                }
                else
                {
                    channel = new StreamingStateTransferApplication(channelNames[i], semaphore, useDispatcher, largeTransfer);
                }

                // Register first so the finally block cleans it up even if
                // start() throws.
                channels.add(channel);
                semaphore.release(1);
                channel.start();
                Util.sleep(2000);

                if(crash && !crashed && i > 2)
                {
                    // Simulate a crash of the first started application
                    // (presumably the coordinator -- confirm against the
                    // view) and drop it from further verification.
                    StreamingStateTransferApplication coord = (StreamingStateTransferApplication) channels.remove(0);
                    coord.cleanup();
                    crashed = true;
                }
            }

            if(isMuxChannelUsed())
            {
                blockUntilViewsReceived(channels, getMuxFactoryCount(), 60000);
            }
            else
            {
                blockUntilViewsReceived(channels, 60000);
            }

            // Wait (up to 60 s) for the permits handed out above to come back.
            acquireSemaphore(semaphore, 60000, channelCount);

            int getStateInvokedCount = 0;
            int setStateInvokedCount = 0;
            int partialGetStateInvokedCount = 0;
            int partialSetStateInvokedCount = 0;

            Util.sleep(3000);
            for (int i = 0; i < channels.size(); i++)
            {
                StreamingStateTransferApplication current = (StreamingStateTransferApplication) channels.get(i);
                if(current.getStateInvoked)
                {
                    getStateInvokedCount++;
                }
                if(current.setStateInvoked)
                {
                    setStateInvokedCount++;
                }
                if(current.partialGetStateInvoked)
                {
                    partialGetStateInvokedCount++;
                }
                if(current.partialSetStateInvoked)
                {
                    partialSetStateInvokedCount++;
                }

                // Every application must hold COUNT messages from every
                // application (itself included), keyed by sender address.
                Map map = current.getMap();
                for (int j = 0; j < channels.size(); j++)
                {
                    StreamingStateTransferApplication app = (StreamingStateTransferApplication) channels.get(j);
                    List l = (List) map.get(app.getLocalAddress());
                    int size = l != null ? l.size() : 0;
                    assertEquals("Correct element count in map ", StreamingStateTransferApplication.COUNT, size);
                }
            }

            if(isMuxChannelUsed())
            {
                // factor = applications per mux factory; the expected counts
                // of the plain-channel case are scaled by it.
                int factor = channelCount / getMuxFactoryCount();
                assertEquals("Correct invocation count of getState ", 1 * factor, getStateInvokedCount);
                assertEquals("Correct invocation count of setState ", (channelCount / factor) - 1, setStateInvokedCount / factor);
                assertEquals("Correct invocation count of partial getState ", 1 * factor, partialGetStateInvokedCount);
                assertEquals("Correct invocation count of partial setState ", (channelCount / factor) - 1, partialSetStateInvokedCount / factor);
            }
            else
            {
                // Exactly one application serves state; all others receive it.
                assertEquals("Correct invocation count of getState ", 1, getStateInvokedCount);
                assertEquals("Correct invocation count of setState ", channelCount - 1, setStateInvokedCount);
                assertEquals("Correct invocation count of partial getState ", 1, partialGetStateInvokedCount);
                assertEquals("Correct invocation count of partial setState ", channelCount - 1, partialSetStateInvokedCount);
            }
        }
        catch (Exception ex)
        {
            log.warn(ex);
            // Logging alone would let the test pass although the scenario
            // never completed. Rethrow wrapped so JUnit reports an error and
            // the original stack trace stays attached as the cause.
            throw new RuntimeException("State transfer scenario aborted by unexpected exception", ex);
        }
        finally
        {
            for (int i = 0; i < channels.size(); i++)
            {
                StreamingStateTransferApplication app = (StreamingStateTransferApplication) channels.get(i);
                Util.sleep(500);
                app.cleanup();
            }
        }
    }

    /**
     * Channel factory that ignores the requested id and always builds a
     * channel from {@link #CHANNEL_CONFIG}.
     */
    protected class StreamingChannelTestFactory extends DefaultChannelTestFactory
    {
        public JChannel createChannel(Object id) throws Exception
        {
            return createChannel(CHANNEL_CONFIG, true);
        }
    }

    /**
     * Test application that records every received message per sender and
     * implements all full/partial, stream/byte[] state callbacks. Each
     * callback raises a flag when invoked so the test can count invocations.
     */
    protected class StreamingStateTransferApplication extends PushChannelApplicationWithSemaphore
    {
        /** Sender address -> list of payloads received from that sender. Guarded by itself. */
        private final Map stateMap = new HashMap();

        /** Number of messages each application sends in {@link #useChannel()}. */
        public static final int COUNT = 25;

        /** Set once a partial state has been received. */
        boolean partialSetStateInvoked = false;

        /** Set once a partial state has been served. */
        boolean partialGetStateInvoked = false;

        /** Set once the full state has been received. */
        boolean setStateInvoked = false;

        /** Set once the full state has been served. */
        boolean getStateInvoked = false;

        /** When true, every state callback sleeps 4 s before doing its work. */
        boolean largeTransfer = false;

        /**
         * Creates the application on a channel built by
         * {@link StreamingChannelTestFactory} and connects it to group "test".
         */
        public StreamingStateTransferApplication(String name, Semaphore s, boolean useDispatcher, boolean largeTransfer) throws Exception
        {
            super(name, new StreamingChannelTestFactory(), s, useDispatcher);
            this.largeTransfer = largeTransfer;
            channel.connect("test");
        }

        /**
         * Creates the application on a channel obtained from the given
         * factory and connects it to group "test".
         */
        public StreamingStateTransferApplication(String name, JChannelFactory factory, Semaphore s, boolean largeTransfer) throws Exception
        {
            super(name, factory, s);
            this.largeTransfer = largeTransfer;
            channel.connect("test");
        }

        /**
         * Appends the message payload to the list kept for the message's
         * sender, creating the list on first use.
         */
        public void receive(Message msg)
        {
            Address sender = msg.getSrc();
            synchronized(stateMap)
            {
                List list = (List) stateMap.get(sender);
                if(list == null)
                {
                    list = new ArrayList();
                    stateMap.put(sender, list);
                }
                list.add(msg.getObject());
            }
        }

        /**
         * @return the live (not copied) sender -> payload-list map
         */
        public Map getMap()
        {
            return stateMap;
        }

        /**
         * Sends {@link #COUNT} integer messages to the group, then requests
         * the full state and the partial state identified by this
         * application's name, each with a 25 s timeout.
         */
        public void useChannel() throws Exception
        {
            for(int i = 0; i < COUNT; i++)
            {
                channel.send(null, null, Integer.valueOf(i));
            }
            channel.getState(null, 25000);
            channel.getState(null, name, 25000);
        }

        /**
         * Serves the full state by writing a snapshot of the map to the
         * stream. The stream is closed and the flag is set even on failure.
         */
        public void getState(OutputStream ostream)
        {
            if(largeTransfer)
                Util.sleep(4000);

            super.getState(ostream);
            ObjectOutputStream oos = null;
            try
            {
                oos = new ObjectOutputStream(ostream);
                HashMap copy = null;
                // Copy under the lock, write outside of it.
                synchronized (stateMap)
                {
                    copy = new HashMap(stateMap);
                }
                oos.writeObject(copy);
                oos.flush();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
            finally
            {
                getStateInvoked = true;
                Util.close(oos);
            }
        }

        /**
         * Serves the full state as a byte array.
         *
         * @return the serialized map, or <code>null</code> if serialization failed
         */
        public byte[] getState()
        {
            if(largeTransfer)
                Util.sleep(4000);

            byte[] result = null;
            try
            {
                synchronized (stateMap)
                {
                    result = Util.objectToByteBuffer(stateMap);
                }
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
            finally
            {
                getStateInvoked = true;
            }
            return result;
        }

        /**
         * Receives the full state as a byte array and replaces the map's
         * contents with it. If the state cannot be deserialized the current
         * contents are left untouched.
         */
        public void setState(byte[] state)
        {
            if(largeTransfer)
                Util.sleep(4000);

            Map result = null;
            try
            {
                result = (Map) Util.objectFromByteBuffer(state);
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
            finally
            {
                setStateInvoked = true;
            }

            if(result == null)
            {
                // Nothing usable arrived: do not wipe what we already have
                // (and avoid putAll(null), which throws NullPointerException).
                return;
            }
            synchronized(stateMap)
            {
                stateMap.clear();
                stateMap.putAll(result);
            }
        }

        /**
         * Receives the full state from a stream and replaces the map's
         * contents with it. The stream is closed and the flag is set even on
         * failure.
         */
        public void setState(InputStream istream)
        {
            if(largeTransfer)
                Util.sleep(4000);

            super.setState(istream);
            ObjectInputStream ois = null;
            try
            {
                ois = new ObjectInputStream(istream);
                Map map = (Map) ois.readObject();
                synchronized (stateMap)
                {
                    stateMap.clear();
                    stateMap.putAll(map);
                }
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
            finally
            {
                setStateInvoked = true;
                Util.close(ois);
            }
        }

        /**
         * Receives a partial state as a byte array and checks that it equals
         * this application's name, which is the state id requested in
         * {@link #useChannel()}.
         */
        public void setState(String stateId, byte[] state)
        {
            if(largeTransfer)
                Util.sleep(4000);

            Object nameTransfer = null;
            try
            {
                nameTransfer = Util.objectFromByteBuffer(state);
                // expected = the id we asked for, actual = what was transferred
                TestCase.assertEquals("Got partial state requested ", name, nameTransfer);
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
            finally
            {
                partialSetStateInvoked = true;
            }
        }

        /**
         * Serves a partial state as a byte array; the state is simply the
         * requested state id itself.
         *
         * @return the serialized state id, or <code>null</code> if serialization failed
         */
        public byte[] getState(String stateId)
        {
            if(largeTransfer)
                Util.sleep(4000);

            byte[] result = null;
            try
            {
                result = Util.objectToByteBuffer(stateId);
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
            finally
            {
                partialGetStateInvoked = true;
            }
            return result;
        }

        /**
         * Receives a partial state from a stream and checks that it equals
         * this application's name, which is the state id requested in
         * {@link #useChannel()}. The stream is closed and the flag is set
         * even on failure.
         */
        public void setState(String state_id, InputStream istream)
        {
            if(largeTransfer)
                Util.sleep(4000);

            super.setState(state_id, istream);
            ObjectInputStream ois = null;
            try
            {
                ois = new ObjectInputStream(istream);
                Object transferred = ois.readObject();
                // expected = the id we asked for, actual = what was transferred
                TestCase.assertEquals("Got partial state requested ", name, transferred);
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
            finally
            {
                partialSetStateInvoked = true;
                Util.close(ois);
            }
        }

        /**
         * Serves a partial state by writing the requested state id to the
         * stream. The stream is closed and the flag is set even on failure.
         */
        public void getState(String state_id, OutputStream ostream)
        {
            if(largeTransfer)
                Util.sleep(4000);

            super.getState(state_id, ostream);
            ObjectOutputStream oos = null;
            try
            {
                oos = new ObjectOutputStream(ostream);
                oos.writeObject(state_id);
                oos.flush();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
            finally
            {
                partialGetStateInvoked = true;
                Util.close(oos);
            }
        }
    }

    /**
     * @return a JUnit 3 suite containing all tests of this class
     */
    public static Test suite()
    {
        return new TestSuite(StreamingStateTransferTest.class);
    }

    /**
     * Runs this test class with the JUnit text runner.
     */
    public static void main(String[] args)
    {
        String[] testCaseName = {StreamingStateTransferTest.class.getName()};
        junit.textui.TestRunner.main(testCaseName);
    }
}