KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > nl > justobjects > pushlet > core > Subscriber


1 // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
2
// Distributable under LGPL license. See terms of license at gnu.org.
3

4 package nl.justobjects.pushlet.core;
5
6 import nl.justobjects.pushlet.util.PushletException;
7 import nl.justobjects.pushlet.util.Rand;
8 import nl.justobjects.pushlet.util.Sys;
9
10 import java.util.Collections JavaDoc;
11 import java.util.HashMap JavaDoc;
12 import java.util.Map JavaDoc;
13
14 /**
15  * Handles data channel between dispatcher and client.
16  *
17  * @version $Id: Subscriber.java,v 1.21 2005/02/28 12:45:59 justb Exp $
18  * @author Just van den Broecke - Just Objects &copy;
19  **/

20 public class Subscriber implements Protocol, ConfigDefs {
21     private Session session;
22
23     /** Blocking queue. */
24     private EventQueue eventQueue = new EventQueue(Config.getIntProperty(QUEUE_SIZE));
25
26     /** URL to be used in refresh requests in pull/poll modes. */
27     private String JavaDoc refreshURL;
28     private long queueReadTimeoutMillis = Config.getLongProperty(QUEUE_READ_TIMEOUT_MILLIS);
29     private long queueWriteTimeoutMillis = Config.getLongProperty(QUEUE_WRITE_TIMEOUT_MILLIS);
30     private long refreshTimeoutMillis = Config.getLongProperty(PULL_REFRESH_TIMEOUT_MILLIS);
31     volatile long lastAlive = Sys.now();
32
33     /** Map of active subscriptions, keyed by their subscription id. */
34     private Map JavaDoc subscriptions = Collections.synchronizedMap(new HashMap JavaDoc(13));
35
36     /** Are we able to accept/send events ?. */
37     private volatile boolean active;
38
39     /** Transfer mode (stream, pull, poll). */
40     private String JavaDoc mode;
41
42     /** Event sequence count. */
43     private long eventSeqNr = 1;
44
45     public Subscriber(Session theSession) {
46         session = theSession;
47     }
48
49     public void activate() {
50         active = true;
51     }
52
53     public void passivate() {
54         active = false;
55     }
56
57     public void bailout() {
58
59         passivate();
60
61         removeSubscriptions();
62
63         session.stop();
64
65     }
66
67     /** Are we still active to handle events. */
68     public boolean isActive() {
69         return active;
70     }
71
72     /** Return client session. */
73     public Session getSession() {
74         return session;
75     }
76
77     /** Get (session) id. */
78     public String JavaDoc getId() {
79         return session.getId();
80     }
81
82     /** Return subscriptions. */
83     public Subscription[] getSubscriptions() {
84         // todo: Optimize
85
return (Subscription[]) subscriptions.values().toArray(new Subscription[0]);
86     }
87
88     /** Add a subscription. */
89     public Subscription addSubscription(String JavaDoc aSubject, String JavaDoc aLabel) {
90         Subscription subscription = new Subscription(aSubject, aLabel);
91         subscriptions.put(subscription.getId(), subscription);
92         info("Subscription added subject=" + aSubject + " sid=" + subscription.getId() + " label=" + aLabel);
93         return subscription;
94     }
95
96     /** Remove a subscription. */
97     public Subscription removeSubscription(String JavaDoc aSubscriptionId) {
98         Subscription subscription = (Subscription) subscriptions.remove(aSubscriptionId);
99         if (subscription == null) {
100             warn("No subscription found sid=" + aSubscriptionId);
101             return null;
102         }
103         info("Subscription removed subject=" + subscription.getSubject() + " sid=" + subscription.getId() + " label=" + subscription.getLabel());
104         return subscription;
105     }
106
107     /** Remove all subscriptions. */
108     public void removeSubscriptions() {
109         subscriptions.clear();
110     }
111
112     public String JavaDoc getMode() {
113         return mode;
114     }
115
116     public void setMode(String JavaDoc aMode) {
117         mode = aMode;
118     }
119
120     public long getRefreshTimeMillis() {
121         String JavaDoc minWaitProperty = PULL_REFRESH_WAIT_MIN_MILLIS;
122         String JavaDoc maxWaitProperty = PULL_REFRESH_WAIT_MAX_MILLIS;
123         if (mode.equals((MODE_POLL))) {
124             minWaitProperty = POLL_REFRESH_WAIT_MIN_MILLIS;
125             maxWaitProperty = POLL_REFRESH_WAIT_MAX_MILLIS;
126
127         }
128         return Rand.randomLong(Config.getLongProperty(minWaitProperty),
129                 Config.getLongProperty(maxWaitProperty));
130     }
131
132     /** Get events from queue and push to client. */
133     public void fetchEvents(Command aCommand) throws PushletException {
134
135         refreshURL = aCommand.httpReq.getRequestURI() + "?" + P_ID + "=" + session.getId() + "&" + P_EVENT + "=" + E_REFRESH;
136
137         // This is the only thing required to support "poll" mode
138
if (mode.equals(MODE_POLL)) {
139             queueReadTimeoutMillis = 0;
140             refreshTimeoutMillis = Config.getLongProperty(POLL_REFRESH_TIMEOUT_MILLIS);
141         }
142
143         // Required for fast bailout (tomcat)
144
aCommand.httpRsp.setBufferSize(128);
145
146         // Try to prevent caching in any form.
147
aCommand.sendResponseHeaders();
148
149         // Let clientAdapter determine how to send event
150
ClientAdapter clientAdapter = aCommand.getClientAdapter();
151         try {
152             clientAdapter.start();
153
154             // Send first event (usually hb-ack or listen-ack)
155
clientAdapter.push(aCommand.getResponseEvent());
156         } catch (Throwable JavaDoc t) {
157             bailout();
158             return;
159         }
160
161
162         Event[] events = null;
163
164         // Main loop: as long as connected, get events and push to client
165

166         while (isActive()) {
167             // Indicate we are still alive
168
lastAlive = Sys.now();
169
170             // Update session time to live
171
session.kick();
172
173             // Get next events; blocks until timeout or entire contents
174
// of event queue is returned. Note that "poll" mode
175
// will return immediately when queue is empty.
176
try {
177                 events = eventQueue.deQueueAll(queueReadTimeoutMillis);
178             } catch (InterruptedException JavaDoc ie) {
179                 warn("interrupted");
180                 bailout();
181             }
182
183             // Send heartbeat when no events received
184
if (events == null) {
185                 events = new Event[1];
186                 events[0] = new Event(E_HEARTBEAT);
187             }
188
189             // ASSERT: one or more events available
190

191             // Send events to client using adapter
192
// debug("received event count=" + events.length);
193
for (int i = 0; i < events.length; i++) {
194                 // Check for abort event
195
if (events[i].getEventType().equals(E_ABORT)) {
196                     warn("Aborting Subscriber");
197                     bailout();
198                 }
199
200                 // Push next Event to client
201
try {
202                     // Set sequence number
203
events[i].setField(P_SEQ, eventSeqNr++);
204
205                     // Push to client through client adapter
206
clientAdapter.push(events[i]);
207                 } catch (Throwable JavaDoc t) {
208                     bailout();
209                     return;
210                 }
211             }
212
213             // Force client refresh request in pull or poll modes
214
if (mode.equals(MODE_PULL) || mode.equals(MODE_POLL)) {
215                 Event refreshEvent = new Event(E_REFRESH);
216
217                 // Set wait time and url for refresh
218
refreshEvent.setField(P_WAIT, "" + getRefreshTimeMillis());
219                 refreshEvent.setField(P_URL, refreshURL);
220
221                 try {
222                     // Push to client through client adapter
223
clientAdapter.push(refreshEvent);
224
225                     // Stop this round until refresh event
226
clientAdapter.stop();
227                 } catch (Throwable JavaDoc t) {
228                     // Leave on any exception
229
bailout();
230                 } finally {
231                     // Always leave loop in pull/poll mode
232
break;
233                 }
234             }
235         }
236     }
237
238     /** Determine if we should receive event. */
239     public Subscription match(Event event) {
240         Subscription[] subscriptions = getSubscriptions();
241         for (int i = 0; i < subscriptions.length; i++) {
242             if (subscriptions[i].match(event)) {
243                 return subscriptions[i];
244             }
245         }
246         return null;
247     }
248
249     /** Event from Dispatcher: enqueue it. */
250     public void onEvent(Event theEvent) {
251         if (!isActive()) {
252             return;
253         }
254
255         // p("send: queue event: "+theEvent.getSubject());
256

257         // Check if we had any active continuation for at
258
// least 'timeOut' millisecs. If the client has left this
259
// instance there would be no way of knowing otherwise.
260
long now = Sys.now();
261         if (now - lastAlive > refreshTimeoutMillis) {
262             warn("not alive for at least: " + refreshTimeoutMillis + "ms, leaving...");
263             bailout();
264             return;
265         }
266
267         // Put event in queue; leave if queue full
268
try {
269             if (!eventQueue.enQueue(theEvent, queueWriteTimeoutMillis)) {
270                 warn("queue full, bailing out...");
271                 bailout();
272             }
273
274             // ASSERTION : Event in queue.
275
// see fetchEvents() where Events are dequeued and pushed to the client.
276
} catch (InterruptedException JavaDoc ie) {
277             bailout();
278         }
279
280     }
281
282     /** Info. */
283     protected void info(String JavaDoc s) {
284         session.info("[Subscriber] " + s);
285     }
286
287     /** Exceptional print util. */
288     protected void warn(String JavaDoc s) {
289         session.warn("[Subscriber] " + s);
290     }
291
292     /** Exceptional print util. */
293     protected void debug(String JavaDoc s) {
294         session.debug("[Subscriber] " + s);
295     }
296
297
298     public String JavaDoc toString() {
299         return session.toString();
300     }
301 }
302
303 /*
304  * $Log: Subscriber.java,v $
305  * Revision 1.21 2005/02/28 12:45:59 justb
306  * introduced Command class
307  *
308  * Revision 1.20 2005/02/21 16:59:09 justb
309  * SessionManager and session lease introduced
310  *
311  * Revision 1.19 2005/02/21 12:32:28 justb
312  * fixed publish event in Controller
313  *
314  * Revision 1.18 2005/02/21 11:50:46 justb
315  * ohase1 of refactoring Subscriber into Session/Controller/Subscriber
316  *
317  * Revision 1.17 2005/02/20 13:05:32 justb
318  * removed the Postlet (integrated in Pushlet protocol)
319  *
320  * Revision 1.16 2005/02/18 12:36:47 justb
321  * changes for renaming and configurability
322  *
323  * Revision 1.15 2005/02/18 10:07:23 justb
324  * many renamings of classes (make names compact)
325  *
326  * Revision 1.14 2005/02/18 09:54:15 justb
327  * refactor: rename Publisher Dispatcher and single Subscriber class
328  *
329  * Revision 1.13 2005/02/16 14:39:34 justb
330  * fixed leave handling and added "poll" mode
331  *
332  * Revision 1.12 2005/01/24 13:42:00 justb
333  * new protocol changes (p_listen)
334  *
335  * Revision 1.11 2005/01/13 14:47:15 justb
336  * control evt: send response on same (control) connection
337  *
338  * Revision 1.10 2004/10/24 20:50:35 justb
339  * refine subscription with label and sending sid and label on events
340  *
341  * Revision 1.9 2004/10/24 12:58:18 justb
342  * revised client and test classes for new protocol
343  *
344  * Revision 1.8 2004/09/26 21:39:43 justb
345  * allow multiple subscriptions and out-of-band requests
346  *
347  * Revision 1.7 2004/09/20 22:01:38 justb
348  * more changes for new protocol
349  *
350  * Revision 1.6 2004/09/03 22:35:37 justb
351  * Almost complete rewrite, just checking in now
352  *
353  * Revision 1.5 2004/08/13 23:36:05 justb
354  * rewrite of Pullet into Pushlet "pull" mode
355  *
356  * Revision 1.4 2004/03/10 14:01:55 justb
357  * formatting and *Subscriber refactoring
358  *
359  * Revision 1.3 2003/08/15 08:37:40 justb
360  * fix/add Copyright+LGPL file headers and footers
361  *
362  * Revision 1.2 2003/05/18 16:15:08 justb
363  * support for XML encoded Events
364  *
365  * Revision 1.1.1.1 2002/09/24 21:02:32 justb
366  * import to sourceforge
367  *
368  * Revision 1.1.1.1 2002/09/20 22:48:18 justb
369  * import to SF
370  *
371  * Revision 1.1.1.1 2002/09/20 14:19:04 justb
372  * first import into SF
373  *
374  * Revision 1.3 2002/04/15 20:42:41 just
375  * reformatting and renaming GuardedQueue to EventQueue
376  *
377  * Revision 1.2 2000/08/21 20:48:29 just
378  * added CVS log and id tags plus copyrights
379  *
380  *
381  */

382
Popular Tags