KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > client > proc > ClientProc


1 package com.ubermq.jms.client.proc;
2
3 import EDU.oswego.cs.dl.util.concurrent.*;
4 import com.ubermq.jms.client.*;
5 import com.ubermq.jms.client.impl.*;
6 import com.ubermq.jms.common.datagram.*;
7 import com.ubermq.jms.common.routing.*;
8 import com.ubermq.jms.common.routing.impl.*;
9 import com.ubermq.kernel.*;
10 import com.ubermq.kernel.overflow.*;
11 import java.io.*;
12 import java.util.*;
13
14 /**
15  * An implementation of the client-side processor that manages
16  * subscriptions, overflow conditions, and sends control datagrams
17  * when appropriate.
18  */

19 public class ClientProc
20     implements com.ubermq.jms.client.IClientProcessor
21 {
22     private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(ClientProc.class);
23     
24     /**
25      * Our routers, one for subscriptions, one for ack's.
26      */

27     private final IConfigurableRouter subRouter = new Router(),
28         ackRouter = new Router();
29
30     /**
31      * The object that is notified when a control ack comes in.
32      */

33     private final Object JavaDoc controlAckNotifier;
34     
35     /**
36      * Whether a control ACK succeeded.
37      */

38     private SynchronizedBoolean controlAckSuccess;
39     
40     /**
41      * Whether a control ACK was received, either positive or negative.
42      */

43     private SynchronizedBoolean controlAckWasNotified;
44
45     /**
46      * Keep a log of control datagrams that we sent in case
47      * we reconnect and need to resume our state.
48      */

49     private final List replay;
50
51     /**
52      * The connection we are managing.
53      */

54     private IConnectionInfo managedConn;
55
56     /**
57      * The overflow handler to be used when sending
58      * control datagrams.
59      */

60     private final IOverflowHandler controlHandler;
61
62     /**
63      * The factory for creating control datagrams.
64      */

65     private final IControlDatagramFactory controlFactory;
66
67     /**
68      * Subclasses can change this value to indicate whether the
69      * main logic should send control datagrams.
70      */

71     protected boolean fSendControlDgrams = true;
72
73     private static final boolean SHOULD_WAIT_FOR_ACK =
74         Boolean.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_SHOULD_WAIT_FOR_ACK, "true")).booleanValue();
75     private static final int RPC_TIMEOUT = 10000; // 10seconds for an RPC ack to come back.
76

77     /**
78      * Creates a client processor with the given control datagram factory
79      * to be used to create control datagrams when appropriate.
80      * @param controlFactory a control datagram factory
81      */

82     public ClientProc(IControlDatagramFactory controlFactory)
83     {
84         this.controlFactory = controlFactory;
85         this.controlHandler = new ExponentialBackoff(
86             Long.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_INITIAL_TIMEOUT, "50")).longValue(),
87             Integer.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_BACKOFF_MULTIPLIER, "2")).intValue(),
88             Long.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_MAXIMUM_TIMEOUT, "5000")).longValue(),
89             false);
90         this.replay = new LinkedList();
91
92         this.controlAckNotifier = new Object JavaDoc();
93         this.controlAckSuccess = new SynchronizedBoolean(false);
94         this.controlAckWasNotified = new SynchronizedBoolean(false);
95     }
96
97     public void accept(IConnectionInfo conn)
98     {
99         this.managedConn = conn;
100     }
101
102     public void remove(IConnectionInfo conn)
103     {
104         this.managedConn = null;
105
106         // not valid here.
107
}
108
109     /**
110      * The connection has been reconnected. This indicates that the client
111      * processor should restore any server-side state, as necessary.
112      */

113     public void reconnected()
114     {
115         // replay any state that needs to be recreated on the server-side.
116
Iterator iter = replay.iterator();
117         while (iter.hasNext())
118         {
119             IControlDatagram cd = (IControlDatagram)iter.next();
120             controlSequence(cd, controlHandler, false);
121         }
122     }
123
124     public void process(IConnectionInfo conn, IDatagram d)
125     {
126         try {
127             // now we'll process it. we only care about
128
// ACK datagrams (for publishers) and
129
// MSG datagrams (for subscribers)
130
if (d instanceof IAckDatagram) {
131                 ack((IAckDatagram)d);
132             }
133             if (d instanceof IControlDatagram) {
134                 ctl((IControlDatagram)d);
135             }
136             if (d instanceof IMessageDatagram) {
137                 msg((IMessageDatagram)d);
138             }
139         } catch(Exception JavaDoc x) {
140             log.error("", x);
141         }
142     }
143
144     private void ack(IAckDatagram ad)
145     {
146         // if we don't have a message ID, use the special
147
// notifier
148
if (ad.getAckMessageId() == null)
149         {
150             controlAckSuccess.set(!ad.isNegativeAck());
151             controlAckWasNotified.set(true);
152             synchronized(controlAckNotifier) {
153                 controlAckNotifier.notifyAll();
154             }
155         }
156         else
157         {
158             MessageIdSourceSpec ss = new MessageIdSourceSpec(ad.getAckMessageId());
159             synchronized(ackRouter)
160             {
161                 Iterator acks = ackRouter.getRoutes(ss).iterator();
162                 if (!acks.hasNext()) return;
163
164                 EndpointDestNode edn = ((EndpointDestNode)acks.next());
165                 edn.getEndpoint().deliver(ad);
166
167                 // remove the ack request.
168
ackRouter.remove(ss, edn);
169             }
170         }
171     }
172
173     private void msg(IMessageDatagram md)
174     {
175         for(Iterator acks = subRouter.getRoutes(new StaticSourceDescriptor(md.getTopicName())).iterator();acks.hasNext();)
176         {
177             EndpointDestNode edn = ((EndpointDestNode)acks.next());
178             edn.getEndpoint().deliver(md);
179         }
180     }
181
182     private void ctl(IControlDatagram cd)
183     {
184         // does not make sense for client-side connections
185
// to be receiving control datagrams in this implementation.
186
}
187
188     public void registerSubscription(String JavaDoc spec,
189                                      String JavaDoc selector,
190                                      IDatagramEndpoint e)
191     {
192         TopicSourceSpec ss = new TopicSourceSpec(spec);
193
194         // check if there is an endpoint already registered here.
195
boolean alreadyRegistered = internalRegister(ss, e);
196
197         // tell the server that I care about this
198
// subscription, if needed
199
if (!alreadyRegistered) {
200             controlSequence(controlFactory.subscribe(spec, selector), controlHandler);
201         }
202     }
203
204     private boolean internalRegister(TopicSourceSpec ss,
205                                      IDatagramEndpoint e)
206     {
207         // register with our internal router
208
RouteDestNode rdn = new EndpointDestNode(e);
209         subRouter.addKnownNode(rdn);
210         subRouter.addRoute(ss, rdn);
211         return false;
212     }
213
214     public void registerDurableSubscription(String JavaDoc spec,
215                                             String JavaDoc name,
216                                             String JavaDoc selector,
217                                             IDatagramEndpoint e)
218     {
219         TopicSourceSpec ss = new TopicSourceSpec(spec);
220
221         // check if there is an endpoint already registered here.
222
boolean alreadyRegistered = internalRegister(ss, e);
223
224         // subscribe durably
225
// and recover messages from my absence.
226
if (!alreadyRegistered) {
227             controlSequence(controlFactory.durableSubscribe(name, spec, selector), controlHandler);
228             controlSequence(controlFactory.durableRecover(name), controlHandler);
229         }
230
231     }
232
233
234     public void unregisterSubscription(String JavaDoc spec,
235                                        IDatagramEndpoint e)
236     {
237         TopicSourceSpec ss = new TopicSourceSpec(spec);
238         RouteDestNode rdn = new EndpointDestNode(e);
239         subRouter.remove(ss, rdn);
240
241         // tell server that i no longer care if there are no more in the
242
// internal router.
243
if (!(subRouter.getRoutes(ss).size() > 0))
244             controlSequence(controlFactory.unsubscribe(spec), controlHandler);
245     }
246
247     public void unregisterDurableSubscription(String JavaDoc name)
248     {
249         controlSequence(controlFactory.durableUnsubscribe(name), controlHandler);
250     }
251
252     public void durableGoingAway(String JavaDoc name)
253     {
254         controlSequence(controlFactory.durableGoingAway(name), controlHandler);
255     }
256
257     /**
258      * Begins receiving messages from a queue, with the given selector,
259      * to the specified endpoint.
260      */

261     public void startQueue(String JavaDoc queue, String JavaDoc selector, IDatagramEndpoint e)
262     {
263         TopicSourceSpec ss = new TopicSourceSpec(new LocalQueue(queue).getInternalTopicName());
264
265         // check if there is an endpoint already registered here.
266
boolean alreadyRegistered = internalRegister(ss, e);
267         if (!alreadyRegistered)
268             controlSequence(controlFactory.queueStart(queue, selector), controlHandler);
269     }
270
271     /**
272      * Stops a queue from sending messages to the endpoint.
273      */

274     public void stopQueue(String JavaDoc queue, IDatagramEndpoint e)
275     {
276         SourceSpec ss = new TopicSourceSpec(new LocalQueue(queue).getInternalTopicName());
277
278         subRouter.remove(ss, new EndpointDestNode(e));
279         controlSequence(controlFactory.queueStop(queue), controlHandler);
280     }
281
282     public boolean shouldWaitForAck()
283     {
284         return SHOULD_WAIT_FOR_ACK;
285     }
286
287     public void registerNeedAck(MessageId msgId,
288                                 IDatagramEndpoint e)
289     {
290         RouteDestNode rdn = new EndpointDestNode(e);
291
292         synchronized(ackRouter)
293         {
294             ackRouter.addKnownNode(rdn);
295             ackRouter.addRoute(new MessageIdSourceSpec(msgId), rdn);
296         }
297     }
298
299     public boolean controlSequence(IControlDatagram d,
300                                    IOverflowHandler h)
301     {
302         return controlSequence(d, h, true);
303     }
304
305     private synchronized boolean controlSequence(IControlDatagram d,
306                                                  IOverflowHandler h,
307                                                  boolean saveInReplayLog)
308     {
309         // if we don't send these or we have no connection, bail out
310
if (!fSendControlDgrams ||
311             managedConn == null)
312             return true;
313
314         // save the datagram state in our replay history
315
if (saveInReplayLog)
316             replay.add(d);
317
318         return outputAndWait(d, h);
319     }
320
321     /**
322      * Outputs a datagram and waits for an anonymous acknowledgement
323      * from the remote side.
324      *
325      * @param d an IDatagram
326      * @param h an IOverflowHandler
327      *
328      * @return a boolean indicating if the acknowledgement was
329      * positive or negative. If no acknowledgement is received in the
330      * timeout period, an Exception is thrown.
331      * @throws IllegalStateException the datagram was not acknowledged
332      * in the timeout period, indicating a problem in the infrastructure.
333      */

334     private synchronized boolean outputAndWait(IDatagram d,
335                                                IOverflowHandler h)
336         throws IllegalStateException JavaDoc
337     {
338         log.debug("Outputting RPC datagram " + d + " on conn " + this.managedConn);
339         
340         // reset ack success state
341
controlAckSuccess.set(false);
342         controlAckWasNotified.set(false);
343
344         // output the data gram.
345
synchronized(controlAckNotifier) {
346             try
347             {
348                 managedConn.output(d, h);
349             }
350             catch (IOException e) {
351                 // if we fail, just return false. don't propagate the
352
// exception, even though we could contractually.
353
remove(managedConn);
354                 return false;
355             }
356
357             // wait for reply...
358
try
359             {
360                 log.debug("Waiting for RPC reply on conn " + this.managedConn);
361                 controlAckNotifier.wait(RPC_TIMEOUT);
362                 if (!controlAckWasNotified.get())
363                 {
364                     RuntimeException JavaDoc x = new IllegalStateException JavaDoc("Datagram was not acknowledged in the timeout period.");
365                     log.debug("RPC reply timed out", x);
366                     throw x;
367                 }
368             }
369             catch (InterruptedException JavaDoc e) {}
370         }
371
372         return controlAckSuccess.get();
373     }
374     
375     public String JavaDoc toString()
376     {
377         return "ClientProc for " + this.managedConn.toString();
378     }
379
380     private final static class TopicSourceSpec
381         extends com.ubermq.jms.common.routing.impl.RegexpSourceSpec
382         implements SourceDescriptor
383     {
384         private String JavaDoc spec;
385         
386         public TopicSourceSpec(String JavaDoc spec)
387         {
388             super(com.ubermq.jms.common.routing.impl.RegexpHelper.xlat(spec),
389                   spec);
390             this.spec = spec;
391         }
392         
393         public String JavaDoc getMatchValue()
394         {
395             return spec;
396         }
397     }
398
399     private final static class MessageIdSourceSpec
400         implements SourceSpec, SourceDescriptor
401     {
402         MessageId msgId;
403
404         public MessageIdSourceSpec(MessageId msgId) {this.msgId = msgId;}
405         public boolean matches(SourceDescriptor ss)
406         {
407             try {
408                 return (msgId.equals(((MessageIdSourceSpec)ss).msgId));
409             } catch(ClassCastException JavaDoc x) {return false;}
410         }
411         public boolean isMoreSpecificThan(SourceSpec ss) {return false;}
412         public String JavaDoc getDisplayName() {return msgId.toString();}
413         public String JavaDoc getMatchValue()
414         {
415             return msgId.toString();
416         }
417         public boolean isIdempotentForEqualDescriptors()
418         {
419             return true;
420         }
421
422         public boolean equals(Object JavaDoc obj)
423         {
424             return (((MessageIdSourceSpec)obj).msgId.equals(msgId));
425         }
426         public int hashCode() {return msgId.hashCode();}
427     }
428
429     private final static class EndpointDestNode
430         implements RouteDestNode
431     {
432         IDatagramEndpoint e;
433
434         public EndpointDestNode(IDatagramEndpoint e) {this.e = e;}
435         public String JavaDoc getDisplayName() {return getNodeName();}
436         public String JavaDoc getNodeName() {return e.toString();}
437         public boolean equals(Object JavaDoc o) {
438             try {
439                 return (e == ((EndpointDestNode)o).e);
440             } catch(ClassCastException JavaDoc cce ) {return false;}
441         }
442         public int compareTo(Object JavaDoc o) {
443             try {
444                 boolean gr = (e.hashCode() > ((EndpointDestNode)o).e.hashCode());
445                 boolean eq = equals(o);
446                 return (eq) ? 0 : (gr ? 1 : -1);
447             } catch(ClassCastException JavaDoc cce ) {return 1;}
448         }
449         public int hashCode() {return e.hashCode();}
450         public IDatagramEndpoint getEndpoint() {return e;}
451     }
452 }
453
Popular Tags