KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > StreamingStateTransferTest


1 package org.jgroups.tests;
2
3 import java.io.IOException JavaDoc;
4 import java.io.InputStream JavaDoc;
5 import java.io.ObjectInputStream JavaDoc;
6 import java.io.ObjectOutputStream JavaDoc;
7 import java.io.OutputStream JavaDoc;
8 import java.util.ArrayList JavaDoc;
9 import java.util.HashMap JavaDoc;
10 import java.util.List JavaDoc;
11 import java.util.Map JavaDoc;
12 import java.util.concurrent.Semaphore JavaDoc;
13
14 import junit.framework.Test;
15 import junit.framework.TestCase;
16 import junit.framework.TestSuite;
17
18 import org.jgroups.*;
19 import org.jgroups.util.Util;
20
21
22 /**
23  * Tests streaming state transfer.
24  *
25  *
26  * @author Vladimir Blagojevic
27  * @version $Id$
28  *
29  */

30 public class StreamingStateTransferTest extends ChannelTestBase
31 {
32    
33    
34    public void setUp() throws Exception JavaDoc
35    {
36       super.setUp();
37       CHANNEL_CONFIG = System.getProperty("channel.conf.flush", "flush-udp.xml");
38    }
39    
40    public boolean useBlocking()
41    {
42       return true;
43    }
44    
45    public void testTransfer()
46    {
47       String JavaDoc channelNames [] = null;
48       //mux applications on top of same channel have to have unique name
49
if(isMuxChannelUsed())
50       {
51          channelNames = createMuxApplicationNames(1);
52       }
53       else
54       {
55          channelNames = new String JavaDoc[]{"A", "B", "C", "D"};
56       }
57       transferHelper(channelNames,false);
58    }
59    
60    public void testRpcChannelTransfer()
61    {
62       //do this test for regular channels only
63
if(!isMuxChannelUsed())
64       {
65          String JavaDoc channelNames []= new String JavaDoc[]{"A", "B", "C", "D"};
66          transferHelper(channelNames,true);
67       }
68    }
69    
70    public void testMultipleServiceMuxChannel()
71    {
72       String JavaDoc channelNames [] = null;
73       //mux applications on top of same channel have to have unique name
74
if(isMuxChannelUsed())
75       {
76          channelNames = createMuxApplicationNames(2);
77          transferHelper(channelNames,false);
78       }
79    }
80    
81    public void transferHelper(String JavaDoc channelNames[], boolean useDispatcher)
82    {
83       transferHelper(channelNames,false,false,useDispatcher);
84    }
85
86    public void transferHelper(String JavaDoc channelNames[],boolean crash, boolean largeTransfer,boolean useDispatcher)
87    {
88       int channelCount = channelNames.length;
89       ArrayList JavaDoc channels = new ArrayList JavaDoc(channelCount);
90
91       //Create a semaphore and take all its tickets
92
Semaphore JavaDoc semaphore = new Semaphore JavaDoc(channelCount);
93       
94       try
95       {
96
97          takeAllPermits(semaphore, channelCount);
98          boolean crashed = false;
99          // Create activation threads that will block on the semaphore
100
for (int i = 0; i < channelCount; i++)
101          {
102             StreamingStateTransferApplication channel = null;
103             if(isMuxChannelUsed())
104             {
105                channel = new StreamingStateTransferApplication(channelNames[i],muxFactory[i%getMuxFactoryCount()],semaphore,largeTransfer);
106             }
107             else
108             {
109                channel = new StreamingStateTransferApplication(channelNames[i],semaphore,useDispatcher,largeTransfer);
110             }
111
112             // Start threads and let them join the channel
113
channels.add(channel);
114             semaphore.release(1);
115             channel.start();
116             Util.sleep(2000);
117             
118             if(crash && !crashed && i>2)
119             {
120                StreamingStateTransferApplication coord = (StreamingStateTransferApplication) channels.remove(0);
121                coord.cleanup();
122                crashed = true;
123             }
124          }
125          
126          
127          if(isMuxChannelUsed())
128          {
129             blockUntilViewsReceived(channels,getMuxFactoryCount(), 60000);
130          }
131          else
132          {
133             blockUntilViewsReceived(channels, 60000);
134          }
135          
136
137          //Reacquire the semaphore tickets; when we have them all
138
// we know the threads are done
139
acquireSemaphore(semaphore, 60000, channelCount);
140          
141          int getStateInvokedCount = 0;
142          int setStateInvokedCount = 0;
143          int partialGetStateInvokedCount = 0;
144          int partialSetStateInvokedCount = 0;
145          
146          Util.sleep(3000);
147          for (int i = 0; i < channels.size(); i++)
148          {
149             StreamingStateTransferApplication current = (StreamingStateTransferApplication) channels.get(i);
150             if(current.getStateInvoked)
151             {
152                getStateInvokedCount++;
153             }
154             if(current.setStateInvoked)
155             {
156                setStateInvokedCount++;
157             }
158             if(current.partialGetStateInvoked)
159             {
160                partialGetStateInvokedCount++;
161             }
162             if(current.partialSetStateInvoked)
163             {
164                partialSetStateInvokedCount++;
165             }
166             Map JavaDoc map = current.getMap();
167             for (int j = 0; j < channels.size(); j++)
168             {
169                StreamingStateTransferApplication app = (StreamingStateTransferApplication) channels.get(j);
170                List JavaDoc l = (List JavaDoc) map.get(app.getLocalAddress());
171                int size = l!=null?l.size():0;
172                assertEquals("Correct element count in map ",StreamingStateTransferApplication.COUNT,size);
173             }
174          }
175          if(isMuxChannelUsed())
176          {
177             int factor = channelCount/getMuxFactoryCount();
178             assertEquals("Correct invocation count of getState ",1*factor, getStateInvokedCount);
179             assertEquals("Correct invocation count of setState ",(channelCount/factor)-1,setStateInvokedCount/factor);
180             assertEquals("Correct invocation count of partial getState ",1*factor, partialGetStateInvokedCount);
181             assertEquals("Correct invocation count of partial setState ",(channelCount/factor)-1,partialSetStateInvokedCount/factor);
182          }
183          else
184          {
185             assertEquals("Correct invocation count of getState ",1, getStateInvokedCount);
186             assertEquals("Correct invocation count of setState ",channelCount-1,setStateInvokedCount);
187             assertEquals("Correct invocation count of partial getState ",1, partialGetStateInvokedCount);
188             assertEquals("Correct invocation count of partial setState ",channelCount-1,partialSetStateInvokedCount);
189          }
190                
191       }
192       catch (Exception JavaDoc ex)
193       {
194          log.warn(ex);
195       }
196       finally
197       {
198          for (int i = 0; i < channels.size(); i++)
199          {
200             StreamingStateTransferApplication app = (StreamingStateTransferApplication) channels.get(i);
201             Util.sleep(500);
202             app.cleanup();
203          }
204       }
205    }
206    
207    protected class StreamingChannelTestFactory extends DefaultChannelTestFactory
208    {
209       public JChannel createChannel(Object JavaDoc id) throws Exception JavaDoc
210       {
211          return createChannel(CHANNEL_CONFIG, true);
212       }
213    }
214
215    protected class StreamingStateTransferApplication extends PushChannelApplicationWithSemaphore
216    {
217       private final Map JavaDoc stateMap = new HashMap JavaDoc();
218       
219       public static final int COUNT = 25;
220       
221       boolean partialSetStateInvoked = false;
222
223       boolean partialGetStateInvoked = false;
224
225       boolean setStateInvoked = false;
226
227       boolean getStateInvoked = false;
228       
229       boolean largeTransfer = false;
230
231       public StreamingStateTransferApplication(String JavaDoc name, Semaphore JavaDoc s,boolean useDispatcher,boolean largeTransfer) throws Exception JavaDoc
232       {
233          super(name,new StreamingChannelTestFactory(),s,useDispatcher);
234          this.largeTransfer = largeTransfer;
235          channel.connect("test");
236       }
237       
238       public StreamingStateTransferApplication(String JavaDoc name, JChannelFactory factory,Semaphore JavaDoc s,boolean largeTransfer) throws Exception JavaDoc
239       {
240          super(name,factory,s);
241          this.largeTransfer = largeTransfer;
242          channel.connect("test");
243       }
244
245       public void receive(Message msg)
246       {
247          Address sender = msg.getSrc();
248          synchronized(stateMap)
249          {
250             List JavaDoc list = (List JavaDoc) stateMap.get(sender);
251             if(list == null)
252             {
253                list = new ArrayList JavaDoc();
254                stateMap.put(sender, list);
255             }
256             list.add(msg.getObject());
257          }
258       }
259       
260       public Map JavaDoc getMap()
261       {
262          return stateMap;
263       }
264
265       public void useChannel() throws Exception JavaDoc
266       {
267          for(int i = 0;i < COUNT;i++)
268          {
269             channel.send(null,null,new Integer JavaDoc(i));
270          }
271          channel.getState(null, 25000);
272          channel.getState(null, name, 25000);
273       }
274
275       public void getState(OutputStream JavaDoc ostream)
276       {
277          if(largeTransfer)
278             Util.sleep(4000);
279          
280          super.getState(ostream);
281          ObjectOutputStream JavaDoc oos = null;
282          try
283          {
284             oos = new ObjectOutputStream JavaDoc(ostream);
285             HashMap JavaDoc copy = null;
286             synchronized (stateMap)
287             {
288                copy = new HashMap JavaDoc(stateMap);
289             }
290             oos.writeObject(copy);
291             oos.flush();
292          }
293          catch (IOException JavaDoc e)
294          {
295             e.printStackTrace();
296          }
297          finally
298          {
299             getStateInvoked = true;
300             Util.close(oos);
301          }
302       }
303       
304       public byte[] getState()
305       {
306          if(largeTransfer)
307             Util.sleep(4000);
308          
309          byte[] result = null;
310          try
311          {
312             synchronized (stateMap)
313             {
314                result = Util.objectToByteBuffer(stateMap);
315             }
316          }
317          catch (Exception JavaDoc e)
318          {
319             e.printStackTrace();
320          }
321          finally
322          {
323             getStateInvoked = true;
324          }
325          return result;
326       }
327       
328       public void setState(byte [] state)
329       {
330          if(largeTransfer)
331             Util.sleep(4000);
332          
333          Map JavaDoc result = null;
334          try
335          {
336             result = (Map JavaDoc) Util.objectFromByteBuffer(state);
337          }
338          catch (Exception JavaDoc e)
339          {
340             e.printStackTrace();
341          }
342          finally
343          {
344             setStateInvoked = true;
345          }
346          synchronized(stateMap)
347          {
348             stateMap.clear();
349             stateMap.putAll(result);
350          }
351       }
352
353       public void setState(InputStream JavaDoc istream)
354       {
355          if(largeTransfer)
356             Util.sleep(4000);
357          
358          super.setState(istream);
359          ObjectInputStream JavaDoc ois = null;
360          try
361          {
362             ois = new ObjectInputStream JavaDoc(istream);
363             Map JavaDoc map = (Map JavaDoc) ois.readObject();
364             synchronized (stateMap)
365             {
366                stateMap.clear();
367                stateMap.putAll(map);
368             }
369          }
370          catch (Exception JavaDoc e)
371          {
372             e.printStackTrace();
373          }
374          finally
375          {
376             setStateInvoked = true;
377             Util.close(ois);
378          }
379       }
380       
381       public void setState(String JavaDoc stateId,byte [] state)
382       {
383          if(largeTransfer)
384             Util.sleep(4000);
385          
386          Object JavaDoc nameTransfer = null;
387          try
388          {
389             nameTransfer = Util.objectFromByteBuffer(state);
390             TestCase.assertEquals("Got partial state requested ", nameTransfer, name);
391          }
392          catch (Exception JavaDoc e)
393          {
394             e.printStackTrace();
395          }
396          finally
397          {
398             partialSetStateInvoked = true;
399          }
400       }
401       
402       public byte[] getState(String JavaDoc stateId)
403       {
404          if(largeTransfer)
405             Util.sleep(4000);
406          
407          byte[] result = null;
408          try
409          {
410             result = Util.objectToByteBuffer(stateId);
411          }
412          catch (Exception JavaDoc e)
413          {
414             e.printStackTrace();
415          }
416          finally
417          {
418             partialGetStateInvoked = true;
419          }
420          return result;
421       }
422
423       public void setState(String JavaDoc state_id, InputStream JavaDoc istream)
424       {
425          if(largeTransfer)
426             Util.sleep(4000);
427          
428          super.setState(state_id, istream);
429          ObjectInputStream JavaDoc ois = null;
430          try
431          {
432             ois = new ObjectInputStream JavaDoc(istream);
433             TestCase.assertEquals("Got partial state requested ", ois.readObject(), name);
434          }
435          catch (Exception JavaDoc e)
436          {
437             e.printStackTrace();
438          }
439          finally
440          {
441             partialSetStateInvoked = true;
442             Util.close(ois);
443          }
444       }
445
446       public void getState(String JavaDoc state_id, OutputStream JavaDoc ostream)
447       {
448          if(largeTransfer)
449             Util.sleep(4000);
450          
451          super.getState(state_id, ostream);
452          ObjectOutputStream JavaDoc oos = null;
453          try
454          {
455             oos = new ObjectOutputStream JavaDoc(ostream);
456             oos.writeObject(state_id);
457             oos.flush();
458          }
459          catch (IOException JavaDoc e)
460          {
461             e.printStackTrace();
462          }
463          finally
464          {
465             partialGetStateInvoked = true;
466             Util.close(oos);
467          }
468       }
469    }
470
471    public static Test suite()
472    {
473       return new TestSuite(StreamingStateTransferTest.class);
474    }
475
476    public static void main(String JavaDoc[] args)
477    {
478       String JavaDoc[] testCaseName = {StreamingStateTransferTest.class.getName()};
479       junit.textui.TestRunner.main(testCaseName);
480    }
481 }
482
Popular Tags