KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > ConcurrentStartupTest


1 package org.jgroups.tests;
2
3 import java.io.IOException JavaDoc;
4 import java.io.InputStream JavaDoc;
5 import java.io.ObjectInputStream JavaDoc;
6 import java.io.ObjectOutputStream JavaDoc;
7 import java.io.OutputStream JavaDoc;
8 import java.util.LinkedList JavaDoc;
9 import java.util.List JavaDoc;
10 import java.util.Map JavaDoc;
11 import java.util.TreeMap JavaDoc;
12 import java.util.concurrent.Semaphore JavaDoc;
13
14 import junit.framework.Test;
15 import junit.framework.TestSuite;
16
17 import org.jgroups.Channel;
18 import org.jgroups.JChannelFactory;
19 import org.jgroups.Message;
20 import org.jgroups.View;
21 import org.jgroups.util.Util;
22
23
24 /**
25  * Tests concurrent startup with state transfer and concurrent state transfer.
26  * @author bela
27  * @version $Id: ConcurrentStartupTest.java,v 1.22 2007/07/04 08:31:41 belaban Exp $
28  */

29 public class ConcurrentStartupTest extends ChannelTestBase
30 {
31
32    private int mod = 1;
33    
34    public void setUp() throws Exception JavaDoc
35    {
36       super.setUp();
37       mod = 1;
38       CHANNEL_CONFIG = System.getProperty("channel.conf.flush", "flush-udp.xml");
39    }
40    
41    public boolean useBlocking()
42    {
43       return true;
44    }
45    
46    public void testConcurrentStartupLargeState()
47    {
48       concurrentStartupHelper(true,false);
49    }
50    
51    public void testConcurrentStartupSmallState()
52    {
53       concurrentStartupHelper(false,true);
54    }
55
56    /**
57     * Tests concurrent startup and message sending directly after joining
58     * See doc/design/ConcurrentStartupTest.txt for details. This will only work 100% correctly once we have
59     * FLUSH support (JGroups 2.4)
60     *
61     * NOTE: This test is not guaranteed to pass at 100% rate until combined join
62     * and state transfer using one FLUSH phase is introduced (Jgroups 2.5)[1].
63     *
64     * [1] http://jira.jboss.com/jira/browse/JGRP-236
65     *
66     *
67     */

68    protected void concurrentStartupHelper(boolean largeState,boolean useDispatcher)
69    {
70       String JavaDoc[] names = null;
71       
72       //mux applications on top of same channel have to have unique name
73
if(isMuxChannelUsed())
74       {
75          names = createMuxApplicationNames(1);
76       }
77       else
78       {
79          names = new String JavaDoc[]{"A", "B", "C", "D"};
80       }
81       
82       int count = names.length;
83       
84       ConcurrentStartupChannel[] channels = new ConcurrentStartupChannel[count];
85       try
86       {
87          // Create a semaphore and take all its permits
88
Semaphore JavaDoc semaphore = new Semaphore JavaDoc(count);
89          takeAllPermits(semaphore, count);
90
91          // Create activation threads that will block on the semaphore
92
for (int i = 0; i < count; i++)
93          {
94             if(largeState)
95             {
96                if(isMuxChannelUsed())
97                {
98                   channels[i] = new ConcurrentStartupChannelWithLargeState(names[i],muxFactory[i%getMuxFactoryCount()],semaphore);
99                }
100                else
101                {
102                   channels[i] = new ConcurrentStartupChannelWithLargeState(semaphore, names[i],useDispatcher);
103                }
104             }
105             else
106             {
107              
108                if(isMuxChannelUsed())
109                {
110                   channels[i] = new ConcurrentStartupChannel(names[i],muxFactory[i%getMuxFactoryCount()],semaphore);
111                }
112                else
113                {
114                   channels[i] = new ConcurrentStartupChannel(names[i],semaphore,useDispatcher);
115                }
116             }
117
118             // Release one ticket at a time to allow the thread to start working
119
channels[i].start();
120             semaphore.release(1);
121             sleepRandom(1500);
122          }
123
124          // Make sure everyone is in sync
125
if(isMuxChannelUsed())
126          {
127             blockUntilViewsReceived(channels,getMuxFactoryCount(), 60000);
128          }
129          else
130          {
131             blockUntilViewsReceived(channels, 60000);
132          }
133
134          // Sleep to ensure the threads get all the semaphore tickets
135
Util.sleep(1000);
136
137          // Reacquire the semaphore tickets; when we have them all
138
// we know the threads are done
139
acquireSemaphore(semaphore, 60000, count);
140          
141          //Sleep to ensure async message arrive
142
Util.sleep(6000);
143
144          //do test verification
145
List JavaDoc[] lists = new List JavaDoc[count];
146          for (int i = 0; i < count; i++)
147          {
148             lists[i] = channels[i].getList();
149          }
150
151          Map JavaDoc[] mods = new Map JavaDoc[count];
152          for (int i = 0; i < count; i++)
153          {
154             mods[i] = channels[i].getModifications();
155          }
156          
157          printLists(lists);
158          printModifications(mods);
159
160          int len = lists.length;
161          for (int i = 0; i < lists.length; i++)
162          {
163             List JavaDoc l = lists[i];
164             assertEquals("list #" + i + " should have " + len + " elements", len, l.size());
165          }
166       }
167       catch (Exception JavaDoc ex)
168       {
169          log.warn("Exception encountered during test",ex);
170       }
171       finally
172       {
173          for (int i = 0; i < count; i++)
174          {
175             Util.sleep(500);
176             channels[i].cleanup();
177          }
178       }
179    }
180    
181    public void testConcurrentLargeStateTranfer()
182    {
183       concurrentStateTranferHelper(true,false);
184    }
185    
186    public void testConcurrentSmallStateTranfer()
187    {
188       concurrentStateTranferHelper(false,true);
189    }
190      
191      
192
193    /**
194     * Tests concurrent state transfer. This test should pass at 100% rate when [1]
195     * is solved.
196     *
197     * [1]http://jira.jboss.com/jira/browse/JGRP-332
198     *
199     *
200     */

201    protected void concurrentStateTranferHelper(boolean largeState, boolean useDispatcher)
202    {
203       String JavaDoc[] names = null;
204       
205       //mux applications on top of same channel have to have unique name
206
if(isMuxChannelUsed())
207       {
208          names = createMuxApplicationNames(1);
209       }
210       else
211       {
212          names = new String JavaDoc[]{"A", "B", "C", "D"};
213       }
214       
215       int count = names.length;
216       ConcurrentStateTransfer[] channels = new ConcurrentStateTransfer[count];
217       
218       //Create a semaphore and take all its tickets
219
Semaphore JavaDoc semaphore = new Semaphore JavaDoc(count);
220       takeAllPermits(semaphore, count);
221       
222       try
223       {
224
225          // Create activation threads that will block on the semaphore
226
for (int i = 0; i < count; i++)
227          {
228             if(largeState)
229             {
230                if(isMuxChannelUsed())
231                {
232                   channels[i] = new ConcurrentLargeStateTransfer(names[i],muxFactory[i%getMuxFactoryCount()],semaphore);
233                }
234                else
235                {
236                   channels[i] = new ConcurrentLargeStateTransfer(names[i],semaphore,useDispatcher);
237                }
238             }
239             else
240             {
241                if(isMuxChannelUsed())
242                {
243                   channels[i] = new ConcurrentStateTransfer(names[i],muxFactory[i%getMuxFactoryCount()],semaphore);
244                }
245                else
246                {
247                   channels[i] = new ConcurrentStateTransfer(names[i],semaphore,useDispatcher);
248                }
249             }
250
251             // Start threads and let them join the channel
252
channels[i].start();
253             Util.sleep(2000);
254          }
255
256          // Make sure everyone is in sync
257
if(isMuxChannelUsed())
258          {
259             blockUntilViewsReceived(channels,getMuxFactoryCount(), 60000);
260          }
261          else
262          {
263             blockUntilViewsReceived(channels, 60000);
264          }
265
266          Util.sleep(2000);
267          //Unleash hell !
268
semaphore.release(count);
269
270          // Sleep to ensure the threads get all the semaphore tickets
271
Util.sleep(2000);
272
273          //Reacquire the semaphore tickets; when we have them all
274
//we know the threads are done
275
acquireSemaphore(semaphore, 60000, count);
276
277          //Sleep to ensure async message arrive
278
Util.sleep(6000);
279          //do test verification
280
List JavaDoc[] lists = new List JavaDoc[count];
281          for (int i = 0; i < count; i++)
282          {
283             lists[i] = channels[i].getList();
284          }
285
286          Map JavaDoc[] mods = new Map JavaDoc[count];
287          for (int i = 0; i < count; i++)
288          {
289             mods[i] = channels[i].getModifications();
290          }
291
292          printLists(lists);
293          printModifications(mods);
294
295          int len = lists.length;
296          for (int i = 0; i < lists.length; i++)
297          {
298             List JavaDoc l = lists[i];
299             assertEquals("list #" + i + " should have " + len + " elements", len, l.size());
300          }
301       }
302       catch (Exception JavaDoc ex)
303       {
304          log.warn("Exception encountered during test",ex);
305       }
306       finally
307       {
308          for (int i = 0; i < count; i++)
309          {
310             Util.sleep(500);
311             channels[i].cleanup();
312          }
313       }
314    }
315    
316    protected int getMod()
317    {
318       synchronized (this)
319       {
320          int retval = mod;
321          mod++;
322          return retval;
323       }
324    }
325
326    protected void printModifications(Map JavaDoc[] modifications)
327    {
328       for (int i = 0; i < modifications.length; i++)
329       {
330          Map JavaDoc modification = modifications[i];
331          log.info("modifications for #" + i + ": " + modification);
332       }
333    }
334
335    protected void printLists(List JavaDoc[] lists)
336    {
337       for (int i = 0; i < lists.length; i++)
338       {
339          List JavaDoc l = lists[i];
340          log.info(i + ": " + l);
341       }
342    }
343
344    protected class ConcurrentStateTransfer extends ConcurrentStartupChannel
345    {
346       public ConcurrentStateTransfer(String JavaDoc name,Semaphore JavaDoc semaphore,boolean useDispatcher) throws Exception JavaDoc
347       {
348          super(name,semaphore,useDispatcher);
349          channel.connect("test");
350       }
351       
352       public ConcurrentStateTransfer(String JavaDoc name,JChannelFactory factory, Semaphore JavaDoc semaphore) throws Exception JavaDoc
353       {
354          super(name,factory,semaphore);
355          channel.connect("test");
356       }
357
358       public void useChannel() throws Exception JavaDoc
359       {
360          boolean success = channel.getState(null, 30000);
361          log.info("channel.getState at " + getName() + getLocalAddress() + " returned " + success);
362          channel.send(null, null, channel.getLocalAddress());
363       }
364    }
365    
366    protected class ConcurrentLargeStateTransfer extends ConcurrentStateTransfer
367    {
368       public ConcurrentLargeStateTransfer(String JavaDoc name,Semaphore JavaDoc semaphore,boolean useDispatcher) throws Exception JavaDoc
369       {
370          super(name,semaphore,useDispatcher);
371       }
372       
373       public ConcurrentLargeStateTransfer(String JavaDoc name,JChannelFactory factory,Semaphore JavaDoc semaphore) throws Exception JavaDoc
374       {
375          super(name,factory,semaphore);
376       }
377       
378       public void setState(byte[] state)
379       {
380          super.setState(state);
381          Util.sleep(5000);
382       }
383
384       public byte[] getState()
385       {
386          Util.sleep(5000);
387          return super.getState();
388       }
389
390       public void getState(OutputStream JavaDoc ostream)
391       {
392          super.getState(ostream);
393          Util.sleep(5000);
394       }
395
396       public void setState(InputStream JavaDoc istream)
397       {
398          super.setState(istream);
399          Util.sleep(5000);
400       }
401    }
402    
403    protected class ConcurrentStartupChannelWithLargeState extends ConcurrentStartupChannel
404    {
405       public ConcurrentStartupChannelWithLargeState(Semaphore JavaDoc semaphore, String JavaDoc name,boolean useDispatcher) throws Exception JavaDoc
406       {
407          super(name,semaphore,useDispatcher);
408       }
409       
410       public ConcurrentStartupChannelWithLargeState(String JavaDoc name,JChannelFactory f,Semaphore JavaDoc semaphore) throws Exception JavaDoc
411       {
412          super(name,f,semaphore);
413       }
414       
415       public void setState(byte[] state)
416       {
417          super.setState(state);
418          Util.sleep(5000);
419       }
420
421       public byte[] getState()
422       {
423          Util.sleep(5000);
424          return super.getState();
425       }
426
427       public void getState(OutputStream JavaDoc ostream)
428       {
429          super.getState(ostream);
430          Util.sleep(5000);
431       }
432
433       public void setState(InputStream JavaDoc istream)
434       {
435          super.setState(istream);
436          Util.sleep(5000);
437       }
438    }
439    
440
441    protected class ConcurrentStartupChannel extends PushChannelApplicationWithSemaphore
442    {
443       final List JavaDoc l = new LinkedList JavaDoc();
444
445       Channel ch;
446
447       int modCount = 1;
448
449       final Map JavaDoc mods = new TreeMap JavaDoc();
450       
451       public ConcurrentStartupChannel(String JavaDoc name,Semaphore JavaDoc semaphore) throws Exception JavaDoc
452       {
453          super(name,semaphore,true);
454       }
455       
456       public ConcurrentStartupChannel(String JavaDoc name,JChannelFactory f,Semaphore JavaDoc semaphore) throws Exception JavaDoc
457       {
458          super(name,f,semaphore);
459       }
460       
461       public ConcurrentStartupChannel(String JavaDoc name,Semaphore JavaDoc semaphore,boolean useDispatcher) throws Exception JavaDoc
462       {
463          super(name,semaphore,useDispatcher);
464       }
465
466       public void useChannel() throws Exception JavaDoc
467       {
468          channel.connect("test");
469          channel.getState(null, 25000);
470          channel.send(null, null, channel.getLocalAddress());
471       }
472
473       List JavaDoc getList()
474       {
475          return l;
476       }
477
478       Map JavaDoc getModifications()
479       {
480          return mods;
481       }
482
483       public void receive(Message msg)
484       {
485          if (msg.getBuffer() == null)
486             return;
487          Object JavaDoc obj = msg.getObject();
488          synchronized (this)
489          {
490             l.add(obj);
491             Integer JavaDoc key = new Integer JavaDoc(getMod());
492             mods.put(key, obj);
493          }
494       }
495
496       public void viewAccepted(View new_view)
497       {
498          super.viewAccepted(new_view);
499          synchronized (this)
500          {
501             Integer JavaDoc key = new Integer JavaDoc(getMod());
502             mods.put(key, new_view.getVid());
503          }
504       }
505
506       public void setState(byte[] state)
507       {
508          super.setState(state);
509          try
510          {
511             List JavaDoc tmp = (List JavaDoc) Util.objectFromByteBuffer(state);
512             synchronized (this)
513             {
514                l.clear();
515                l.addAll(tmp);
516                log.info("-- [#" + getName() + " (" + channel.getLocalAddress() + ")]: state is " + l);
517                Integer JavaDoc key = new Integer JavaDoc(getMod());
518                mods.put(key, tmp);
519             }
520          }
521          catch (Exception JavaDoc e)
522          {
523             e.printStackTrace();
524          }
525       }
526
527       public byte[] getState()
528       {
529          super.getState();
530          List JavaDoc tmp = null;
531          synchronized (this)
532          {
533             tmp = new LinkedList JavaDoc(l);
534             try
535             {
536                return Util.objectToByteBuffer(tmp);
537             }
538             catch (Exception JavaDoc e)
539             {
540                e.printStackTrace();
541                return null;
542             }
543          }
544       }
545
546       public void getState(OutputStream JavaDoc ostream)
547       {
548          super.getState(ostream);
549          ObjectOutputStream JavaDoc oos = null;
550          try
551          {
552             oos = new ObjectOutputStream JavaDoc(ostream);
553             List JavaDoc tmp = null;
554             synchronized (this)
555             {
556                tmp = new LinkedList JavaDoc(l);
557             }
558             oos.writeObject(tmp);
559             oos.flush();
560          }
561          catch (IOException JavaDoc e)
562          {
563             e.printStackTrace();
564          }
565          finally
566          {
567             Util.close(oos);
568          }
569       }
570
571       public void setState(InputStream JavaDoc istream)
572       {
573          super.setState(istream);
574          ObjectInputStream JavaDoc ois = null;
575          try
576          {
577             ois = new ObjectInputStream JavaDoc(istream);
578             List JavaDoc tmp = (List JavaDoc) ois.readObject();
579             synchronized (this)
580             {
581                l.clear();
582                l.addAll(tmp);
583                log.info("-- [#" + getName() + " (" + channel.getLocalAddress() + ")]: state is " + l);
584                Integer JavaDoc key = new Integer JavaDoc(getMod());
585                mods.put(key, tmp);
586             }
587          }
588          catch (Exception JavaDoc e)
589          {
590             e.printStackTrace();
591          }
592          finally
593          {
594             Util.close(ois);
595          }
596       }
597    }
598
599    public static Test suite()
600    {
601       return new TestSuite(ConcurrentStartupTest.class);
602    }
603
604    public static void main(String JavaDoc[] args)
605    {
606       String JavaDoc[] testCaseName = {ConcurrentStartupTest.class.getName()};
607       junit.textui.TestRunner.main(testCaseName);
608    }
609 }
610
Popular Tags