1 18 package org.objectweb.speedo.runtime.query; 19 20 import org.objectweb.speedo.SpeedoTestHelper; 21 import org.objectweb.speedo.pobjects.basic.BasicA; 22 import org.objectweb.util.monolog.api.BasicLevel; 23 import org.objectweb.util.monolog.api.Logger; 24 25 import java.util.ArrayList ; 26 import java.util.Arrays ; 27 import java.util.Collection ; 28 import java.util.Iterator ; 29 import java.util.List ; 30 31 import javax.jdo.JDOHelper; 32 import javax.jdo.PersistenceManager; 33 import javax.jdo.Query; 34 35 36 41 public class TestCreateDeleteDuringQueries extends SpeedoTestHelper { 42 43 private static long queryExecTime = 0; 44 private static long nbqueryok = 0; 45 private static long nbcreatedone = 0; 46 private static long nbdeletedone = 0; 47 private static long errors = 0; 48 private static long nbprint = 500; 49 private static int nbRunnerThread = 0; 50 51 public static final List pos = new ArrayList (); 52 public final List removed = new ArrayList (); 53 public final static int[] finishedThreads = new int[1]; 54 private synchronized static void queryError(Logger logger, Throwable t) { 55 logger.log(BasicLevel.ERROR, "", t); 56 errors ++; 57 } 58 private synchronized static void queryOk(Logger logger, int threadId) { 59 nbqueryok++; 60 if ((nbqueryok % nbprint) == 0) { 61 printStat(logger, threadId); 62 } 63 } 64 private static void printStat(Logger logger, int threadId) { 65 int size = 0; 66 synchronized (pos) { 67 size = pos.size(); 68 } 69 logger.log(BasicLevel.INFO, "Thread " + threadId + ": " 70 + " / Que:" + nbqueryok 71 + " / Cre:" + nbcreatedone 72 + " / Del:" + nbdeletedone 73 + " / PO:" + size 74 + " / delta:" + (nbcreatedone - nbdeletedone - size) 75 + " / Err:" + errors 76 + " / F:" + finishedThreads[0] + "/" + nbRunnerThread 77 ); 78 } 79 80 private synchronized static void creationDone(Logger logger, int threadId) { 81 nbcreatedone++; 82 if ((nbcreatedone % nbprint) == 0) { 83 printStat(logger, threadId); 84 } 85 } 86 87 private synchronized static void deletionDone(Logger logger, int threadId) { 88 nbdeletedone++; 89 if ((nbdeletedone % nbprint) == 0) { 90 printStat(logger, threadId); 91 } 92 } 93 94 public TestCreateDeleteDuringQueries() { 95 super("TestCreateDeleteDuringQueries"); 96 } 97 98 public TestCreateDeleteDuringQueries(String n) { 99 super(n); 100 } 101 102 protected String getLoggerName() { 103 return SpeedoTestHelper.LOG_NAME + ".query.TestCreateDeleteDuringQueries"; 104 } 105 106 public void testCreateDeleteDuringQueries1() { 107 final int createNbThread = getIntProperty(getLoggerName() +".create.nbthread", 5); 108 final int createNbLoop = getIntProperty(getLoggerName() +".create.loop", 1000); 109 final int deleteNbThread = getIntProperty(getLoggerName() +".delete.nbthread", 5); 110 final int deleteNbLoop = getIntProperty(getLoggerName() +".delete.loop", 1000); 111 final int queryNbThread = getIntProperty(getLoggerName() +".query.nbthread", 3); 112 testCreateDeleteDuringQueries1( 113 createNbThread, 114 createNbLoop, 115 deleteNbThread, 116 deleteNbLoop, 117 queryNbThread); 118 } 119 120 121 public void testCreateDeleteDuringQueries1( 122 final int createNbThread, 123 final int createNbLoop, 124 final int deleteNbThread, 125 final int deleteNbLoop, 126 final int queryNbThread) { 127 logger.log(BasicLevel.INFO, "TestCreateDeleteDuringQueries1: " 128 + "\n\t-create: " + createNbThread + " thread(s) will do " + createNbLoop + " action(s)" 129 + "\n\t-delete: " + deleteNbThread + " thread(s) will do " + deleteNbLoop + " action(s)" 130 + "\n\t-query: " + queryNbThread + " thread(s)" 131 ); 132 int nbTotalThread = createNbThread + deleteNbThread + queryNbThread; 133 nbRunnerThread = createNbThread + deleteNbThread; 134 Thread [] ts = new Thread [nbTotalThread]; 135 finishedThreads[0] = 0; 136 int thcounter = 0; 137 for(int threadType=0; threadType<3; threadType++) { 138 switch (threadType) { 139 case 0: 140 for(int i=0; i<createNbThread; i++) { 141 final int threadId = thcounter; 142 ts[thcounter] = new Thread ( 143 new Runnable () { 144 public void run () { 145 try { 146 for (int k = 0; k< createNbLoop; k++) { 147 create(threadId); 148 } 149 } finally { 150 logger.log(BasicLevel.DEBUG, "thread: " + threadId + " has finished to create"); 151 synchronized (finishedThreads) { 152 finishedThreads[0]++; 153 } 154 } 155 } 156 } 157 ); 158 thcounter++; 159 } 160 break; 161 162 case 1: 163 for(int i=0; i<deleteNbThread; i++) { 164 final int threadId = thcounter; 165 ts[thcounter] = new Thread ( 166 new Runnable () { 167 public void run () { 168 try { 169 for (int k = 0; k < createNbLoop; k++) { 170 delete(threadId); 171 } 172 } finally { 173 logger.log(BasicLevel.DEBUG, "thread: " + threadId + " has finished to delete"); 174 synchronized (finishedThreads) { 175 finishedThreads[0]++; 176 } 177 } 178 } 179 } 180 ); 181 thcounter++; 182 } 183 break; 184 case 2: 185 for(int i=0; i<queryNbThread; i++) { 186 final int threadId = thcounter; 187 ts[thcounter] = new Thread ( 188 new Runnable () { 189 public void run () { 190 int fts = 0; 191 do { 192 synchronized (finishedThreads) { 193 fts = finishedThreads[0]; 194 } 195 query(threadId); 196 } while(fts < (createNbThread + deleteNbThread)); 197 } 198 } 199 ); 200 thcounter++; 201 } 202 break; 203 } 204 } 205 List l = Arrays.asList(ts); 207 ts = (Thread []) l.toArray(new Thread [ts.length]); 209 210 long exectime = System.currentTimeMillis(); 212 for(int i=0; i<nbTotalThread; i++) { 213 ts[i].start(); 214 } 215 try { 216 for(int i=0; i<nbTotalThread; i++) { 217 ts[i].join(); 218 } 219 } catch (InterruptedException e) { 220 fail(e.getMessage()); 221 } 222 exectime = System.currentTimeMillis() - exectime; 223 long nbRunQuery = nbqueryok + errors; 224 logger.log(BasicLevel.INFO, "Query successed: " 225 + ((nbqueryok * 100) / nbRunQuery) + "% (" 226 + nbqueryok + "/" + nbRunQuery + ", " 227 + (nbRunQuery - nbqueryok) + " error)"); 228 logger.log(BasicLevel.INFO, "Query average execution time: " 229 + (queryExecTime / nbqueryok) +"ms"); 230 logger.log(BasicLevel.INFO, "Rate: " + ((nbqueryok * 1000) / exectime) + " query/sec"); 231 if (errors > 0) { 232 fail(errors + " errors occured!"); 233 } 234 } 235 236 boolean run = true; 237 Object o = new Object (); 238 239 private void stop() { 240 synchronized(o) { 241 run = false; 242 } 243 } 244 private synchronized void start() { 245 synchronized(o) { 246 run = true; 247 o.notifyAll(); 248 } 249 } 250 private void again() { 251 synchronized(o) { 252 while(!run) { 253 try { 254 o.wait(); 255 } catch (InterruptedException e) { 256 } 257 } 258 } 259 } 260 261 262 public void create(final int threadId) { 263 again(); 264 PersistenceManager pm = pmf.getPersistenceManager(); 265 pm.currentTransaction().begin(); 266 BasicA po = new BasicA(); 267 pm.makePersistent(po); 268 po.setUndeclaredField(pm.getObjectId(po).toString()); 269 pm.currentTransaction().commit(); 270 pm.close(); 271 try { 272 Thread.sleep(10); 273 } catch (InterruptedException e) { 274 } 275 creationDone(logger, threadId); 276 synchronized (pos) { 277 pos.add(po); 278 pos.notifyAll(); 279 } 280 } 281 282 public void delete(final int threadId) { 283 again(); 284 BasicA po = null; 285 synchronized (pos) { 286 while(pos.size() == 0) { 287 try { 288 pos.wait(); 289 } catch (InterruptedException e) { 290 } 291 } 292 po = (BasicA) pos.remove(0); 293 } 294 if (po != null) { 295 PersistenceManager pm = pmf.getPersistenceManager(); 296 pm.currentTransaction().begin(); 297 po.writeF1(); 298 Object oid = pm.getObjectId(po); 299 pm.deletePersistent(po); 300 synchronized (removed) { 301 removed.add(oid); 302 } 303 pm.currentTransaction().commit(); 304 pm.close(); 305 deletionDone(logger, threadId); 306 } 307 } 308 309 public void query(final int threadId) { 310 again(); 311 PersistenceManager pm = pmf.getPersistenceManager(); 312 pm.currentTransaction().begin(); 313 Query q = null; 314 ArrayList oids = new ArrayList (); 315 Object po = null; 316 try { 317 q = pm.newQuery(BasicA.class); 318 Collection res = (Collection ) q.execute(); 319 for (Iterator iter = res.iterator(); iter.hasNext();) { 320 po = iter.next(); 321 assertNotNull("Result element is null", po); 322 Object oid = pm.getObjectId(po); 323 boolean alreadyInRemoved = false; 324 synchronized (removed) { 325 alreadyInRemoved = removed.contains(oid); 326 } 327 oids.add(oid); 328 assertTrue("Object already removed !!!!", !alreadyInRemoved); 329 po = null; 330 } 331 q.closeAll(); 332 q = null; 333 pm.currentTransaction().commit(); 334 pm.close(); 335 pm = null; 336 queryOk(logger, threadId); 337 } catch(Throwable t) { 338 queryError(logger, t); 340 try { 341 JDOHelper.isPersistent(po); 342 } catch(Throwable _t) { 343 } 344 if (q != null) { 345 q.closeAll(); 346 } 347 if (pm != null) { 348 if (pm.currentTransaction().isActive()) { 349 pm.currentTransaction().rollback(); 350 } 351 pm.close(); 352 pm = null; 353 } 354 } 355 oids.clear(); 356 } 357 } 358 | Popular Tags |