1 21 22 package com.rift.coad.util.change; 24 25 import java.io.Serializable ; 27 import java.io.File ; 28 import java.io.InputStream ; 29 import java.io.IOException ; 30 import java.io.FileInputStream ; 31 import java.io.FileOutputStream ; 32 import java.io.ObjectInputStream ; 33 import java.io.ObjectOutputStream ; 34 import java.io.ObjectStreamClass ; 35 import java.util.ArrayList ; 36 import java.util.HashMap ; 37 import java.util.Iterator ; 38 import java.util.List ; 39 import java.util.Map ; 40 import java.util.Queue ; 41 import java.util.concurrent.ConcurrentHashMap ; 42 import java.util.concurrent.ConcurrentLinkedQueue ; 43 import javax.naming.Context ; 44 import javax.naming.InitialContext ; 45 import javax.transaction.UserTransaction ; 46 import javax.transaction.xa.XAException ; 47 import javax.transaction.xa.XAResource ; 48 import javax.transaction.xa.Xid ; 49 50 import org.apache.log4j.Logger; 52 53 import com.rift.coad.lib.configuration.Configuration; 55 import com.rift.coad.lib.configuration.ConfigurationException; 56 import com.rift.coad.lib.configuration.ConfigurationFactory; 57 import com.rift.coad.lib.thread.CoadunationThread; 58 import com.rift.coad.lib.thread.ThreadStateMonitor; 59 import com.rift.coad.util.transaction.TransactionManager; 60 import com.rift.coad.util.transaction.UserTransactionWrapper; 61 62 68 public class ChangeLog implements XAResource { 69 70 74 public static class ClassLoaderObjectInputStream extends ObjectInputStream { 75 80 public ClassLoaderObjectInputStream() throws IOException { 81 super(); 82 } 83 84 85 91 public ClassLoaderObjectInputStream(InputStream in) throws IOException { 92 super(in); 93 } 94 95 96 104 protected Class resolveClass(ObjectStreamClass desc) throws IOException , 105 ClassNotFoundException { 106 try { 107 return Class.forName(desc.getName()); 108 } catch (Exception ex) { 109 return Thread.currentThread().getContextClassLoader().loadClass( 110 desc.getName()); 111 } 112 } 113 114 } 115 116 117 120 public class ChangeLogProcessor extends CoadunationThread { 121 122 private ThreadStateMonitor state = new ThreadStateMonitor(); 124 private Context context = null; 125 private UserTransactionWrapper utw = null; 126 private boolean process = false; 127 128 133 public ChangeLogProcessor() throws Exception { 134 try { 135 utw = new UserTransactionWrapper(); 136 } catch (Exception ex) { 137 throw new ChangeException( 138 "Failed to init the change log processor : " + 139 ex.getMessage(),ex); 140 } 141 } 142 143 144 149 public void process() throws Exception { 150 synchronized(this) { 151 if (process == false) { 152 try { 153 wait(); 154 } catch(Exception ex) { 155 log.error("Wait threw and exception : " + 156 ex.getMessage(),ex); 157 } 158 } 159 } 160 while(!state.isTerminated()) { 161 ChangeEntry change = null; 162 synchronized(changes) { 163 change = (ChangeEntry)changes.poll(); 164 if (change == null) { 165 try { 166 changes.wait(500); 167 } catch (Exception ex) { 168 log.error("Failed to wait : " + ex.getMessage(),ex); 169 } 170 continue; 171 } 172 } 173 while(true) { 174 try { 175 utw.begin(); 176 change.applyChanges(); 177 utw.commit(); 178 break; 179 } catch (Exception ex) { 180 log.error("Failed to apply the changes : " + 181 ex.getMessage(),ex); 182 } finally { 183 utw.release(); 184 } 185 try { 186 Thread.sleep(1000); 187 } catch(Exception ex2) { 188 log.error("Failed to back off : " + 189 ex2.getMessage(),ex2); 190 } 191 } 192 } 193 } 194 195 196 200 public void terminate() { 201 state.terminate(true); 202 synchronized(this) { 203 notify(); 204 } 205 } 206 207 208 211 public synchronized void startProcessing() { 212 process = true; 213 notify(); 214 } 215 } 216 217 220 public static class ChangeEntry implements Serializable { 221 222 private List changes = new ArrayList (); 224 225 226 229 public ChangeEntry() { 230 231 } 232 233 234 240 public void addChange(Change change) { 241 changes.add(change); 242 } 243 244 245 250 public void applyChanges() throws ChangeException { 251 for (Iterator iter = changes.iterator(); iter.hasNext();) { 252 Change change = (Change)iter.next(); 253 change.applyChanges(); 254 } 255 } 256 } 257 258 private final static String USERNAME = "changelog_username"; 260 private final static String DATA_DIR = "changelog_data_dir"; 261 private final static String DATA_FILE = "changelog.dmp"; 262 263 protected Logger log = 265 Logger.getLogger(ChangeLog.class.getName()); 266 267 private static Map singletons = new HashMap (); 269 270 private ThreadStateMonitor state = new ThreadStateMonitor(); 272 private Map changesMap = new ConcurrentHashMap (); 273 private ThreadLocal currentChange = new ThreadLocal (); 274 private ChangeLogProcessor processor = null; 275 private Queue changes = new ConcurrentLinkedQueue (); 276 private String dataDirectory = null; 277 private UserTransactionWrapper utw = null; 278 279 280 285 private ChangeLog(Class configInfo) throws ChangeException { 286 try { 287 utw = new UserTransactionWrapper(); 288 Configuration configuration = ConfigurationFactory.getInstance(). 289 getConfig(configInfo); 290 dataDirectory = configuration.getString(DATA_DIR); 291 loadData(); 292 applyChanges(); 293 processor = new ChangeLogProcessor(); 294 processor.start(configuration.getString(USERNAME)); 295 } catch (Exception ex) { 296 log.error("Failed to instanciate the change " + 297 "log object : " + ex.getMessage(),ex); 298 throw new ChangeException("Failed to instanciate the change " + 299 "log object : " + ex.getMessage(),ex); 300 } 301 } 302 303 304 310 public synchronized static void init(Class configInfo) throws 311 ChangeException { 312 synchronized (singletons) { 313 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 314 if (!singletons.containsKey(loader)) { 315 ChangeLog changeLog = new ChangeLog(configInfo); 316 singletons.put(loader,changeLog); 317 } 318 } 319 } 320 321 322 328 public static ChangeLog getInstance() throws 329 ChangeException { 330 synchronized(singletons) { 331 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 332 ChangeLog changeLog = (ChangeLog)singletons.get(loader); 333 if (changeLog == null) { 334 throw new ChangeException( 335 "The change log has not been instanciated."); 336 } 337 return changeLog; 338 } 339 } 340 341 342 345 public static void terminate() throws 346 ChangeException { 347 ChangeLog changeLog = null; 348 synchronized(singletons) { 349 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 350 changeLog = (ChangeLog)singletons.get(loader); 351 if (changeLog == null) { 352 throw new ChangeException( 353 "The change log has not been instanciated."); 354 } 355 singletons.remove(loader); 356 } 357 changeLog.terminateChangeLog(); 358 359 } 360 361 364 protected void terminateChangeLog() { 365 try { 366 state.terminate(false); 367 processor.terminate(); 368 processor.join(); 369 storeData(); 370 } catch (Exception ex) { 371 log.error("Failed to terminate the change log object : " + 372 ex.getMessage(),ex); 373 } 374 } 375 376 377 380 public void start() throws ChangeException { 381 processor.startProcessing(); 382 } 383 384 385 391 public void addChange(Change change) throws ChangeException { 392 if (state.isTerminated()) { 393 log.error("The change log has been terminated cannot accept " + 394 "anymore changes."); 395 throw new ChangeException( 396 "The change log has been terminated cannot accept " + 397 "anymore changes."); 398 } 399 try { 400 TransactionManager.getInstance().bindResource(this,false); 401 ChangeEntry changeEntry = (ChangeEntry)currentChange.get(); 402 changeEntry.addChange(change); 403 } catch (Exception ex) { 404 log.error("Failed to add the change to the list :" 405 + ex.getMessage(),ex); 406 throw new ChangeException("Failed to add the change to the list :" 407 + ex.getMessage(),ex); 408 } 409 } 410 411 412 419 public void commit(Xid xid, boolean b) throws XAException { 420 synchronized (changes) { 421 changes.add(changesMap.remove(xid)); 422 changes.notify(); 423 } 424 } 425 426 427 434 public void end(Xid xid, int i) throws XAException { 435 } 436 437 438 444 public void forget(Xid xid) throws XAException { 445 changesMap.remove(xid); 446 } 447 448 449 455 public int getTransactionTimeout() throws XAException { 456 return -1; 457 } 458 459 460 468 public boolean isSameRM(XAResource xAResource) throws XAException { 469 return this == xAResource; 470 } 471 472 473 480 public int prepare(Xid xid) throws XAException { 481 return XAResource.XA_OK; 482 } 483 484 485 493 public Xid [] recover(int i) throws XAException { 494 return null; 495 } 496 497 498 504 public void rollback(Xid xid) throws XAException { 505 changesMap.remove(xid); 506 } 507 508 509 516 public boolean setTransactionTimeout(int i) throws XAException { 517 return true; 518 } 519 520 521 528 public void start(Xid xid, int i) throws XAException { 529 if (changesMap.containsKey(xid)) { 530 currentChange.set(changesMap.get(xid)); 531 } else { 532 ChangeEntry changeEntry = new ChangeEntry(); 533 changesMap.put(xid,changeEntry); 534 currentChange.set(changeEntry); 535 } 536 } 537 538 539 542 private void loadData() throws ChangeException { 543 try { 544 File dataFile = new File (dataDirectory,DATA_FILE); 545 if (!dataFile.exists()) { 546 return; 547 } 548 FileInputStream in = new FileInputStream (dataFile); 549 ClassLoaderObjectInputStream ois = new 550 ClassLoaderObjectInputStream(in); 551 changes = (ConcurrentLinkedQueue )ois.readObject(); 552 ois.close(); 553 in.close(); 554 } catch (Exception ex) { 555 log.error("Failed to load the data : " + 556 ex.getMessage(),ex); 557 throw new ChangeException("Failed to load the data : " + 558 ex.getMessage(),ex); 559 } 560 } 561 562 563 566 private void storeData() throws ChangeException { 567 try { 568 File dataFile = new File (dataDirectory,DATA_FILE); 569 if (changes.size() == 0) { 570 if (dataFile.exists()) { 572 dataFile.delete(); 573 } 574 return; 575 } 576 FileOutputStream out = new FileOutputStream (dataFile); 577 ObjectOutputStream oos = new ObjectOutputStream (out); 578 oos.writeObject(changes); 579 oos.close(); 580 out.close(); 581 } catch (Exception ex) { 582 log.error("Failed to store the data : " + 583 ex.getMessage(),ex); 584 throw new ChangeException("Failed to store the data : " + 585 ex.getMessage(),ex); 586 } 587 } 588 589 590 593 private void applyChanges() throws ChangeException { 594 log.info("Applying changes from change log"); 595 while(changes.size() > 0) { 596 ChangeEntry change = (ChangeEntry)changes.poll(); 597 while(true) { 598 try { 599 utw.begin(); 600 change.applyChanges(); 601 utw.commit(); 602 break; 603 } catch (Exception ex) { 604 log.error("Failed to apply the changes : " + 605 ex.getMessage(),ex); 606 } finally { 607 utw.release(); 608 } 609 try { 610 Thread.sleep(1000); 611 } catch(Exception ex2) { 612 log.error("Failed to back off : " + 613 ex2.getMessage(),ex2); 614 } 615 } 616 } 617 log.info("After applying changes from change log"); 618 } 619 } 620 | Popular Tags |