1 18 package org.objectweb.speedo.runtime.concurrency; 19 20 import org.objectweb.perseus.concurrency.lib.Semaphore; 21 import org.objectweb.speedo.SpeedoTestHelper; 22 import org.objectweb.speedo.api.SpeedoProperties; 23 import org.objectweb.speedo.pobjects.basic.BasicA; 24 import org.objectweb.util.monolog.api.BasicLevel; 25 26 import java.util.ArrayList ; 27 import java.util.Collection ; 28 import java.util.Iterator ; 29 import java.util.Properties ; 30 31 import javax.jdo.PersistenceManager; 32 import javax.jdo.Query; 33 34 import junit.framework.Assert; 35 36 40 public class TestSeveralPMUsers extends SpeedoTestHelper { 41 42 public TestSeveralPMUsers(String s) { 43 super(s); 44 } 45 46 protected String getLoggerName() { 47 return LOG_NAME + ".rt.concurrency.TestSeveralPMUsers"; 48 } 49 50 public Properties getPMFProperties() { 51 Properties p = super.getPMFProperties(); 52 p.setProperty(SpeedoProperties.JDO_OPTION_MULTITREADED, "true"); 53 p.setProperty(SpeedoProperties.MAPPING_STRUCTURE, SpeedoProperties.MAPPING_STRUCTURE_DD); 54 return p; 55 } 56 public void test2thread1PMNoTx() { 57 final PersistenceManager pm = pmf.getPersistenceManager(); 58 BasicA ba = new BasicA(); 59 ba.writeF1("test2thread1PMNoTx.thread1"); 60 ba.writeF2(2); 61 pm.makePersistent(ba); 62 final Object oid = pm.getObjectId(ba); 63 Thread t = new Thread ( 64 new Runnable () { 65 public void run() { 66 try { 67 BasicA _ba = ((BasicA) pm.getObjectById(oid, false)); 68 _ba.readF1_F2(); 69 _ba.writeF1("test2thread1PMNoTx.thread2"); 70 } catch (Exception e) { 71 e.printStackTrace(); 72 fail(e.getMessage()); 73 } 74 } 75 }); 76 t.start(); 77 try { 78 t.join(); 79 } catch (InterruptedException e) { 80 } 81 Assert.assertTrue("The thread2 is not finished", !t.isAlive()); 82 pm.currentTransaction().begin(); 83 pm.deletePersistent(ba); 84 pm.currentTransaction().commit(); 85 pm.close(); 86 87 } 88 89 90 public void test2thread1PMTx() { 91 final PersistenceManager pm = pmf.getPersistenceManager(); 92 BasicA ba = new BasicA(); 93 ba.writeF1("test2thread1PM.thread1"); 94 ba.writeF2(1); 95 pm.currentTransaction().begin(); 96 pm.makePersistent(ba); 97 final Object oid = pm.getObjectId(ba); 98 Thread t = new Thread ( 99 new Runnable () { 100 public void run() { 101 try { 102 BasicA _ba = ((BasicA) pm.getObjectById(oid, false)); 103 _ba.readF1_F2(); 104 _ba.writeF1("test2thread1PM.thread2"); 105 _ba.writeF2(2); 106 pm.currentTransaction().commit(); 107 } catch (Exception e) { 108 e.printStackTrace(); 109 fail(e.getMessage()); 110 } 111 } 112 }); 113 t.start(); 114 try { 115 t.join(); 116 } catch (InterruptedException e) { 117 } 118 Assert.assertTrue("The thread2 is not finished", !t.isAlive()); 119 pm.currentTransaction().begin(); 120 Assert.assertEquals("Bad f2 value", 2, ba.readF2()); 121 ba.writeF2(3); 122 pm.currentTransaction().commit(); 123 pm.currentTransaction().begin(); 124 pm.deletePersistent(ba); 125 pm.currentTransaction().commit(); 126 pm.close(); 127 } 128 129 public void testManyThreadConcurrentData20th70ob30m0q0i() { 130 testManyThreadConcurrentData(20, 70, 30, 5, 5); 131 } 132 133 public void testManyThreadConcurrentData70th70ob0m5q5i() { 134 testManyThreadConcurrentData(70, 70, 0, 5, 5); 135 } 136 137 public void testManyThreadConcurrentData100th100ob10m5q5i() { 138 testManyThreadConcurrentData(100, 100, 10, 5, 5); 139 } 140 141 public void testManyThreadConcurrentData( 142 final int NB_THREAD, 143 final int NB_OBJECT, 144 final int NB_MODIF, 145 final int NB_QUERY, 146 final int INTERVAL) { 147 logger.log(BasicLevel.INFO, "testManyThreadConcurrentData: " 148 + " \n\tNB_THREAD=" + NB_THREAD 149 + " \n\tNB_OBJECT=" + NB_OBJECT 150 + " \n\tNB_MODIF=" + NB_MODIF 151 + " \n\tNB_QUERY=" + NB_QUERY 152 + " \n\tINTERVAL=" + INTERVAL 153 ); 154 assertTrue("Bad configuration, the number of object must be greater or equal than the number of thread", NB_OBJECT >= NB_THREAD); 155 final PersistenceManager pm = pmf.getPersistenceManager(); 156 pm.currentTransaction().begin(); 157 final Object [] oids = new Object [NB_OBJECT]; 158 final Semaphore[] s = new Semaphore[NB_OBJECT]; 159 for(int i=0; i<NB_OBJECT; i++) { 160 BasicA ba = new BasicA(); 161 ba.writeF1("testManyThreadConcurrentData_" + i); 162 ba.writeF2(i); 163 pm.makePersistent(ba); 164 oids[i] = pm.getObjectId(ba); 165 s[i] = new Semaphore(); 166 } 167 pm.currentTransaction().commit(); 168 169 pmf.getDataStoreCache().evictAll(); 171 172 Thread [] threads = new Thread [NB_THREAD]; 173 ThreadTestManyThreadConcurrentData[] runners = new ThreadTestManyThreadConcurrentData[NB_THREAD]; 174 for(int i=0; i<NB_THREAD; i++) { 175 final int threadId = i; 176 threads[i] = new Thread (runners[i]); 177 runners[i] = new ThreadTestManyThreadConcurrentData( 178 threadId, NB_OBJECT, NB_MODIF, NB_QUERY, INTERVAL, 179 oids, s, pm, this); 180 } 181 pm.currentTransaction().begin(); 182 for(int i=0; i<NB_THREAD; i++) { 183 threads[i].start(); 184 } 185 for(int i=0; i<NB_THREAD; i++) { 186 try { 187 threads[i].join(); 188 } catch (InterruptedException e) { 189 } 190 } 191 int error = 0; 192 for(int i=0; i<NB_THREAD; i++) { 193 if (runners[i].throwable!= null) { 194 logger.log(BasicLevel.ERROR, "Error in the thread " + i 195 + ": ", runners[i].throwable); 196 error++; 197 } 198 } 199 pm.currentTransaction().commit(); 200 pm.currentTransaction().begin(); 201 pm.deletePersistentAll(pm.getObjectsById(oids)); 202 pm.currentTransaction().commit(); 203 pm.close(); 204 if (error > 0) { 205 fail(error + " error(s) occur."); 206 } 207 } 208 } 209 210 class ThreadTestManyThreadConcurrentData implements Runnable { 211 212 private final int threadId; 213 private final int NB_OBJECT; 214 private final int NB_MODIF; 215 private final int NB_QUERY; 216 private final int INTERVAL; 217 private final Object [] oids; 218 private final PersistenceManager pm; 219 private final Semaphore[] s; 220 private final SpeedoTestHelper st; 221 public Throwable throwable = null; 222 223 public ThreadTestManyThreadConcurrentData(final int threadId, 224 final int NB_OBJECT, 225 final int NB_MODIF, 226 final int NB_QUERY, 227 final int INTERVAL, 228 final Object [] oids, 229 final Semaphore[] s, 230 final PersistenceManager pm, 231 final SpeedoTestHelper st) { 232 this.threadId = threadId; 233 this.NB_OBJECT = NB_OBJECT; 234 this.NB_MODIF = NB_MODIF; 235 this.NB_QUERY = NB_QUERY; 236 this.INTERVAL = INTERVAL; 237 this.oids = oids; 238 this.s = s; 239 this.pm = pm; 240 this.st = st; 241 } 242 243 public void run() { 244 try { 245 execute(); 246 } catch (Throwable t) { 247 throwable = t; 248 } 249 } 250 251 private void execute() { 252 BasicA ba = ((BasicA) pm.getObjectById(oids[NB_OBJECT-1], false)); 254 ba.readF1_F2(); 255 256 ba = ((BasicA) pm.getObjectById(oids[threadId], false)); 257 ba.writeF1(ba.readF1() + "modifiedBy" + threadId); 258 259 for(int j=0; j<NB_MODIF; j++) { 260 int id = j; 263 if (j % 2 == 0) { 264 id = Math.min(NB_OBJECT-1, threadId + j); 265 } else { 266 id = Math.max(0, threadId - j); 267 } 268 ba = ((BasicA) pm.getObjectById(oids[id], false)); 269 s[id].P(); 270 try { 271 ba.writeF1(ba.readF1() + "modifiedBy" + threadId); 272 } finally { 273 s[id].V(); 274 } 275 } 276 277 for(int j=0; j<NB_QUERY; j++) { 278 final int min = Math.max(0, threadId - (INTERVAL/2)); 279 final int max = Math.min(NB_OBJECT-1, threadId + (INTERVAL/2)); 280 Query q = pm.newQuery(BasicA.class); 281 q.declareParameters("int pmin, int pmax"); 282 q.setFilter("(f2 >= pmin) && (f2 <= pmax)"); 283 Collection c = (Collection ) q.execute( 284 new Integer (min), new Integer (max)); 285 Collection expected = new ArrayList (); 286 for(int id=min; id<=max; id++) { 287 expected.add(new Integer (id)); 288 } 289 st.getLogger().log(BasicLevel.DEBUG, "Thread " + threadId + " expect: " + expected); 290 Collection found = new ArrayList (); 291 for (Iterator iter = c.iterator(); iter.hasNext();) { 292 ba = (BasicA) iter.next(); 293 int f2 = ba.readF2(); 294 found.add(new Integer (f2)); 295 } 296 q.closeAll(); 297 st.getLogger().log(BasicLevel.DEBUG, "Thread " + threadId + " found: " + found); 298 st.assertSameCollection("Bad query result" 299 + ", threadId=" + threadId 300 + ", min=" + min 301 + ", max=" + max, 302 expected, found); 303 } 304 } 305 } 306 | Popular Tags |