KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > prevayler > implementation > replication > ServerConnection


1 //Prevayler(TM) - The Free-Software Prevalence Layer.
2
//Copyright (C) 2001-2003 Klaus Wuestefeld
3
//This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
4

5 package org.prevayler.implementation.replication;
6
7 import java.io.*;
8 import java.net.Socket JavaDoc;
9 import java.util.Date JavaDoc;
10
11 import org.prevayler.Transaction;
12 import org.prevayler.implementation.publishing.*;
13
14
15 /** Reserved for future implementation.
16  */

17 class ServerConnection extends Thread JavaDoc implements TransactionSubscriber {
18
19     static final String JavaDoc SUBSCRIBER_UP_TO_DATE = "SubscriberUpToDate";
20     static final String JavaDoc REMOTE_TRANSACTION = "RemoteTransaction";
21     static final String JavaDoc CLOCK_TICK = "ClockTick";
22
23     private final TransactionPublisher _publisher;
24     private Transaction _remoteTransaction;
25
26     private final ObjectOutputStream _toRemote;
27     private final ObjectInputStream _fromRemote;
28
29
30     ServerConnection(TransactionPublisher publisher, Socket JavaDoc remoteSocket) throws IOException {
31         _publisher = publisher;
32         _fromRemote = new ObjectInputStream(remoteSocket.getInputStream());
33         _toRemote = new ObjectOutputStream(remoteSocket.getOutputStream());
34         setDaemon(true);
35         start();
36     }
37
38
39     public void run() {
40         try {
41             long initialTransaction = ((Long JavaDoc)_fromRemote.readObject()).longValue();
42             _publisher.addSubscriber(new POBox(this), initialTransaction);
43             send(SUBSCRIBER_UP_TO_DATE);
44             
45             sendClockTicks();
46             while (true) publishRemoteTransaction();
47         } catch (Exception JavaDoc ex) {
48             ex.printStackTrace();
49         }
50     }
51
52
53     private void sendClockTicks() {
54         Thread JavaDoc clockTickSender = new Thread JavaDoc() {
55             public void run() {
56                 try {
57                     while (true) {
58                         synchronized (_toRemote) {
59                             _toRemote.writeObject(CLOCK_TICK);
60                             _toRemote.writeObject(_publisher.clock().time());
61                         }
62                         Thread.sleep(1000);
63                     }
64                 } catch (Exception JavaDoc ex) {
65                     ex.printStackTrace();
66                 }
67             }
68         };
69         clockTickSender.setDaemon(true);
70         clockTickSender.start();
71     }
72
73
74     void publishRemoteTransaction() throws Exception JavaDoc {
75         _remoteTransaction = (Transaction)_fromRemote.readObject();
76         try {
77             _publisher.publish(_remoteTransaction);
78         } catch (RuntimeException JavaDoc rx) {
79             send(rx);
80         } catch (Error JavaDoc error) {
81             send(error);
82         }
83     }
84
85
86     public void receive(Transaction transaction, Date JavaDoc timestamp) {
87         try {
88             synchronized (_toRemote) {
89                 _toRemote.writeObject(transaction == _remoteTransaction
90                     ? (Object JavaDoc)REMOTE_TRANSACTION
91                     : transaction
92                 );
93                 _toRemote.writeObject(timestamp);
94             }
95         } catch (Exception JavaDoc ex) {
96             ex.printStackTrace();
97         }
98     }
99
100
101     private void send(Object JavaDoc object) {
102         synchronized (_toRemote) {
103             try {
104                 _toRemote.writeObject(object);
105             } catch (IOException e) {
106                 e.printStackTrace();
107             }
108         }
109     }
110
111 }
112
Popular Tags