KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > server > journal > DurableSubscriptionProxy


1 package com.ubermq.jms.server.journal;
2
3 import EDU.oswego.cs.dl.util.concurrent.*;
4 import com.ubermq.jms.client.DeliveryMode;
5 import com.ubermq.jms.server.*;
6 import com.ubermq.jms.common.datagram.*;
7 import com.ubermq.jms.server.journal.impl.*;
8 import com.ubermq.jms.common.routing.*;
9 import com.ubermq.jms.common.routing.impl.*;
10 import com.ubermq.kernel.*;
11 import com.ubermq.kernel.overflow.*;
12 import java.io.*;
13
14 /**
15  * This class implements the server-side proxy for a durable subscriber,
16  * which can be connected or disconnected.
17  * <P>
18  * The first thing that we do is write the message to disk in case we fail.
19  * Next, we operate depending on the state of the receiver:
20  * <P>
21  * In the connected state, messages are simply forwarded to
22  * the intended recipient. When the message is acknowledged, we write an
23  * acknowledgement into the rolling log file.
24  * <P>
25  * In the disconnected state, messages are kept on disk w/o acknowledgement.
26  * They just accumulate there.
27  * <P>
28  * When the subscriber reconnects after a long absence, we go through the log
29  * file to find the last acknowledge point. all messages after that are
30  * redelivered to the subscriber recipient via the normal process.
31  */

32 public final class DurableSubscriptionProxy
33     implements RouteDestNode,
34     IDatagramEndpoint,
35     DatagramSink,
36     Externalizable,
37     IMessageProcessor
38 {
39     private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(DurableSubscriptionProxy.class);
40     
41     /**
42      * The output nodes that we are proxy on behalf of. The arbiter allows
43      * us to choose a single recipient among many actively
44      * connected subscribers.
45      */

46     private DurableConnectionArbiter proxyFor;
47
48     private String JavaDoc name, displayName, subscription;
49
50     /**
51      * The factories used for this subscription.
52      */

53     private DatagramFactoryHolder factories;
54
55     /**
56      * A datagram for ping'ing.
57      */

58     private IControlDatagram noopDatagram;
59
60     /**
61      * The journal used to keep track of recorded messages.
62      */

63     private IJournal journal;
64
65     /**
66      * The overflow handler for recovery purposes.
67      */

68     private static final IOverflowHandler recoveryHandler = new ExponentialBackoff();
69
70     /**
71      * Clock used for periodic tasks.
72      */

73     private static final ClockDaemon cd;
74     static
75     {
76         cd = new ClockDaemon();
77         cd.setThreadFactory(new ThreadFactory()
78                             {
79                     public Thread JavaDoc newThread(Runnable JavaDoc p0)
80                     {
81                         return new Thread JavaDoc(p0, "DurableSubscription Periodic Tasks");
82                     }
83                 });
84     }
85
86
87     /**
88      * This datagram flag indicates that the datagram was logged in
89      * disconnected mode.
90      */

91     private static final int DGF_LOGGED_DISCONNECTED = 1 << 8;
92
93     private static final String JavaDoc LOG_FILE_PATH = Configurator.getProperty(ServerConfig.DURABLE_LOG_PATH, ".");
94     private static final String JavaDoc LOG_FILE_PREFIX = Configurator.getProperty(ServerConfig.DURABLE_LOG_FILE_PREFIX, "durable-");
95     private static final String JavaDoc LOG_FILE_EXTENSION = ".log";
96     private static final long LOG_FILE_FIXED_SIZE = Long.valueOf(Configurator.getProperty(ServerConfig.DURABLE_LOG_SIZE, "10485760")).longValue(); // 10 MB.
97
public static final long serialVersionUID = 1L;
98
99     private static final long AUTO_RECOVER_INTERVAL = 10000L;
100
101     /**
102      * Creates a durable subscription proxy, with a connection
103      * already connected.
104      */

105     public DurableSubscriptionProxy(ConnectionDestNode e,
106                                     DatagramFactoryHolder factories,
107                                     DurableConnectionArbiter arbiter,
108                                     String JavaDoc name,
109                                     String JavaDoc displayName,
110                                     String JavaDoc subscription)
111         throws FileNotFoundException, IOException
112     {
113         this.name = name;
114         this.displayName = displayName;
115         this.subscription = subscription;
116         this.factories = factories;
117         this.noopDatagram = factories.controlFactory().noop();
118         this.proxyFor = arbiter;
119         proxyFor.connect(e);
120
121         init();
122     }
123
124     /**
125      * Creates a durable subscription proxy in off-line mode (disconnected).
126      */

127     public DurableSubscriptionProxy(DatagramFactoryHolder factories,
128                                     DurableConnectionArbiter arbiter,
129                                     String JavaDoc name,
130                                     String JavaDoc displayName,
131                                     String JavaDoc subscription)
132         throws FileNotFoundException, IOException
133     {
134         this.name = name;
135         this.displayName = displayName;
136         this.subscription = subscription;
137         this.factories = factories;
138         this.noopDatagram = factories.controlFactory().noop();
139         this.proxyFor = arbiter;
140
141         init();
142     }
143
144     /**
145      * Constructs a proxy for serialization purposes. Not intended
146      * for general use.
147      */

148     public DurableSubscriptionProxy()
149     {
150     }
151
152     public boolean equals(Object JavaDoc o)
153     {
154         if (o instanceof DurableSubscriptionProxy)
155         {
156             return ((DurableSubscriptionProxy)o).getName().equals(getName());
157         }
158         else return false;
159     }
160
161     public int hashCode() {return getName().hashCode();}
162
163     private void init()
164         throws FileNotFoundException, IOException
165     {
166         this.journal = new SimpleJournal(createFile(name), factories.datagramFactory(), LOG_FILE_FIXED_SIZE);
167
168         // begin a periodic durable heartbeat task, to test
169
// that a given subscriber is alive and well.
170
final IOverflowHandler handler = new DropIncoming();
171         cd.executePeriodically(AUTO_RECOVER_INTERVAL,
172                                new Runnable JavaDoc()
173                                {
174                     public void run()
175                     {
176                         try
177                         {
178                             if (proxyFor.isOpen())
179                                 outputToProxy(noopDatagram, handler);
180                         }
181                         catch(Throwable JavaDoc ise)
182                         {
183                             log.error("", ise);
184                         }
185                     }
186                 },
187                                false);
188     }
189
190     /**
191      * The object implements the writeExternal method to save its contents
192      * by calling the methods of DataOutput for its primitive values or
193      * calling the writeObject method of ObjectOutput for objects, strings,
194      * and arrays.
195      *
196      * @serialData UTF-8 subscription name, UTF-8 subscription specifier,
197      * IConnectionInfo connection we are proxy for, factory holder.
198      *
199      * @param out the stream to write the object to
200      * @exception IOException Includes any I/O exceptions that may occur
201      */

202     public void writeExternal(ObjectOutput out) throws IOException
203     {
204         out.writeUTF(name);
205         out.writeUTF(subscription);
206         out.writeObject(proxyFor);
207         out.writeObject(factories);
208
209         // VERSION 2.0
210
out.writeUTF(displayName);
211     }
212
213     /**
214      * The object implements the readExternal method to restore its
215      * contents by calling the methods of DataInput for primitive
216      * types and readObject for objects, strings and arrays. The
217      * readExternal method must read the values in the same sequence
218      * and with the same types as were written by writeExternal.
219      *
220      * @param in the stream to read data from in order to restore the object
221      * @exception IOException if I/O errors occur
222      * @exception ClassNotFoundException If the class for an object being
223      * restored cannot be found.
224      */

225     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc
226     {
227         this.name = in.readUTF();
228         this.subscription = in.readUTF();
229         this.proxyFor = (DurableConnectionArbiter)in.readObject();
230         this.factories = (DatagramFactoryHolder)in.readObject();
231         this.noopDatagram = factories.controlFactory().noop();
232
233         // VERSION 2.0
234
try
235         {
236             this.displayName = in.readUTF();
237         }
238         catch (IOException e)
239         {
240             log.error("", e);
241             this.displayName = name;
242         }
243
244         init();
245     }
246
247     /**
248      * transition to connected mode.
249      */

250     public synchronized void connect(ConnectionDestNode cdn)
251     {
252         this.proxyFor.connect(cdn);
253     }
254
255     public synchronized void disconnect(ConnectionDestNode cdn)
256     {
257         this.proxyFor.disconnect(cdn);
258         recover();
259     }
260
261     public synchronized void close()
262     {
263         proxyFor.disconnectAll();
264         journal.close();
265     }
266
267     /**
268      * Resets existing log files, and deletes them. It is imperative to call
269      * close() before calling this.
270      */

271     public synchronized void unsubscribe()
272     {
273         journal.destroy();
274         createFile(name).delete();
275     }
276
277     /**
278      * called when a new ConnectionInfo may possibly call process()
279      */

280     public void accept(IConnectionInfo conn)
281     {
282     }
283
284     /**
285      * Removes a connection.
286      */

287     public void remove(IConnectionInfo conn)
288     {
289     }
290
291     /**
292      * Outputs a datagram.
293      *
294      * @param d an IDatagram
295      * @param h an IOverflowHandler
296      *
297      * @throws IOException if an IO error occurs.
298      * @throws IllegalStateException if the proxy is disconnected,
299      * and the message delivery mode requires the subscriber
300      * to process the message.
301      */

302     public synchronized void output(IDatagram d, IOverflowHandler h)
303         throws IOException
304     {
305         // output the datagram to the recovery buffer;
306
// if we overflow, compress the buffer and try again.
307

308         // we remember if we are connected by setting flag bits.
309
// if we are not connected, using the GUARNATEED_PROCESSING delivery mode
310
// will cause an IllegalStateException and refuse the message delivery.
311
if (!proxyFor.isOpen())
312         {
313             if (d instanceof IMessageDatagram)
314             {
315                 int deliveryMode = ((Number JavaDoc)((IMessageDatagram)d).getStandardProperty(IMessageDatagram.STDPROP_DELIVERYMODE)).intValue();
316
317                 // if the delivery mode requires
318
// guaranteed processing, we bail out if the proxy is
319
// not open.
320
if (deliveryMode == DeliveryMode.GUARANTEED_PROCESSING)
321                 {
322                     throw new IllegalStateException JavaDoc("Guaranteed processing required, but the subscriber is disconnected.");
323                 }
324             }
325
326             // remember that we were disconnected.
327
d.setDatagramFlagBits(DGF_LOGGED_DISCONNECTED);
328         }
329         journal.output(d, h);
330
331         // output to the proxy if we succeed and we
332
// are connected
333
outputToProxy(d, h);
334     }
335
336     /**
337      * Delivers a datagram to this object.
338      */

339     public void deliver(IDatagram d)
340     {
341         if (d instanceof IAckDatagram)
342             ack(((IAckDatagram)d).getAckMessageId());
343     }
344
345     private void ack(MessageId id)
346     {
347         log.debug("ack " + id );
348         journal.ack(id);
349     }
350
351     /**
352      * Processes messages from the recovery log. Our special file connection
353      * specifies the durable proxy as the DatagramProc for the connection.
354      */

355     public void process(IConnectionInfo conn, IDatagram read)
356     {
357         log.debug("recovering " + read);
358
359         if (read instanceof IMessageDatagram)
360         {
361             IMessageDatagram md = (IMessageDatagram)read;
362
363             // the disconnected flag is set whenever we explicitly do not
364
// attempt delivery to the socket connection. if it's not set,
365
// we should set the redelivery flag because we don't know
366
// whether the client has seen the message before. JP 8/24/02
367
if ((md.getDatagramFlags() & DGF_LOGGED_DISCONNECTED) == 0)
368             {
369                 md.setStandardProperty(IMessageDatagram.STDPROP_REDELIVERY,
370                                        Boolean.TRUE);
371             }
372         }
373
374         outputToProxy(read, recoveryHandler);
375     }
376
377     private void outputToProxy(IDatagram d, IOverflowHandler h)
378     {
379         if (proxyFor != null &&
380             proxyFor.isOpen())
381         {
382             try
383             {
384                 proxyFor.output(d, h);
385             }
386             catch(IOException ise)
387             {
388                 // if the output to the proxy failed,
389
// we check the state of the proxy,
390
// recover unack'd messages, and retry
391
// this operation again.
392
if (proxyFor.isOpen())
393                 {
394                     recover();
395                     outputToProxy(d, h);
396                 }
397                 else
398                 {
399                     // no more proxies left.
400
// go to disconnected mode.
401
;
402                 }
403             }
404         }
405         else
406         {
407             log.debug("journaled message in disconnected mode");
408         }
409     }
410
411     /**
412      * Returns the arbiter that this durable connection is
413      * proxy for.
414      *
415      * @return a DurableConnectionArbiter
416      */

417     public DurableConnectionArbiter getProxyFor()
418     {
419         return proxyFor;
420     }
421
422     public String JavaDoc getSubscription()
423     {
424         return subscription;
425     }
426
427     public String JavaDoc getName()
428     {
429         return name;
430     }
431
432     public String JavaDoc getDisplayName()
433     {
434         return displayName;
435     }
436
437     public int compareTo(Object JavaDoc o)
438     {
439         return getNodeName().compareTo(((RouteDestNode)o).getNodeName());
440     }
441
442     public String JavaDoc getNodeName()
443     {
444         return "$D$" + name;
445     }
446
447     public String JavaDoc toString()
448     {
449         return getDisplayName();
450     }
451
452     public boolean isOpen()
453     {
454         return true;
455     }
456
457     /**
458      * This should be called before anything else is called, and certainly
459      * not while deliver() is being called from externally.
460      */

461     public synchronized void recover()
462     {
463         if (proxyFor.isOpen())
464             journal.recover(this);
465     }
466
467     private static File createFile(String JavaDoc name)
468     {
469         return new File(LOG_FILE_PATH, LOG_FILE_PREFIX + name + LOG_FILE_EXTENSION);
470     }
471
472 }
473
Popular Tags