1 package org.jgroups.tests; 2 3 import java.io.IOException ; 4 import java.io.InputStream ; 5 import java.io.ObjectInputStream ; 6 import java.io.ObjectOutputStream ; 7 import java.io.OutputStream ; 8 import java.util.Collections ; 9 import java.util.HashMap ; 10 import java.util.Map ; 11 import java.util.Set ; 12 import java.util.concurrent.Semaphore ; 13 import java.util.concurrent.TimeUnit ; 14 import java.util.concurrent.locks.ReentrantLock ; 15 16 import junit.framework.Test; 17 import junit.framework.TestSuite; 18 19 import org.jgroups.Message; 20 import org.jgroups.util.Util; 21 22 29 public class StateTransferTest extends ChannelTestBase { 30 private static final int MSG_SEND_COUNT = 10000; 31 private static final int APP_COUNT = 2; 32 33 public StateTransferTest(String name){ 34 super(name); 35 } 36 37 public void testStateTransferWhileSending() throws Exception { 38 StateTransferApplication[] apps = new StateTransferApplication[APP_COUNT]; 39 40 Semaphore semaphore = new Semaphore (APP_COUNT); 42 semaphore.acquire(APP_COUNT); 43 44 int from = 0, to = MSG_SEND_COUNT; 45 String [] names = createApplicationNames(APP_COUNT); 46 for(int i = 0;i < apps.length;i++){ 47 apps[i] = new StateTransferApplication(semaphore, names[i], from, to); 48 from += MSG_SEND_COUNT; 49 to += MSG_SEND_COUNT; 50 } 51 52 for(int i = 0;i < apps.length;i++){ 53 StateTransferApplication app = apps[i]; 54 app.start(); 55 semaphore.release(); 56 Util.sleep(500); 57 } 58 59 semaphore.tryAcquire(APP_COUNT, 40, TimeUnit.SECONDS); 62 63 64 Util.sleep(1000); 65 for(int i = 0;i < apps.length;i++){ 67 StateTransferApplication w = apps[i]; 68 Map m = w.getMap(); 69 log.info("map has " + m.size() + " elements"); 70 assertEquals(MSG_SEND_COUNT * APP_COUNT, m.size()); 71 } 72 73 Set keys = apps[0].getMap().keySet(); 74 for(int i = 0;i < apps.length;i++){ 75 StateTransferApplication app = apps[i]; 76 Map m = app.getMap(); 77 Set s = m.keySet(); 78 assertEquals(keys, s); 79 } 80 81 for(StateTransferApplication app:apps){ 82 app.cleanup(); 83 } 84 } 85 86 protected int getMuxFactoryCount() { 87 return APP_COUNT; 89 } 90 91 protected class StateTransferApplication extends PushChannelApplicationWithSemaphore { 92 private final ReentrantLock mapLock = new ReentrantLock (); 93 private Map map = new HashMap (MSG_SEND_COUNT * APP_COUNT); 94 private int from, to; 95 96 public StateTransferApplication(Semaphore semaphore,String name,int from,int to) throws Exception { 97 super(name, semaphore); 98 this.from = from; 99 this.to = to; 100 } 101 102 public Map getMap() { 103 Map result = null; 104 mapLock.lock(); 105 result = Collections.unmodifiableMap(map); 106 mapLock.unlock(); 107 return result; 108 } 109 110 @Override 111 public void receive(Message msg) { 112 Object [] data = (Object []) msg.getObject(); 113 mapLock.lock(); 114 map.put(data[0], data[1]); 115 int num_received = map.size(); 116 mapLock.unlock(); 117 118 if(num_received % 1000 == 0) 119 log.info("received " + num_received); 120 121 if(num_received >= MSG_SEND_COUNT * APP_COUNT) 123 semaphore.release(); 124 } 125 126 @Override 127 public byte[] getState() { 128 byte[] result = null; 129 mapLock.lock(); 130 try{ 131 result = Util.objectToByteBuffer(map); 132 }catch(Exception e){ 133 e.printStackTrace(); 134 } 135 finally{ 136 mapLock.unlock(); 137 } 138 return result; 139 } 140 141 @Override 142 public void setState(byte[] state) { 143 mapLock.lock(); 144 try{ 145 map = (Map ) Util.objectFromByteBuffer(state); 146 }catch(Exception e){ 147 e.printStackTrace(); 148 } 149 finally{ 150 mapLock.unlock(); 151 } 152 log.info("received state, map has " + map.size() + " elements"); 153 154 } 155 @Override 156 public void getState(OutputStream ostream) { 157 ObjectOutputStream out; 158 mapLock.lock(); 159 try{ 160 out = new ObjectOutputStream (ostream); 161 out.writeObject(map); 162 out.close(); 163 }catch(IOException e){ 164 e.printStackTrace(); 165 } 166 finally{ 167 mapLock.unlock(); 168 } 169 170 } 171 172 @Override 173 public void setState(InputStream istream) { 174 ObjectInputStream in; 175 mapLock.lock(); 176 try{ 177 in = new ObjectInputStream (istream); 178 map = (Map ) in.readObject(); 179 log.info("received state, map has " + map.size() + " elements"); 180 in.close(); 181 }catch(IOException e){ 182 e.printStackTrace(); 183 }catch(ClassNotFoundException e){ 184 e.printStackTrace(); 185 } 186 finally{ 187 mapLock.unlock(); 188 } 189 } 190 191 @Override 192 protected void useChannel() throws Exception { 193 channel.connect("StateTransferTest-Group"); 194 channel.getState(null, 10000); 195 Object [] data = new Object [2]; 196 for(int i = from;i < to;i++){ 197 data[0] = new Integer (i); 198 data[1] = "Value #" + i; 199 try{ 200 channel.send(null, null, data); 201 if(i % 1000 == 0) 202 log.info("sent " + i); 203 }catch(Exception e){ 204 e.printStackTrace(); 205 break; 206 } 207 } 208 } 209 210 public void run() { 211 boolean acquired = false; 212 try{ 213 acquired = semaphore.tryAcquire(60000L, TimeUnit.MILLISECONDS); 214 if(!acquired){ 215 throw new Exception (name + " cannot acquire semaphore"); 216 } 217 useChannel(); 218 }catch(Exception e){ 219 log.error(name + ": " + e.getLocalizedMessage(), e); 220 exception = e; 222 } 223 } 224 } 225 226 public static Test suite() { 227 return new TestSuite(StateTransferTest.class); 228 } 229 230 public static void main(String [] args) { 231 junit.textui.TestRunner.run(suite()); 232 } 233 } | Popular Tags |