1 5 package org.prevayler.implementation.publishing; 6 7 import java.io.IOException ; 8 import java.util.Date ; 9 10 import org.prevayler.Clock; 11 import org.prevayler.Transaction; 12 import org.prevayler.foundation.Cool; 13 import org.prevayler.foundation.Turn; 14 import org.prevayler.implementation.clock.PausableClock; 15 import org.prevayler.implementation.logging.TransactionLogger; 16 import org.prevayler.implementation.publishing.censorship.*; 17 18 public class CentralPublisher extends AbstractPublisher { 19 20 private final PausableClock _pausableClock; 21 private final TransactionCensor _censor; 22 private final TransactionLogger _logger; 23 24 private final Object _pendingSubscriptionMonitor = new Object (); 25 private volatile int _pendingPublications = 0; 26 private final Object _pendingPublicationsMonitor = new Object (); 27 28 private Turn _nextTurn = Turn.first(); 29 private final Object _nextTurnMonitor = new Object (); 30 31 private boolean _foodTasterIsDead = false; 32 private int _pipelinedTransactions = 0; 33 private final Object _pipelinedTransactionsMonitor = new Object (); 34 35 36 public CentralPublisher(Clock clock, TransactionCensor censor, TransactionLogger logger) { 37 super(new PausableClock(clock)); 38 _pausableClock = (PausableClock)_clock; 40 _censor = censor; 41 _logger = logger; 42 } 43 44 45 public void publish(Transaction transaction) { 46 synchronized (_pendingSubscriptionMonitor) { synchronized (_pendingPublicationsMonitor) { 48 if (_pendingPublications == 0) _pausableClock.pause(); 49 _pendingPublications++; 50 } 51 } 52 53 try { 54 publishWithoutWorryingAboutNewSubscriptions(transaction); } finally { 56 synchronized (_pendingPublicationsMonitor) { 57 _pendingPublications--; 58 if (_pendingPublications == 0) _pausableClock.resume(); 59 } 60 } 61 } 62 63 64 private void publishWithoutWorryingAboutNewSubscriptions(Transaction transaction) { 65 Turn myTurn = nextTurn(); 66 67 Date executionTime = realTime(myTurn); approve(transaction, executionTime, myTurn); 69 _logger.log(transaction, executionTime, myTurn); 70 notifySubscribers(transaction, executionTime, myTurn); 71 } 72 73 74 private Turn nextTurn() { 75 synchronized (_nextTurnMonitor) { 76 Turn result = _nextTurn; 77 _nextTurn = _nextTurn.next(); 78 return result; 79 } 80 } 81 82 83 private Date realTime(Turn myTurn) { 84 try { 85 myTurn.start(); 86 return _pausableClock.realTime(); 87 } finally { myTurn.end(); } 88 } 89 90 91 private void approve(Transaction transaction, Date executionTime, Turn myTurn) throws RuntimeException , Error { 92 try { 93 myTurn.start(); 94 95 if (_foodTasterIsDead) { 96 synchronized (_pipelinedTransactionsMonitor) { 97 while (_pipelinedTransactions > 0) { 98 Cool.wait(_pipelinedTransactionsMonitor); 99 } 100 } 101 } 102 103 _censor.approve(transaction, executionTime); 104 105 _foodTasterIsDead = false; 106 107 synchronized (_pipelinedTransactionsMonitor) { 108 _pipelinedTransactions++; 109 } 110 111 myTurn.end(); 112 } catch (RuntimeException r) { dealWithError(myTurn); throw r; 113 } catch (Error e) { dealWithError(myTurn); throw e; } 114 } 115 116 117 private void dealWithError(Turn myTurn) { 118 _foodTasterIsDead = true; 119 myTurn.alwaysSkip(); 120 } 121 122 123 private void notifySubscribers(Transaction transaction, Date executionTime, Turn myTurn) { 124 try { 125 myTurn.start(); 126 _pausableClock.advanceTo(executionTime); 127 notifySubscribers(transaction, executionTime); 128 129 synchronized (_pipelinedTransactionsMonitor) { 130 _pipelinedTransactions--; 131 if (_pipelinedTransactions == 0) { 132 _pipelinedTransactionsMonitor.notifyAll(); 133 } 134 } 135 } finally { myTurn.end(); } 136 } 137 138 139 public void addSubscriber(TransactionSubscriber subscriber, long initialTransaction) throws IOException , ClassNotFoundException { 140 synchronized (_pendingSubscriptionMonitor) { 141 while (_pendingPublications != 0) Thread.yield(); 142 143 _logger.update(subscriber, initialTransaction); 144 super.addSubscriber(subscriber); 145 } 146 } 147 148 149 public void close() throws IOException { _logger.close(); } 150 151 } 152 | Popular Tags |