KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > prevayler > implementation > publishing > CentralPublisher


1 //Prevayler(TM) - The Free-Software Prevalence Layer.
2
//Copyright (C) 2001-2003 Klaus Wuestefeld
3
//This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
4

5 package org.prevayler.implementation.publishing;
6
7 import java.io.IOException JavaDoc;
8 import java.util.Date JavaDoc;
9
10 import org.prevayler.Clock;
11 import org.prevayler.Transaction;
12 import org.prevayler.foundation.Cool;
13 import org.prevayler.foundation.Turn;
14 import org.prevayler.implementation.clock.PausableClock;
15 import org.prevayler.implementation.logging.TransactionLogger;
16 import org.prevayler.implementation.publishing.censorship.*;
17
18 public class CentralPublisher extends AbstractPublisher {
19
20     private final PausableClock _pausableClock;
21     private final TransactionCensor _censor;
22     private final TransactionLogger _logger;
23
24     private final Object JavaDoc _pendingSubscriptionMonitor = new Object JavaDoc();
25     private volatile int _pendingPublications = 0;
26     private final Object JavaDoc _pendingPublicationsMonitor = new Object JavaDoc();
27
28     private Turn _nextTurn = Turn.first();
29     private final Object JavaDoc _nextTurnMonitor = new Object JavaDoc();
30
31     private boolean _foodTasterIsDead = false;
32     private int _pipelinedTransactions = 0;
33     private final Object JavaDoc _pipelinedTransactionsMonitor = new Object JavaDoc();
34
35
36     public CentralPublisher(Clock clock, TransactionCensor censor, TransactionLogger logger) {
37         super(new PausableClock(clock));
38         _pausableClock = (PausableClock)_clock; //This is just to avoid casting the inherited _clock every time.
39

40         _censor = censor;
41         _logger = logger;
42     }
43
44
45     public void publish(Transaction transaction) {
46         synchronized (_pendingSubscriptionMonitor) { //Blocks all new publications until the subscription is over.
47
synchronized (_pendingPublicationsMonitor) {
48                 if (_pendingPublications == 0) _pausableClock.pause();
49                 _pendingPublications++;
50             }
51         }
52
53         try {
54             publishWithoutWorryingAboutNewSubscriptions(transaction); // Suggestions for a better method name are welcome. :)
55
} finally {
56             synchronized (_pendingPublicationsMonitor) {
57                 _pendingPublications--;
58                 if (_pendingPublications == 0) _pausableClock.resume();
59             }
60         }
61     }
62
63
64     private void publishWithoutWorryingAboutNewSubscriptions(Transaction transaction) {
65         Turn myTurn = nextTurn();
66
67         Date JavaDoc executionTime = realTime(myTurn); //TODO realTime() and approve in the same turn.
68
approve(transaction, executionTime, myTurn);
69         _logger.log(transaction, executionTime, myTurn);
70         notifySubscribers(transaction, executionTime, myTurn);
71     }
72
73
74     private Turn nextTurn() {
75         synchronized (_nextTurnMonitor) {
76             Turn result = _nextTurn;
77             _nextTurn = _nextTurn.next();
78             return result;
79         }
80     }
81
82
83     private Date JavaDoc realTime(Turn myTurn) {
84         try {
85             myTurn.start();
86             return _pausableClock.realTime();
87         } finally { myTurn.end(); }
88     }
89
90
91     private void approve(Transaction transaction, Date JavaDoc executionTime, Turn myTurn) throws RuntimeException JavaDoc, Error JavaDoc {
92         try {
93             myTurn.start();
94
95             if (_foodTasterIsDead) {
96                 synchronized (_pipelinedTransactionsMonitor) {
97                     while (_pipelinedTransactions > 0) {
98                         Cool.wait(_pipelinedTransactionsMonitor);
99                     }
100                 }
101             }
102
103             _censor.approve(transaction, executionTime);
104
105             _foodTasterIsDead = false;
106
107             synchronized (_pipelinedTransactionsMonitor) {
108                 _pipelinedTransactions++;
109             }
110
111             myTurn.end();
112         } catch (RuntimeException JavaDoc r) { dealWithError(myTurn); throw r;
113         } catch (Error JavaDoc e) { dealWithError(myTurn); throw e; }
114     }
115
116     
117     private void dealWithError(Turn myTurn) {
118         _foodTasterIsDead = true;
119         myTurn.alwaysSkip();
120     }
121
122
123     private void notifySubscribers(Transaction transaction, Date JavaDoc executionTime, Turn myTurn) {
124         try {
125             myTurn.start();
126             _pausableClock.advanceTo(executionTime);
127             notifySubscribers(transaction, executionTime);
128
129             synchronized (_pipelinedTransactionsMonitor) {
130                 _pipelinedTransactions--;
131                 if (_pipelinedTransactions == 0) {
132                     _pipelinedTransactionsMonitor.notifyAll();
133                 }
134             }
135         } finally { myTurn.end(); }
136     }
137
138
139     public void addSubscriber(TransactionSubscriber subscriber, long initialTransaction) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
140         synchronized (_pendingSubscriptionMonitor) {
141             while (_pendingPublications != 0) Thread.yield();
142             
143             _logger.update(subscriber, initialTransaction);
144             super.addSubscriber(subscriber);
145         }
146     }
147
148
149     public void close() throws IOException JavaDoc { _logger.close(); }
150
151 }
152
Popular Tags