package org.objectweb.speedo.runtime.concurrency;

import org.objectweb.speedo.Speedo;
import org.objectweb.speedo.SpeedoTestHelper;
import org.objectweb.speedo.api.ExceptionHelper;
import org.objectweb.speedo.pobjects.userid.BasicB;
import org.objectweb.speedo.pobjects.basic.BasicA;
import org.objectweb.util.monolog.api.BasicLevel;

import javax.jdo.PersistenceManager;
import javax.jdo.JDOException;
import javax.jdo.JDOFatalException;
import javax.jdo.PersistenceManagerFactory;

import junit.framework.Assert;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.Iterator;

/**
 * Concurrency tests: many threads obtain persistence managers and update a
 * single shared persistent {@link BasicA} instance at the same time.
 *
 * The number of threads of each test can be overridden with a system property
 * named <code>conform.org.objectweb.speedo.rt.concurrency.TestManyUsers.&lt;testName&gt;.thread</code>.
 */
public class TestManyUsers extends SpeedoTestHelper {

    /** Action code: increment the field through <code>incF2()</code>. */
    public static final int W_ACTION = 1;
    /** Action code: read the field, then write the value plus one. */
    public static final int R_W_ACTION = 2;
    /** Action code: alternate between {@link #W_ACTION} and {@link #R_W_ACTION}. */
    public static final int MIXED_ACTION = 3;

    /** JDOFatalException instances caught by the workers of the last {@link #testAccess} run. */
    public static Vector rollbackExceptions = new Vector();
    /** Any other exception caught by the workers of the last {@link #testAccess} run. */
    public static Vector errors = new Vector();

    /** Common prefix of the system properties giving the thread count of each test. */
    private static final String PROPERTY_PREFIX =
            "conform.org.objectweb.speedo.rt.concurrency.TestManyUsers.";

    public TestManyUsers(String s) {
        super(s);
    }

    protected String getLoggerName() {
        return LOG_NAME + ".rt.concurrency.TestManyUsers";
    }

    /**
     * Reads the thread count configured for a test.
     *
     * @param testName the test method name used in the property name
     * @param defaultValue the value used when the property is not set
     * @return the configured number of threads
     */
    private static int getThreadCount(String testName, int defaultValue) {
        return Integer.getInteger(PROPERTY_PREFIX + testName + ".thread", defaultValue).intValue();
    }

    /**
     * Starts many threads that each fetch a persistence manager, ask it for
     * the object id class of {@link BasicB} and close it.
     *
     * Exceptions raised in the workers are collected and reported from the
     * test thread: a <code>fail()</code> issued inside a worker thread would
     * only terminate that worker and never reach the test runner.
     */
    public void testLoading() {
        final int nbThread = getThreadCount("testLoading", 1000);
        logger.log(BasicLevel.INFO, "Run concurrent Loading, " + nbThread + " threads");
        final List failures = Collections.synchronizedList(new ArrayList());
        Thread[] ts = new Thread[nbThread];
        for (int i = 0; i < nbThread; i++) {
            ts[i] = new Thread(
                new Runnable() {
                    public void run() {
                        PersistenceManager pm = null;
                        try {
                            pm = pmf.getPersistenceManager();
                            Thread.sleep(1);
                            pm.getObjectIdClass(BasicB.class);
                        } catch (Exception e) {
                            logger.log(BasicLevel.ERROR, "Loading thread has a problem", e);
                            failures.add(e);
                        } finally {
                            // Always give the manager back, even after a failure.
                            if (pm != null) {
                                try {
                                    pm.close();
                                } catch (JDOException e) {
                                    logger.log(BasicLevel.ERROR, "Loading thread cannot close its pm", e);
                                    failures.add(e);
                                }
                            }
                        }
                    }
                }
            );
        }
        for (int i = 0; i < nbThread; i++) {
            ts[i].start();
        }
        try {
            for (int i = 0; i < nbThread; i++) {
                ts[i].join();
            }
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller of the test.
            Thread.currentThread().interrupt();
            fail(e.getMessage());
        }
        if (!failures.isEmpty()) {
            fail(failures.size() + "/" + nbThread + " loading threads failed, first error: "
                    + failures.get(0));
        }
    }

    /**
     * Runs {@link #testAccess} with ten {@link #R_W_ACTION} per thread.
     */
    public void testConcurrentReadAndWrite() {
        logger.log(BasicLevel.INFO, "");
        int nbThread = getThreadCount("testConcurrentReadAndWrite", 200);
        logger.log(BasicLevel.INFO, "Run concurrent readAndWrite actions");
        int[] actions = new int[10];
        Arrays.fill(actions, R_W_ACTION);
        testAccess(nbThread, actions);
    }

    /**
     * Runs {@link #testAccess} with ten {@link #W_ACTION} per thread.
     */
    public void testConcurrentWrite() {
        logger.log(BasicLevel.INFO, "");
        int nbThread = getThreadCount("testConcurrentWrite", 200);
        logger.log(BasicLevel.INFO, "Run concurrent write actions");
        int[] actions = new int[10];
        Arrays.fill(actions, W_ACTION);
        testAccess(nbThread, actions);
    }

    /**
     * Runs {@link #testAccess} with ten {@link #MIXED_ACTION} per thread.
     *
     * The thread count is read from the <code>testConcurrentMixed.thread</code>
     * property. The <code>testConcurrentWrite.thread</code> property, which
     * this test historically read, is still honoured as a fallback.
     */
    public void testConcurrentMixed() {
        logger.log(BasicLevel.INFO, "");
        int nbThread = getThreadCount("testConcurrentMixed",
                getThreadCount("testConcurrentWrite", 200));
        logger.log(BasicLevel.INFO, "Run concurrent mixed actions");
        int[] actions = new int[10];
        Arrays.fill(actions, MIXED_ACTION);
        testAccess(nbThread, actions);
    }

    /**
     * Makes one {@link BasicA} persistent, then lets <code>nbThread</code>
     * threads update its <code>f2</code> field concurrently. Each thread runs
     * one transaction per entry of <code>actions</code>.
     *
     * A {@link JDOFatalException} in a worker is counted as a rolled back
     * transaction; any other exception is counted as an error and fails the
     * test. When every thread has finished, the object is deleted and the
     * final value of <code>f2</code> must equal the number of transactions
     * that were not rolled back.
     *
     * @param nbThread the number of concurrent threads
     * @param actions the action codes ({@link #W_ACTION}, {@link #R_W_ACTION}
     *        or {@link #MIXED_ACTION}) executed in order by every thread
     */
    public void testAccess(final int nbThread, final int[] actions) {
        final int initialValue = 0;
        rollbackExceptions.clear();
        errors.clear();
        Thread[] ts = new Thread[nbThread];
        BasicA ba = new BasicA();
        ba.writeF1("1");
        ba.writeF2(initialValue);
        PersistenceManager pm = pmf.getPersistenceManager();
        pm.currentTransaction().begin();
        pm.makePersistent(ba);
        final BasicA a = ba;
        Object id = pm.getObjectId(ba);
        pm.currentTransaction().commit();
        pm.close();
        for (int i = 0; i < nbThread; i++) {
            final int _i = i;
            ts[i] = new Thread(
                new Runnable() {
                    public void run() {
                        // Starting kind used by MIXED_ACTION: even threads write first.
                        int action = (_i % 2 == 0 ? W_ACTION : R_W_ACTION);
                        for (int j = 0; j < actions.length; j++) {
                            PersistenceManager pm = pmf.getPersistenceManager();
                            boolean rollback = false;
                            try {
                                pm.currentTransaction().begin();
                                logger.log(BasicLevel.DEBUG, _i + "," + j + " begin tx");
                                switch (actions[j]) {
                                case W_ACTION:
                                    a.incF2();
                                    break;
                                case R_W_ACTION:
                                    a.writeF2(a.readF2() + 1);
                                    break;
                                case MIXED_ACTION:
                                    if (action == W_ACTION) {
                                        a.incF2();
                                        action = R_W_ACTION;
                                    } else if (action == R_W_ACTION) {
                                        a.writeF2(a.readF2() + 1);
                                        action = W_ACTION;
                                    }
                                    break;
                                }
                                pm.currentTransaction().commit();
                                logger.log(BasicLevel.DEBUG, _i + "," + j + " finished");
                            } catch (JDOFatalException e) {
                                rollback = true;
                                rollbackExceptions.add(e);
                                logger.log(BasicLevel.DEBUG, "Tx " + _i + "," + j + " has been rolledback");
                            } catch (Exception e) {
                                Exception ie = ExceptionHelper.getNested(e);
                                errors.add(ie);
                                logger.log(BasicLevel.ERROR, _i + "," + j + " has a problem", ie);
                                // The failure may already have ended the transaction;
                                // rolling back an inactive transaction would throw.
                                if (pm.currentTransaction().isActive()) {
                                    pm.currentTransaction().rollback();
                                }
                            } finally {
                                try {
                                    pm.close();
                                } catch (JDOException e) {
                                    logger.log(BasicLevel.ERROR, "tx " + _i
                                            + "," + j + " has been "
                                            + (rollback ? "rolledback" : "committed")
                                            + " and the close occurs an error", ExceptionHelper.getNested(e));
                                    throw e;
                                }
                            }
                        }
                    }
                }
            );
        }
        long execTime = System.currentTimeMillis();
        for (int i = 0; i < nbThread; i++) {
            ts[i].start();
        }
        int val = 0;
        boolean interrupted = false;
        // Indexes of the threads still alive after their join delay.
        ArrayList al = new ArrayList(nbThread);
        try {
            logger.log(BasicLevel.INFO, nbThread + " threads launched doing " + actions.length + " actions, waiting them ...");
            for (int i = 0; i < nbThread; i++) {
                ts[i].join(1000);
                if (ts[i].isAlive()) {
                    al.add(new Integer(i));
                    logger.log(BasicLevel.DEBUG, i + " is not finished after"
                            + " the delay, it could be blocked");
                }
            }
            String dg = getDG(pmf);
            if (dg != null) {
                logger.log(BasicLevel.INFO, dg);
            }
            if (al.size() > 0) {
                // Second chance: threads that ended while the others were awaited.
                for (Iterator it = al.iterator(); it.hasNext();) {
                    int th = ((Integer) it.next()).intValue();
                    if (!ts[th].isAlive()) {
                        logger.log(BasicLevel.DEBUG, th + " is late but ok.");
                        it.remove();
                    }
                }
            }
            execTime = System.currentTimeMillis() - execTime;
            if (al.size() > 0) {
                logger.log(BasicLevel.INFO, "Kill alive threads");
                for (Iterator it = al.iterator(); it.hasNext();) {
                    int th = ((Integer) it.next()).intValue();
                    if (!ts[th].isAlive()) {
                        it.remove();
                    } else {
                        try {
                            ts[th].interrupt();
                        } catch (Exception e1) {
                            logger.log(BasicLevel.ERROR, "Cannot interrupt thread " + th, e1);
                        }
                    }
                }
                fail("Thread " + al + " blocked!");
            }
        } catch (InterruptedException e) {
            // Workers may still be running: skip the cleanup and report below.
            interrupted = true;
        } finally {
            if (al.size() == 0 && !interrupted) {
                logger.log(BasicLevel.DEBUG, "Auto cleaning");
                pm = pmf.getPersistenceManager();
                ba = (BasicA) pm.getObjectById(id, false);
                val = ba.readF2();
                pm.currentTransaction().begin();
                pm.deletePersistent(ba);
                pm.currentTransaction().commit();
                pm.close();
                int totalTx = nbThread * actions.length;
                int nbCommittedTx = totalTx - errors.size() - rollbackExceptions.size();
                // Guard both divisions: no transaction at all, or a run shorter than 1ms.
                long rate = (totalTx == 0 ? 0 : (nbCommittedTx * 100L) / totalTx);
                long txPerSecond = (nbCommittedTx * 1000L) / Math.max(1L, execTime);
                logger.log(BasicLevel.INFO, "Commited transaction rate: "
                        + rate + "%"
                        + ", exec time: " + execTime + "ms"
                        + ", tx/s:" + txPerSecond);
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
            fail("Interrupted while waiting for the worker threads");
        }
        if (errors.size() > 0) {
            fail("There are " + errors.size() + "/" + nbThread + " errors during the run");
        }
        if (al.size() == 0) {
            Assert.assertEquals("Bad f2 value",
                    initialValue + nbThread * actions.length - rollbackExceptions.size(), val);
        }
    }

    /**
     * Builds a textual dump of the dependency graph exposed by the factory,
     * one <code>wsN = &gt; wsM</code> line per dependency.
     *
     * @param pmf the factory; it must be a {@link Speedo} instance
     * @return the dump, or null when the graph cannot be fetched or is empty
     */
    private static String getDG(PersistenceManagerFactory pmf) {
        Map m = null;
        try {
            m = ((Speedo) pmf).getDependencyGraph().getVertexes();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
        if (m == null || m.size() == 0) {
            return null;
        }
        StringBuffer sb = new StringBuffer("dependency Graph: ");
        // Every vertex of the graph: the map keys plus everything they point to.
        Set s = new HashSet(m.size() * 2);
        s.addAll(m.keySet());
        for (Iterator it = ((Collection) m.values()).iterator(); it.hasNext();) {
            s.addAll((Collection) it.next());
        }
        // The position in this list is the number printed for each vertex.
        List all = new ArrayList(s);
        for (int i = 0; i < all.size(); i++) {
            Collection dependencies = (Collection) m.get(all.get(i));
            if (dependencies != null) {
                for (Iterator it = dependencies.iterator(); it.hasNext();) {
                    sb.append("\nws");
                    sb.append(i);
                    sb.append(" = > ");
                    sb.append("ws");
                    sb.append(all.indexOf(it.next()));
                }
            }
        }
        return sb.toString();
    }

}