KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > StateTransferTest


1 package org.jgroups.tests;
2
3 import java.io.IOException JavaDoc;
4 import java.io.InputStream JavaDoc;
5 import java.io.ObjectInputStream JavaDoc;
6 import java.io.ObjectOutputStream JavaDoc;
7 import java.io.OutputStream JavaDoc;
8 import java.util.Collections JavaDoc;
9 import java.util.HashMap JavaDoc;
10 import java.util.Map JavaDoc;
11 import java.util.Set JavaDoc;
12 import java.util.concurrent.Semaphore JavaDoc;
13 import java.util.concurrent.TimeUnit JavaDoc;
14 import java.util.concurrent.locks.ReentrantLock JavaDoc;
15
16 import junit.framework.Test;
17 import junit.framework.TestSuite;
18
19 import org.jgroups.Message;
20 import org.jgroups.util.Util;
21
22 /**
23  * Tests correct state transfer while other members continue sending messages to
24  * the group
25  *
26  * @author Bela Ban
27  * @version $Id: StateTransferTest.java,v 1.15 2007/07/11 02:04:36 vlada Exp $
28  */

29 public class StateTransferTest extends ChannelTestBase {
30     private static final int MSG_SEND_COUNT = 10000;
31     private static final int APP_COUNT = 2;
32
33     public StateTransferTest(String JavaDoc name){
34         super(name);
35     }
36
37     public void testStateTransferWhileSending() throws Exception JavaDoc {
38         StateTransferApplication[] apps = new StateTransferApplication[APP_COUNT];
39
40         // Create a semaphore and take all its permits
41
Semaphore JavaDoc semaphore = new Semaphore JavaDoc(APP_COUNT);
42         semaphore.acquire(APP_COUNT);
43
44         int from = 0, to = MSG_SEND_COUNT;
45         String JavaDoc[] names = createApplicationNames(APP_COUNT);
46         for(int i = 0;i < apps.length;i++){
47             apps[i] = new StateTransferApplication(semaphore, names[i], from, to);
48             from += MSG_SEND_COUNT;
49             to += MSG_SEND_COUNT;
50         }
51
52         for(int i = 0;i < apps.length;i++){
53             StateTransferApplication app = apps[i];
54             app.start();
55             semaphore.release();
56             Util.sleep(500);
57         }
58
59         // Reacquire the semaphore tickets; when we have them all
60
// we know the threads are done
61
semaphore.tryAcquire(APP_COUNT, 40, TimeUnit.SECONDS);
62         
63         
64         Util.sleep(1000);
65         //have we received all and the correct messages?
66
for(int i = 0;i < apps.length;i++){
67             StateTransferApplication w = apps[i];
68             Map JavaDoc m = w.getMap();
69             log.info("map has " + m.size() + " elements");
70             assertEquals(MSG_SEND_COUNT * APP_COUNT, m.size());
71         }
72
73         Set JavaDoc keys = apps[0].getMap().keySet();
74         for(int i = 0;i < apps.length;i++){
75             StateTransferApplication app = apps[i];
76             Map JavaDoc m = app.getMap();
77             Set JavaDoc s = m.keySet();
78             assertEquals(keys, s);
79         }
80         
81         for(StateTransferApplication app:apps){
82             app.cleanup();
83         }
84     }
85
86     protected int getMuxFactoryCount() {
87         //one MuxChannel per real Channel
88
return APP_COUNT;
89     }
90
91     protected class StateTransferApplication extends PushChannelApplicationWithSemaphore {
92         private final ReentrantLock JavaDoc mapLock = new ReentrantLock JavaDoc();
93         private Map JavaDoc map = new HashMap JavaDoc(MSG_SEND_COUNT * APP_COUNT);
94         private int from, to;
95
96         public StateTransferApplication(Semaphore JavaDoc semaphore,String JavaDoc name,int from,int to) throws Exception JavaDoc{
97             super(name, semaphore);
98             this.from = from;
99             this.to = to;
100         }
101
102         public Map JavaDoc getMap() {
103             Map JavaDoc result = null;
104             mapLock.lock();
105             result = Collections.unmodifiableMap(map);
106             mapLock.unlock();
107             return result;
108         }
109         
110         @Override JavaDoc
111         public void receive(Message msg) {
112             Object JavaDoc[] data = (Object JavaDoc[]) msg.getObject();
113             mapLock.lock();
114             map.put(data[0], data[1]);
115             int num_received = map.size();
116             mapLock.unlock();
117             
118             if(num_received % 1000 == 0)
119                 log.info("received " + num_received);
120             
121             //are we done?
122
if(num_received >= MSG_SEND_COUNT * APP_COUNT)
123                 semaphore.release();
124         }
125
126         @Override JavaDoc
127         public byte[] getState() {
128             byte[] result = null;
129             mapLock.lock();
130             try{
131                 result = Util.objectToByteBuffer(map);
132             }catch(Exception JavaDoc e){
133                 e.printStackTrace();
134             }
135             finally{
136                 mapLock.unlock();
137             }
138             return result;
139         }
140
141         @Override JavaDoc
142         public void setState(byte[] state) {
143             mapLock.lock();
144             try{
145                 map = (Map JavaDoc) Util.objectFromByteBuffer(state);
146             }catch(Exception JavaDoc e){
147                 e.printStackTrace();
148             }
149             finally{
150                 mapLock.unlock();
151             }
152             log.info("received state, map has " + map.size() + " elements");
153             
154         }
155         @Override JavaDoc
156         public void getState(OutputStream JavaDoc ostream) {
157             ObjectOutputStream JavaDoc out;
158             mapLock.lock();
159             try{
160                 out = new ObjectOutputStream JavaDoc(ostream);
161                 out.writeObject(map);
162                 out.close();
163             }catch(IOException JavaDoc e){
164                 e.printStackTrace();
165             }
166             finally{
167                 mapLock.unlock();
168             }
169             
170         }
171
172         @Override JavaDoc
173         public void setState(InputStream JavaDoc istream) {
174             ObjectInputStream JavaDoc in;
175             mapLock.lock();
176             try{
177                 in = new ObjectInputStream JavaDoc(istream);
178                 map = (Map JavaDoc) in.readObject();
179                 log.info("received state, map has " + map.size() + " elements");
180                 in.close();
181             }catch(IOException JavaDoc e){
182                 e.printStackTrace();
183             }catch(ClassNotFoundException JavaDoc e){
184                 e.printStackTrace();
185             }
186             finally{
187                 mapLock.unlock();
188             }
189         }
190
191         @Override JavaDoc
192         protected void useChannel() throws Exception JavaDoc {
193             channel.connect("StateTransferTest-Group");
194             channel.getState(null, 10000);
195             Object JavaDoc[] data = new Object JavaDoc[2];
196             for(int i = from;i < to;i++){
197                 data[0] = new Integer JavaDoc(i);
198                 data[1] = "Value #" + i;
199                 try{
200                     channel.send(null, null, data);
201                     if(i % 1000 == 0)
202                         log.info("sent " + i);
203                 }catch(Exception JavaDoc e){
204                     e.printStackTrace();
205                     break;
206                 }
207             }
208         }
209
210         public void run() {
211             boolean acquired = false;
212             try{
213                 acquired = semaphore.tryAcquire(60000L, TimeUnit.MILLISECONDS);
214                 if(!acquired){
215                     throw new Exception JavaDoc(name + " cannot acquire semaphore");
216                 }
217                 useChannel();
218             }catch(Exception JavaDoc e){
219                 log.error(name + ": " + e.getLocalizedMessage(), e);
220                 // Save it for the test to check
221
exception = e;
222             }
223         }
224     }
225     
226     public static Test suite() {
227         return new TestSuite(StateTransferTest.class);
228     }
229
230     public static void main(String JavaDoc[] args) {
231         junit.textui.TestRunner.run(suite());
232     }
233 }
Popular Tags