KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > chat > business > Discussion


1 package chat.business;
2
3 import java.util.Vector JavaDoc;
4 import java.util.Enumeration JavaDoc;
5
6
7 /**
8  * This class manages the contents of a discussion. Messages may be added
9  * at any time. There is an option limit on the size of the message queue,
10  * if so the oldest messages are deleted as new ones arrive. <P>
11  *
12  * Optionally the method startHarvester() may be called, once, to start
13  * a background thread that enforces an age limit on messages. If this
14  * method is not called, then there will be no age limit. <P>
15  *
16  * This class implements what I call the "on hold push" algorithm.
17  * The server (this class) maintains some state (a list of messages).
18  * Clients (ContentsPresentation.po and the web browser) request snapshots of this state.
19  * If the state has changed
20  * since the last time the client asked for a snapshot, then a new
21  * snapshot is sent to the client right away. If the state has not changed
22  * since the last time the client asked for a snapshot, then the request
23  * for a snapshot blocks until the state changes (either by a new message
24  * being added, or messages being deleted). As soon as the client has
25  * processed the snapshot it gets, it immediatly requests a new snapshot. <P>
26  *
27  * This has the following advantages:
28  *
29  * As soon as new information is available, it is sent immediatly to
30  * all interested parties.
31  *
32  * Since a whole snapshot is sent every time, the problem of having
33  * to do "diffs" between the old and new states is avoided. As is the
34  * problem of how to recover when this mechanism breaks down.
35  *
36  * Only standard HTTP is used, so the method is compatible with firewalls,
37  * proxies, all browsers, and even, in the case of an applet,
38  * paranoid security policies. (HTTP requests are commonly
39  * blocked on a read in real life. But usually it is a filesystem read,
40  * and the delay is much shorter.) For an example of this, try running
41  * this application in the Enhydra Multiserver, and then add a WAI
42  * connection. Your requests are being tunneled via CORBA from the
43  * Netscape server to the Multiserver, yet the application runs fine.
44  *
45  * Because the interested parties all have already established a socket
46  * connection, and are poised and ready to read data, they recieve the
47  * new state the instant it changes, pretty much as quickly as is possible.
48  *
49  * It is simple, robust and effective.
50  *
51  * This has the following disadvantages:
52  *
53  * If you have N users, you will have N open socket connections. This
54  * method does not scale well to very large values of N. If N is too large,
55  * some users will experience network errors, and their automatic
56  * reloading will stop. See your operating system for details on the
57  * maximum number of open sockets, and the optional NumThreads setting
58  * for connections in the Enhydra Multiserver config file.
59  *
60  * The whole state is sent every time. This uses more bandwidth than
61  * methods that only send deltas to the state.
62  *
63  * If the clients stop asking for snapshots, there is no way (from the
64  * server) to do anything about it.
65  *
66  * In order to tell if a client has a current copy of the state of things,
67  * timestamps are used. Every time the state changes, a unique identifier
68  * is assigned to the new current state (using time). The important thing
69  * is that the identifiers uniquely identify their state. Time is just
70  * one way of generating unique identifiers. Every time the client asks
71  * for a snapshot, it also sends the identifier for the state it currently
72  * has (there must be a special identifier used for the first request, when
73  * the client has no data. In this program it sends 0). If the id sent with
74  * the request matches, then the request blocks until the state changes.
75  *
76  * In real life most connections to the server use TCP/IP, which imposes
77  * a timeout limit on how long requests can block on a read.
78  * More importantly, web browsers will give up after a short period of time.
79  * Therefore, when a request is sent in, in addition to the current id a
80  * time limit is sent. This tells the server the maximum number of seconds
81  * the client is willing to wait (in this program 60 seconds is used).
82  * If this time limit expires, and the state has not yet changed, the
83  * server must return an answer to the client. In this program a snapshot
84  * is returned, even though the browser already has the same data
85  * (if the client were an applet, a special "no change" response could be
86  * sent). If a client wants to force an update, or wants to do a
87  * non-blocking read for some other reason, then it simply sets the timeout
88  * to zero.
89  *
90  *
91  * Static methods are used because there is only one chat room. This
92  * program's main goal is to show off a simple yet "real" Enhydra application.
93  */

94 public class Discussion implements Runnable JavaDoc {
95
96     // The current list of Message objects, in the order they were recieved.
97
static private Vector JavaDoc contents = new Vector JavaDoc();
98
99     // The total number of messages added to this discussion.
100
static private long totalReceived = 0;
101  
102     // The unique identifier for the current state.
103
static private long lastStateChangeTime = System.currentTimeMillis();
104
105     // The current number of clients blocked on a read.
106
static private int numWaiting = 0;
107
108     // Only create at most one instance. This is only used by the thread.
109
static private Discussion singleton;
110
111     // The thread that deletes old messages. This will only be started if
112
// startHarvester() is called (which is optional).
113
static private Thread JavaDoc harvester;
114  
115     // Default settings for the thread. Will be overwritten by startHarvester().
116
static private int lifetimeSec = 300;
117     static private int intervalSec = 10;
118
119     // Maximum allowed number of messages. 0 means no limit.
120
public static int maxQueueSize = 200;
121  
122
123     /**
124      * Do not allow instances. Use the static methods.
125      * Only one instance is ever created, and that is just for the thread.
126      */

127     private Discussion() {
128     }
129
130
131     /**
132      * Add a message to the discussion. Any waiting clients will be
133      * instantly notified.
134      */

135     public static void addMessage(String JavaDoc name, String JavaDoc text) {
136         MessageImpl msg = new MessageImpl(name, text);
137         synchronized (contents) {
138             // Add it to the list.
139
totalReceived++;
140             contents.addElement(msg);
141             // If there are too many, delete the oldest ones.
142
while ((maxQueueSize > 0) && (contents.size() > maxQueueSize))
143                 contents.removeElementAt(0);
144             // Pick a new unique state identifier.
145
// Wake up all the clients who are blocked on read.
146
updateCurrentState();
147         }
148     }
149
150
151
152     /**
153      * Create a new unique identifier for the current state.
154      * Then wake up all the clients who are blocked on read.
155      * This uses the current time, but it could be based on the content, or
156      * a counter. The important thing is that the new identifier is one
157      * that has never been used before.
158      */

159     static private void updateCurrentState() {
160         long now = System.currentTimeMillis();
161         synchronized (contents) {
162             // Be sure that the new time will be at least 1 msec greater
163
// than the old time.
164
if (now <= lastStateChangeTime)
165                 // Extremely rare.
166
lastStateChangeTime++;
167             else
168                 // Most of the time.
169
lastStateChangeTime = now;
170             // Wake up any pending readers.
171
contents.notifyAll();
172         }
173     }
174
175
176
177     /**
178      * Throw out all the current messages.
179      * All the waiting clients will be immediatly notified.
180      */

181     static public void clear() {
182         synchronized (contents) {
183             contents.removeAllElements();
184             updateCurrentState();
185         }
186     }
187
188
189     /**
190      * How many messages have been recieved in total?
191      */

192     static public long getTotalReceived() {
193         return totalReceived;
194     }
195
196
197     /**
198      * How many messages are there right now in the list?
199      */

200     static public long getCurrentSize() {
201         return contents.size();
202     }
203
204
205     /**
206      * Get a snapshot of the current state. Might block.
207      *
208      * @param currentState
209      * The state identifier that was returned last time this was called.
210      * This is the id of the state that the client currently has. It is
211      * asking for a snapshot of the state after things change and
212      * this id is not longer current. If this has already happened, the
213      * call will immediatly return. If it has not happned, the call will
214      * block (not return) until things change.
215      * @param wait
216      * The maxmimum number of seconds this call is allowed to block for.
217      * Send 0 for an instant response. If the call blocks and then
218      * runs out of time, a snapshot is returned.
219      * @return
220      * A Snapshot object. This is just a way to return two things at
221      * once: a Vector of Message objects and a state identifier.
222      * The client should use the state identifier for the next call to
223      * this method. The client should call this method again as soon as
224      * it is done displaying the results.
225      */

226     public static SnapshotImpl getContents(long currentState, long wait) {
227         waitForNewMessage(currentState, wait);
228         return new SnapshotImpl((Vector JavaDoc) contents.clone(), lastStateChangeTime);
229     }
230
231
232
233     /**
234      * Helper function. Only does the waiting. Waits till browserState
235      * is not the current state, or for wait seconds, whichever comes
236      * first. This function will return immediatly if no waiting is
237      * needed. While waiting the thread is sleeping, not kept running
238      * in a loop (which would waste processor time).
239      */

240     private static void waitForNewMessage(long browserState, long wait) {
241         if ((browserState != lastStateChangeTime) || (wait <= 0))
242             // Already new data to report.
243
return;
244         long now = System.currentTimeMillis();
245         long giveUpTime = now + (wait * 1000);
246         // We loop here because the wait might return prematurely.
247
while (browserState == lastStateChangeTime) {
248             long leftToGo = giveUpTime - now;
249             if (leftToGo <= 0)
250                 // Either the wait time was 0 or we ran out of time.
251
break;
252             synchronized (contents) {
253                 // Keep track of how many threads are waiting. This is done
254
// by dead reckoning, so care must be taken to make sure the
255
// counter doesn't get off.
256
numWaiting ++;
257                 try {
258                     contents.wait(leftToGo);
259                 } catch (InterruptedException JavaDoc e) {
260                 } finally {
261                     numWaiting--;
262                 }
263             }
264             now = System.currentTimeMillis();
265         }
266     }
267
268
269     /**
270      * How may clients are blocked on a read? This is, effectivly, the
271      * number of people participating in the discussion.
272      */

273     public static int getNumWaiting() {
274         return numWaiting;
275     }
276
277
278
279     /**
280      * Internal use only. This is provided only for the thread that deletes
281      * messages that are too old.
282      */

283     public void run() {
284         while (true) {
285             /*
286              * Sleep for a while.
287              */

288             long now = System.currentTimeMillis();
289             long wakeUp = now + (intervalSec * 1000);
290             while (now < wakeUp) {
291                 long leftToGo = wakeUp - now;
292                 if (leftToGo <= 0)
293                     break;
294                 try {
295                     Thread.sleep(leftToGo);
296                 } catch (InterruptedException JavaDoc e) {
297                 }
298                 now = System.currentTimeMillis();
299             }
300             /*
301              * Now make a pass through the message list and delete any
302              * that are too old. Keep track of whether or not we actually
303              * delete any.
304              */

305             boolean dirty = false;
306             synchronized (contents) {
307                 long expires = System.currentTimeMillis() -
308                                (1000 * lifetimeSec);
309                 Enumeration JavaDoc e = contents.elements();
310                 while (e.hasMoreElements()) {
311                     MessageImpl msg = (MessageImpl) e.nextElement();
312                     if (msg == null)
313                         continue;
314                     if (msg.getWhen() < expires) {
315                         // We found an old one! Remove it.
316
contents.removeElement(msg);
317                         dirty = true;
318                     }
319                 }
320             }
321             /*
322              * If we just changed the contents of the list, then we
323              * need to get a new state identifier and wake up all the
324              * clients.
325              */

326             if (dirty) {
327                 synchronized (contents) {
328                     updateCurrentState();
329                 }
330             }
331             // Go back to sleep...
332
}
333     }
334
335
336     /**
337      * Call this function (only once) if you want to start the
338      * harverter thread. It runs in the background and periodically
339      * deletes messages that are too old. Most of the time the thread
340      * is sleeping. If this method is not called, then no age limit on
341      * messages will be enforced. <P>
342      *
343      * The longest a message can live is lifetimeSec + intervalSec seconds
344      * (in the extreme case). <P>
345      *
346      * The default value for the interval is a little shorter than the
347      * default value for the browser's timeout. This is so that if the
348      * browser is left idle (and messages start being deleted), the
349      * refresh cycle will sync up with this threads interval, and
350      * fewer updates will happen (delete, delete, delete... instead of
351      * timout, delete, timeout, delete, timeout, delete...). It's not
352      * a big deal, but hey...
353      *
354      * @param lifetimeSec
355      * How long should messages be kept for (seconds).
356      * @param
357      */

358     static public void startHarvester(int lifetimeSec, int intervalSec) {
359         // Be sure we don't start two threads.
360
if ((harvester == null) || !harvester.isAlive()) {
361             Discussion.lifetimeSec = lifetimeSec;
362             Discussion.intervalSec = intervalSec;
363             if (singleton == null)
364                 singleton = new Discussion();
365             harvester = new Thread JavaDoc(singleton);
366             harvester.start();
367         }
368     }
369 static public void stopHarvester(){
370    if ((harvester != null) || harvester.isAlive()) {
371              harvester.stop();
372      }
373     
374 }
375
376 }
377
378      
379
Popular Tags