1 5 package org.prevayler.implementation.replication; 6 7 import java.io.*; 8 import java.net.Socket ; 9 import java.util.Date ; 10 11 import org.prevayler.Transaction; 12 import org.prevayler.implementation.publishing.*; 13 14 15 17 class ServerConnection extends Thread implements TransactionSubscriber { 18 19 static final String SUBSCRIBER_UP_TO_DATE = "SubscriberUpToDate"; 20 static final String REMOTE_TRANSACTION = "RemoteTransaction"; 21 static final String CLOCK_TICK = "ClockTick"; 22 23 private final TransactionPublisher _publisher; 24 private Transaction _remoteTransaction; 25 26 private final ObjectOutputStream _toRemote; 27 private final ObjectInputStream _fromRemote; 28 29 30 ServerConnection(TransactionPublisher publisher, Socket remoteSocket) throws IOException { 31 _publisher = publisher; 32 _fromRemote = new ObjectInputStream(remoteSocket.getInputStream()); 33 _toRemote = new ObjectOutputStream(remoteSocket.getOutputStream()); 34 setDaemon(true); 35 start(); 36 } 37 38 39 public void run() { 40 try { 41 long initialTransaction = ((Long )_fromRemote.readObject()).longValue(); 42 _publisher.addSubscriber(new POBox(this), initialTransaction); 43 send(SUBSCRIBER_UP_TO_DATE); 44 45 sendClockTicks(); 46 while (true) publishRemoteTransaction(); 47 } catch (Exception ex) { 48 ex.printStackTrace(); 49 } 50 } 51 52 53 private void sendClockTicks() { 54 Thread clockTickSender = new Thread () { 55 public void run() { 56 try { 57 while (true) { 58 synchronized (_toRemote) { 59 _toRemote.writeObject(CLOCK_TICK); 60 _toRemote.writeObject(_publisher.clock().time()); 61 } 62 Thread.sleep(1000); 63 } 64 } catch (Exception ex) { 65 ex.printStackTrace(); 66 } 67 } 68 }; 69 clockTickSender.setDaemon(true); 70 clockTickSender.start(); 71 } 72 73 74 void publishRemoteTransaction() throws Exception { 75 _remoteTransaction = (Transaction)_fromRemote.readObject(); 76 try { 77 _publisher.publish(_remoteTransaction); 78 } catch (RuntimeException rx) { 79 send(rx); 80 } catch (Error error) { 81 send(error); 82 } 83 } 84 85 86 public void receive(Transaction transaction, Date timestamp) { 87 try { 88 synchronized (_toRemote) { 89 _toRemote.writeObject(transaction == _remoteTransaction 90 ? (Object )REMOTE_TRANSACTION 91 : transaction 92 ); 93 _toRemote.writeObject(timestamp); 94 } 95 } catch (Exception ex) { 96 ex.printStackTrace(); 97 } 98 } 99 100 101 private void send(Object object) { 102 synchronized (_toRemote) { 103 try { 104 _toRemote.writeObject(object); 105 } catch (IOException e) { 106 e.printStackTrace(); 107 } 108 } 109 } 110 111 } 112 | Popular Tags |