KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > prevayler > implementation > replication > ClientPublisher


1 //Prevayler(TM) - The Free-Software Prevalence Layer.
2
//Copyright (C) 2001-2003 Klaus Wuestefeld
3
//This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
4

5 package org.prevayler.implementation.replication;
6
7 import java.io.IOException JavaDoc;
8 import java.io.ObjectInputStream JavaDoc;
9 import java.io.ObjectOutputStream JavaDoc;
10 import java.net.Socket JavaDoc;
11 import java.util.Date JavaDoc;
12
13 import org.prevayler.Clock;
14 import org.prevayler.Transaction;
15 import org.prevayler.implementation.clock.BrokenClock;
16 import org.prevayler.implementation.publishing.TransactionPublisher;
17 import org.prevayler.implementation.publishing.TransactionSubscriber;
18
19
20 /** Reserved for future implementation.
21  */

22 public class ClientPublisher implements TransactionPublisher {
23
24     private final BrokenClock _clock = new BrokenClock();
25
26     private TransactionSubscriber _subscriber;
27     private final Object JavaDoc _upToDateMonitor = new Object JavaDoc();
28
29     private Transaction _myTransaction;
30     private final Object JavaDoc _myTransactionMonitor = new Object JavaDoc();
31     private RuntimeException JavaDoc _myTransactionRuntimeException;
32     private Error JavaDoc _myTransactionError;
33
34     private final ObjectOutputStream JavaDoc _toServer;
35     private final ObjectInputStream JavaDoc _fromServer;
36
37
38     public ClientPublisher(String JavaDoc serverIpAddress, int serverPort) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
39         System.out.println("The replication logic is still under development.");
40         Socket JavaDoc socket = new Socket JavaDoc(serverIpAddress, serverPort);
41         _toServer = new ObjectOutputStream JavaDoc(socket.getOutputStream()); // Get the OUTPUT stream first. JDK 1.3.1_01 for Windows will lock up if you get the INPUT stream first.
42
_fromServer = new ObjectInputStream JavaDoc(socket.getInputStream());
43         startListening();
44     }
45
46
47     private void startListening() {
48         Thread JavaDoc listener = new Thread JavaDoc() {
49             public void run() {
50                 try {
51                     while (true) receiveTransactionFromServer();
52                 } catch (Exception JavaDoc ex) {
53                     ex.printStackTrace();
54                 }
55             }
56         };
57         listener.setDaemon(true);
58         listener.start();
59     }
60
61
62     public synchronized void addSubscriber(TransactionSubscriber subscriber, long initialTransaction) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
63         if (_subscriber != null) throw new UnsupportedOperationException JavaDoc("The current implementation can only support one subscriber. Future implementations will support more.");
64         _subscriber = subscriber;
65         synchronized (_upToDateMonitor) {
66             _toServer.writeObject(new Long JavaDoc(initialTransaction));
67             wait(_upToDateMonitor);
68         }
69     }
70
71
72     public void removeSubscriber(TransactionSubscriber subscriber) {
73         throw new UnsupportedOperationException JavaDoc("Removing subscribers is not yet supported by the current implementation.");
74     }
75
76
77     public synchronized void publish(Transaction transaction) {
78         if (_subscriber == null) throw new IllegalStateException JavaDoc("To publish a transaction, this ClientPublisher needs a registered subscriber.");
79         synchronized (_myTransactionMonitor) {
80             _myTransaction = transaction;
81             
82             try {
83                 _toServer.writeObject(transaction);
84             } catch (IOException JavaDoc iox) {
85                 iox.printStackTrace();
86                 while (true) Thread.yield();
87             }
88             wait(_myTransactionMonitor);
89             
90             throwEventualErrors();
91         }
92     }
93
94
95     private void throwEventualErrors() throws RuntimeException JavaDoc, Error JavaDoc {
96         try {
97             if (_myTransactionRuntimeException != null) throw _myTransactionRuntimeException;
98             if (_myTransactionError != null) throw _myTransactionError;
99         } finally {
100             _myTransactionRuntimeException = null;
101             _myTransactionError = null;
102         }
103     }
104
105
106     private void receiveTransactionFromServer() throws IOException JavaDoc, ClassNotFoundException JavaDoc {
107         Object JavaDoc transactionCandidate = _fromServer.readObject();
108         
109         if (transactionCandidate.equals(ServerConnection.SUBSCRIBER_UP_TO_DATE)) {
110             synchronized (_upToDateMonitor) { _upToDateMonitor.notify(); }
111             return;
112         }
113
114         if (transactionCandidate instanceof RuntimeException JavaDoc) {
115             _myTransactionRuntimeException = (RuntimeException JavaDoc)transactionCandidate;
116             notifyMyTransactionMonitor();
117             return;
118         }
119         if (transactionCandidate instanceof Error JavaDoc) {
120             _myTransactionError = (Error JavaDoc)transactionCandidate;
121             notifyMyTransactionMonitor();
122             return;
123         }
124
125         Date JavaDoc timestamp = (Date JavaDoc)_fromServer.readObject();
126         _clock.advanceTo(timestamp);
127
128         if (transactionCandidate.equals(ServerConnection.CLOCK_TICK)) return;
129
130         if (transactionCandidate.equals(ServerConnection.REMOTE_TRANSACTION)) {
131             _subscriber.receive(_myTransaction, timestamp);
132             notifyMyTransactionMonitor();
133             return;
134         }
135
136         _subscriber.receive((Transaction)transactionCandidate, timestamp);
137     }
138
139
140     private static void wait(Object JavaDoc monitor) {
141         try {
142             monitor.wait();
143         } catch (InterruptedException JavaDoc ix) {
144             throw new RuntimeException JavaDoc("Unexpected InterruptedException.");
145         }
146     }
147
148
149     private void notifyMyTransactionMonitor() {
150         synchronized (_myTransactionMonitor) {
151             _myTransactionMonitor.notify();
152         }
153     }
154
155
156     public Clock clock() {
157         return _clock;
158     }
159
160
161     public void close() throws IOException JavaDoc {
162         _fromServer.close();
163         _toServer.close();
164     }
165
166 }
167
Popular Tags