package org.prevayler.implementation.replication;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.Date;

import org.prevayler.Clock;
import org.prevayler.Transaction;
import org.prevayler.implementation.clock.BrokenClock;
import org.prevayler.implementation.publishing.TransactionPublisher;
import org.prevayler.implementation.publishing.TransactionSubscriber;


/**
 * A TransactionPublisher that talks to a server over a socket using Java
 * object serialization.
 * <p>
 * Transactions passed to {@link #publish(Transaction)} are written to the
 * server. A daemon listener thread reads whatever the server sends back,
 * advances the local clock to the received timestamps and hands the received
 * transactions to the single registered subscriber.
 * <p>
 * Only one subscriber is supported, and it cannot be removed.
 */
public class ClientPublisher implements TransactionPublisher {

    private final BrokenClock _clock = new BrokenClock();

    private TransactionSubscriber _subscriber;

    /** Guards {@link #_upToDate}; addSubscriber() waits on it. */
    private final Object _upToDateMonitor = new Object();
    /** Set by the listener thread once SUBSCRIBER_UP_TO_DATE has been received. */
    private boolean _upToDate = false;

    /** The transaction most recently handed to publish(). */
    private Transaction _myTransaction;
    /** Guards {@link #_myTransactionAnswered}; publish() waits on it. */
    private final Object _myTransactionMonitor = new Object();
    /** Set by the listener thread once the server has answered the pending transaction. */
    private boolean _myTransactionAnswered = false;
    private RuntimeException _myTransactionRuntimeException;
    private Error _myTransactionError;

    private final ObjectOutputStream _toServer;
    private final ObjectInputStream _fromServer;


    /**
     * Connects to the server, opens the object streams over the connection and
     * starts the daemon thread that listens for objects sent by the server.
     *
     * @param serverIpAddress host to connect to.
     * @param serverPort port to connect to.
     * @throws IOException if the connection or either object stream cannot be
     *         opened. The socket is closed before the exception propagates.
     * @throws ClassNotFoundException declared for backward compatibility with
     *         existing callers; nothing in this constructor throws it.
     */
    public ClientPublisher(String serverIpAddress, int serverPort) throws IOException, ClassNotFoundException {
        System.out.println("The replication logic is still under development.");
        Socket socket = new Socket(serverIpAddress, serverPort);
        try {
            _toServer = new ObjectOutputStream(socket.getOutputStream());
            _fromServer = new ObjectInputStream(socket.getInputStream());
        } catch (IOException iox) {
            // Do not leak the connection when stream setup fails.
            try {
                socket.close();
            } catch (IOException ignored) {
                // The original failure is the one worth reporting.
            }
            throw iox;
        }
        startListening();
    }


    /**
     * Starts a daemon thread that keeps calling receiveTransactionFromServer().
     * Any exception ends the loop; its stack trace is printed and the thread
     * terminates.
     */
    private void startListening() {
        Thread listener = new Thread() {
            public void run() {
                try {
                    while (true) receiveTransactionFromServer();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };
        listener.setDaemon(true);
        listener.start();
    }


    /**
     * Registers the only subscriber, sends the initial transaction number to the
     * server and blocks until the server reports SUBSCRIBER_UP_TO_DATE.
     *
     * @param subscriber the subscriber that will receive the transactions.
     * @param initialTransaction number sent to the server as a Long.
     * @throws UnsupportedOperationException if a subscriber is already registered.
     * @throws IOException if the request cannot be written to the server.
     * @throws RuntimeException if the calling thread is interrupted while waiting.
     */
    public synchronized void addSubscriber(TransactionSubscriber subscriber, long initialTransaction) throws IOException, ClassNotFoundException {
        if (_subscriber != null) throw new UnsupportedOperationException("The current implementation can only support one subscriber. Future implementations will support more.");
        _subscriber = subscriber;
        synchronized (_upToDateMonitor) {
            // The write happens while holding the monitor, so the listener's
            // notification cannot slip in before this thread starts waiting.
            _toServer.writeObject(new Long(initialTransaction));
            // Loop on the flag: Object.wait() may wake up spuriously.
            while (!_upToDate) wait(_upToDateMonitor);
        }
    }


    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always.
     */
    public void removeSubscriber(TransactionSubscriber subscriber) {
        throw new UnsupportedOperationException("Removing subscribers is not yet supported by the current implementation.");
    }


    /**
     * Sends the transaction to the server and blocks until the listener thread
     * signals that the server answered it. A RuntimeException or Error received
     * from the server in the meantime is rethrown to the caller.
     *
     * @param transaction the transaction to send.
     * @throws IllegalStateException if no subscriber has been registered.
     * @throws RuntimeException if the calling thread is interrupted while
     *         waiting, or if the server answered with a RuntimeException.
     */
    public synchronized void publish(Transaction transaction) {
        if (_subscriber == null) throw new IllegalStateException("To publish a transaction, this ClientPublisher needs a registered subscriber.");
        synchronized (_myTransactionMonitor) {
            _myTransaction = transaction;
            _myTransactionAnswered = false;

            try {
                _toServer.writeObject(transaction);
            } catch (IOException iox) {
                iox.printStackTrace();
                // NOTE(review): after a failed write this call never returns and
                // keeps both locks held, which looks like a deliberate halt of
                // the publisher - confirm before changing.
                while (true) Thread.yield();
            }
            // Loop on the flag: Object.wait() may wake up spuriously.
            while (!_myTransactionAnswered) wait(_myTransactionMonitor);

            throwEventualErrors();
        }
    }


    /**
     * Rethrows the RuntimeException or Error stored by the listener thread, if
     * any, and always clears both stored values.
     */
    private void throwEventualErrors() throws RuntimeException, Error {
        try {
            if (_myTransactionRuntimeException != null) throw _myTransactionRuntimeException;
            if (_myTransactionError != null) throw _myTransactionError;
        } finally {
            _myTransactionRuntimeException = null;
            _myTransactionError = null;
        }
    }


    /**
     * Reads one object from the server and handles it:
     * <ul>
     * <li>SUBSCRIBER_UP_TO_DATE releases the thread blocked in addSubscriber();</li>
     * <li>a RuntimeException or Error is stored for publish() to rethrow and the
     *     publishing thread is released;</li>
     * <li>anything else is followed by a Date timestamp, to which the clock is
     *     advanced. CLOCK_TICK needs nothing more; REMOTE_TRANSACTION delivers the
     *     pending local transaction to the subscriber and releases the publishing
     *     thread; any other object is delivered to the subscriber as a
     *     Transaction.</li>
     * </ul>
     */
    private void receiveTransactionFromServer() throws IOException, ClassNotFoundException {
        Object transactionCandidate = _fromServer.readObject();

        if (transactionCandidate.equals(ServerConnection.SUBSCRIBER_UP_TO_DATE)) {
            synchronized (_upToDateMonitor) {
                _upToDate = true;
                _upToDateMonitor.notify();
            }
            return;
        }

        if (transactionCandidate instanceof RuntimeException) {
            _myTransactionRuntimeException = (RuntimeException) transactionCandidate;
            notifyMyTransactionMonitor();
            return;
        }
        if (transactionCandidate instanceof Error) {
            _myTransactionError = (Error) transactionCandidate;
            notifyMyTransactionMonitor();
            return;
        }

        Date timestamp = (Date) _fromServer.readObject();
        _clock.advanceTo(timestamp);

        if (transactionCandidate.equals(ServerConnection.CLOCK_TICK)) return;

        if (transactionCandidate.equals(ServerConnection.REMOTE_TRANSACTION)) {
            _subscriber.receive(_myTransaction, timestamp);
            notifyMyTransactionMonitor();
            return;
        }

        _subscriber.receive((Transaction) transactionCandidate, timestamp);
    }


    /**
     * Waits on the given monitor, which the caller must already hold. May
     * return spuriously, so callers re-check their condition in a loop.
     *
     * @throws RuntimeException if the thread is interrupted; the interrupt
     *         status is restored and the InterruptedException is kept as cause.
     */
    private static void wait(Object monitor) {
        try {
            monitor.wait();
        } catch (InterruptedException ix) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unexpected InterruptedException.", ix);
        }
    }


    /** Marks the pending transaction as answered and wakes the thread blocked in publish(). */
    private void notifyMyTransactionMonitor() {
        synchronized (_myTransactionMonitor) {
            _myTransactionAnswered = true;
            _myTransactionMonitor.notify();
        }
    }


    /** Returns the clock that is advanced to the timestamps received from the server. */
    public Clock clock() {
        return _clock;
    }


    /**
     * Closes both object streams. The output stream is closed even if closing
     * the input stream fails.
     *
     * @throws IOException if closing either stream fails.
     */
    public void close() throws IOException {
        try {
            _fromServer.close();
        } finally {
            _toServer.close();
        }
    }

}