KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > server > proc > DatagramProc


1 package com.ubermq.jms.server.proc;
2
3 import java.io.*;
4 import java.lang.reflect.*;
5 import java.util.*;
6
7 import com.ubermq.jms.client.impl.*;
8 import com.ubermq.jms.common.datagram.*;
9 import com.ubermq.jms.common.datagram.control.*;
10 import com.ubermq.jms.common.overflow.*;
11 import com.ubermq.jms.common.routing.*;
12 import com.ubermq.jms.common.routing.impl.*;
13 import com.ubermq.jms.server.*;
14 import com.ubermq.jms.server.admin.*;
15 import com.ubermq.jms.server.journal.*;
16 import com.ubermq.jms.server.journal.impl.*;
17 import com.ubermq.kernel.*;
18
19 /**
20  * The UberMQ server implementation of a MessageProcessor. This is core of a
21  * messaging server, that accepts incoming datagrams, examines them, and
22  * routes them to zero or more destinations.
23  */

24 public final class DatagramProc
25     implements IMessageProcessor, MessageServerAdmin, IRouterStatistics
26 {
27     private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(DatagramProc.class);
28     
29     // the standard overflow handler for outgoing messages
30
private static final long INITIAL_TIMEOUT = Long.valueOf(Configurator.getProperty(ServerConfig.DGP_INITIAL_TIMEOUT, "50")).longValue();
31     private static final long MAXIMUM_TIMEOUT = Long.valueOf(Configurator.getProperty(ServerConfig.DGP_MAXIMUM_TIMEOUT, "5000")).longValue();
32     private static final int TIMEOUT_FACTOR = Integer.valueOf(Configurator.getProperty(ServerConfig.DGP_BACKOFF_MULTIPLIER, "2")).intValue();
33
34     // whether the server should send ACK datagrams.
35
private static final boolean shouldSendAck = Boolean.valueOf(Configurator.getProperty(ServerConfig.DGP_SHOULD_SEND_ACKS, "true")).booleanValue();
36
37     // or, a user-defined overflow handler available on classpath.
38
private static final String JavaDoc OVERFLOW_HANDLER_CLASS = Configurator.getProperty(ServerConfig.DGP_OVERFLOW_HANDLER, "");
39     private static final String JavaDoc OVERFLOW_HANDLER_INIT = Configurator.getProperty(ServerConfig.DGP_OVERFLOW_HANDLER_INIT, "");
40
41     // the prefix for queue subscription names
42
private static final String JavaDoc QUEUE_NAME_PREFIX = "$queue.";
43
44     // member variables.
45
private ISettingsRepository journal;
46     private DatagramFactoryHolder factories;
47     private Map durableSubscribers;
48
49     // routers. the normal router contains DatagramSink objects.
50
// the ack router only routes locally and requires IDatagramEndpoint objects.
51
private IConfigurableRouter router, ackRouter;
52     private IRouterStatistics routerStats;
53
54     // statistical variables
55
private transient volatile int nMessagesIn, nMessagesOut, nMessagesDropped;
56     private transient volatile int accumLatency;
57     private transient long startTime;
58
59     // my overflow handler
60
private IOverflowHandler overflow;
61
62     /**
63      * Creates a datagram processor with a journal to restore state, and
64      * a datagram factory used to interpret the incoming message stream
65      * and produce well-formed output. This constructor uses the default
66      * overflow handler.
67      *
68      * @param journal the journal used to store state across invocations of the
69      * processor.
70      * @param factories the datagram factory holder for creating datagrams
71      */

72     public DatagramProc(ISettingsRepository journal,
73                         DatagramFactoryHolder factories)
74         throws java.io.IOException JavaDoc
75     {
76         this(journal, factories, null);
77     }
78
79     /**
80      * Creates a datagram processor with a journal to restore state, and
81      * a datagram factory used to interpret the incoming message stream
82      * and produce well-formed output.
83      *
84      * @param journal the journal used to store state across invocations of the
85      * processor.
86      * @param factories the datagram factory holder for creating datagrams
87      * @param handler the overflow handler to use when overflow conditions
88      * occur. If this is null, the configurator is consulted. If the configurator
89      * does not specify a handler, the default is used.
90      */

91     public DatagramProc(ISettingsRepository journal,
92                         DatagramFactoryHolder factories,
93                         IOverflowHandler handler)
94         throws java.io.IOException JavaDoc
95     {
96         this.router = new Router();
97         this.ackRouter = new Router();
98         this.routerStats = this;
99         this.journal = journal;
100         this.factories = factories;
101         this.durableSubscribers = new HashMap();
102         restoreDurableSubscribers();
103
104         // we're up and running
105
startTime = System.currentTimeMillis();
106
107         // set up an overflow handler
108
try
109         {
110             if (OVERFLOW_HANDLER_CLASS.length() > 0)
111             {
112                 Class JavaDoc overflowClass = Class.forName(OVERFLOW_HANDLER_CLASS);
113                 Constructor c = overflowClass.getConstructor(new Class JavaDoc[] {String JavaDoc.class});
114                 overflow = (IOverflowHandler)c.newInstance(new Object JavaDoc[] {OVERFLOW_HANDLER_INIT});
115             }
116             else
117             {
118                 throw new InstantiationException JavaDoc();
119             }
120         }
121         catch (Exception JavaDoc e)
122         {
123             // use mine, a special TTL aware overflow handler.
124

125             // In UMQ 2.3, we now favor receivers in most cases and
126
// we kill rogue senders that may be over-publishing.
127
// it falls on the sender to regulate itself.
128
//
129
// The Old behavior can be resurrected by specifying the KillReceiver
130
// overflow handler in the OVERFLOW_HANDLER_CLASS runtime parameter.
131
overflow = new TTLOverflowHandler(INITIAL_TIMEOUT,
132                                               TIMEOUT_FACTOR,
133                                               MAXIMUM_TIMEOUT,
134                                               true);
135         }
136     }
137
138     /**
139      * Sets the server name.
140      * @param name a name for this server.
141      */

142     public void setServerName(String JavaDoc name)
143     {
144         router.setNodeLabel(name);
145     }
146
147     public void accept(IConnectionInfo ci)
148     {
149         router.addKnownNode(new ConnectionDestNode(ci));
150     }
151
152     public void remove(IConnectionInfo ci)
153     {
154         ConnectionDestNode toRemove = new ConnectionDestNode(ci);
155
156         router.removeRoutesTo(toRemove);
157         router.removeKnownNode(toRemove);
158     }
159
160     public void process(IConnectionInfo conn, IDatagram d)
161     {
162         // process the incoming datagram according to our
163
// RULES.
164
rules(conn, d);
165     }
166
167     private void rules(IConnectionInfo conn, IDatagram d)
168     {
169         // now we'll process it. we only care about
170
// ACK datagrams (for publishers) and
171
// MSG datagrams (for subscribers)
172
if (d instanceof IAckDatagram)
173         {
174             ack(conn, (IAckDatagram)d);
175         }
176         if (d instanceof IControlDatagram)
177         {
178             control(conn, (IControlDatagram)d);
179         }
180         if (d instanceof IMessageDatagram)
181         {
182             msg(conn, (IMessageDatagram)d);
183         }
184     }
185
186     /**
187      * Processes acknowledgement messages. ACKs are never
188      * relayed outside the process; they can only be handled by
189      * local datagram endpoints.
190      */

191     private void ack(IConnectionInfo conn, IAckDatagram ad)
192     {
193         log.debug("ack! " + ad.toString());
194
195         // route this using the ACK router.
196
Iterator iter = ackRouter.getRoutes(new StaticSourceDescriptor(conn.getId())).iterator();
197         while(iter.hasNext())
198         {
199             IDatagramEndpoint e = (IDatagramEndpoint)iter.next();
200             e.deliver(ad);
201         }
202     }
203
204     private void control(IConnectionInfo conn, IControlDatagram cd)
205     {
206         log.debug("control! " + cd.toString());
207
208         boolean ack = false;
209         try
210         {
211             // all Control datagrams contain a sub command. figure out what
212
// type of command it is, and take action.
213
ICommandSubGram command = cd.getCommandSubGram();
214
215             if (command instanceof IStartDatagram)
216             {
217                 router.addKnownNode(new ConnectionDestNode(conn));
218             }
219             else if (command instanceof IStopDatagram)
220             {
221                 router.removeKnownNode(new ConnectionDestNode(conn));
222             }
223             else if (command instanceof ISubscribeDatagram)
224             {
225                 ISubscribeDatagram sd = ((ISubscribeDatagram)command);
226
227                 String JavaDoc selector = sd.getSelector();
228
229                 try
230                 {
231                     // create the source spec.
232
SourceSpec ss = new RegexpSourceSpec(RegexpHelper.xlat(sd.getTopicSpecification()));
233                     
234                     // if we are a selected topic, we add a special qualifier
235
// so that we don't send unnecessary messages.
236
if (selector != null && selector.length() > 0)
237                         ss = new SelectorSourceSpec(ss, new SimpleSelector(selector));
238
239                     // add it.
240
router.addRoute(ss,
241                                     new ConnectionDestNode(conn));
242                 }
243                 catch (javax.jms.InvalidSelectorException JavaDoc e)
244                 {
245                     // ignore this request
246
// the client should check the selector and instantiate
247
// on its side as well.
248
assert false : "Client allowed an invalid selector";
249                 }
250             }
251             else if (command instanceof IUnsubscribeDatagram)
252             {
253                 IUnsubscribeDatagram sd = ((IUnsubscribeDatagram)command);
254
255                 router.remove(new RegexpSourceSpec(RegexpHelper.xlat(sd.getTopicSpecification())),
256                               new ConnectionDestNode(conn));
257             }
258             else if (command instanceof IQueueStartDatagram)
259             {
260                 IQueueStartDatagram iqs = ((IQueueStartDatagram)command);
261                 setupDurableProxy(conn,
262                                   new RoundRobinArbiter(),
263                                   QUEUE_NAME_PREFIX + iqs.getQueueName(),
264                                   iqs.getQueueName(),
265                                   new LocalQueue(iqs.getQueueName()).getInternalTopicName());
266             }
267             else if (command instanceof IQueueStopDatagram)
268             {
269                 IQueueStopDatagram iqs = ((IQueueStopDatagram)command);
270
271                 DurableSubscriptionProxy proxy = getDurableSubscriber(QUEUE_NAME_PREFIX + iqs.getQueueName());
272                 if (proxy != null)
273                 {
274                     // remove the proxy from the ack table.
275
unregisterDurableSubscriber(proxy);
276
277                     // go to disconnected mode on this connection.
278
proxy.disconnect(new ConnectionDestNode(conn));
279                 }
280             }
281             else if (command instanceof IQueueDeleteDatagram)
282             {
283                 unsubscribeDurableSubscriber(QUEUE_NAME_PREFIX + ((IQueueDeleteDatagram)command).getQueueName());
284             }
285             else if (command instanceof IDurableSubscribeDatagram)
286             {
287                 IDurableSubscribeDatagram sd = ((IDurableSubscribeDatagram)command);
288
289                 setupDurableProxy(conn,
290                                   new FailoverArbiter(),
291                                   sd.getSubscriptionName(),
292                                   sd.getSubscriptionName(),
293                                   sd.getTopicSpecification());
294             }
295             else if (command instanceof IDurableUnsubscribeDatagram)
296             {
297                 IDurableUnsubscribeDatagram sd = ((IDurableUnsubscribeDatagram)command);
298
299                 unsubscribeDurableSubscriber(sd.getSubscriptionName());
300             }
301             else if (command instanceof IDurableRecoverDatagram)
302             {
303                 IDurableRecoverDatagram sd = ((IDurableRecoverDatagram)command);
304
305                 DurableSubscriptionProxy proxy = getDurableSubscriber(sd.getSubscriptionName());
306                 if (proxy != null)
307                 {
308                     proxy.recover();
309                 }
310             }
311             else if (command instanceof IDurableGoingAwayDatagram)
312             {
313                 IDurableGoingAwayDatagram sd = ((IDurableGoingAwayDatagram)command);
314
315                 DurableSubscriptionProxy proxy = getDurableSubscriber(sd.getSubscriptionName());
316                 if (proxy != null)
317                 {
318                     // remove the proxy from the ack table.
319
unregisterDurableSubscriber(proxy);
320
321                     // go to disconnected mode on this connection.
322
proxy.disconnect(new ConnectionDestNode(conn));
323                 }
324             }
325
326             ack = true;
327         }
328         catch(Exception JavaDoc x)
329         {
330             log.error("Failed sending acknowledgement", x);
331             ack = false;
332         }
333         finally
334         {
335             log.debug("Sending acknowledgement " + ack + " to " + conn + " for RPC");
336             
337             // send the ACK of the command back to the caller
338
// so the caller can proceed
339
sendAcknowledgement(conn, ack);
340         }
341
342         // show the router status
343
if (log.isDebugEnabled())
344             log.debug(router.toString());
345     }
346
347     private void msg(IConnectionInfo conn, IMessageDatagram md)
348     {
349         routerStats.messageIn();
350         long time = System.currentTimeMillis();
351
352         // send the message to those subscribers that
353
// match the message string.
354
Iterator i = router.getRoutes(new MessageDatagramSourceDescriptor(md)).iterator();
355
356         boolean failed = false;
357         if (!i.hasNext()) routerStats.messageDropped();
358         while(i.hasNext())
359         {
360             // send this message.
361
DatagramSink cdn = (DatagramSink)i.next();
362
363             try
364             {
365                 // output it.
366
cdn.output(md, overflow);
367             }
368             catch(IOException x)
369             {
370                 // the datagram sink tells us we failed here.
371
if (cdn instanceof ConnectionDestNode)
372                     remove(((ConnectionDestNode)cdn).getConnection());
373             }
374             catch(Exception JavaDoc other)
375             {
376                 // any other exception, particularly
377
// the IllegalStateException thrown by durable subscribers,
378
// is a failure that deserves a NACK.
379
failed = true;
380             }
381
382             routerStats.messageOut();
383         }
384
385         // accumulated latency
386
accumLatency += System.currentTimeMillis() - time;
387
388         // ack the message back to the sender.
389
// we do this after we output all the datagrams
390
// to their recipients as a form of flow control
391
// so we don't saturate our internal networking buffers.
392
if (shouldSendAck)
393         {
394             try
395             {
396                 IAckDatagram ad = null;
397                 if (!failed)
398                 {
399                     ad = factories.ackFactory().ack(md.getMessageId());
400                 }
401                 else
402                 {
403                     ad = factories.ackFactory().nack(md.getMessageId());
404                 }
405
406                 // output the ACK or NACK, as appropriate.
407
log.debug("sending " + conn + " message ID " + md.getMessageId() + " ack=" + !ad.isNegativeAck());
408                 conn.output(ad, overflow);
409             }
410             catch (IOException e)
411             {
412                 log.debug("could not send message acknowledgement", e);
413                 
414                 // connection failed.
415
remove(conn);
416             }
417         }
418     }
419
420     private void sendAcknowledgement(IConnectionInfo conn, boolean ack)
421     {
422         try
423         {
424             conn.output(ack ? factories.ackFactory().ack() : factories.ackFactory().nack(), overflow);
425         }
426         catch (IOException e)
427         {
428             log.debug("could not send RPC ack due to I/O problem", e);
429             
430             // looks like the caller went down.
431
// blow him away.
432
remove(conn);
433         }
434     }
435
436     ////////////// DURABLE SUBSCRIPTION MANAGEMENT //////////////////////
437

438     private DurableSubscriptionProxy setupDurableProxy(IConnectionInfo conn,
439                                                        DurableConnectionArbiter arbiter,
440                                                        String JavaDoc durableName,
441                                                        String JavaDoc displayName,
442                                                        String JavaDoc subscription)
443     {
444         try
445         {
446             // create a durable proxy with the outgoing connection
447
// and the durable's name (from the control dgram)
448
DurableSubscriptionProxy proxy =
449                 createDurableSubscriber(arbiter,
450                                         durableName,
451                                         displayName,
452                                         subscription);
453
454             // transition the proxy into connected mode.
455
if (conn != null)
456             {
457                 proxy.connect(new ConnectionDestNode(conn));
458
459                 // register the proxy in the subscription table.
460
registerDurableSubscriber(proxy, conn);
461             }
462             return proxy;
463         }
464         catch (java.io.IOException JavaDoc e)
465         {
466             // do something
467
// this is a severe error if we cannot open the durable logfile
468
// we should report it to the user.
469
log.fatal(e.getMessage());
470             return null;
471         }
472     }
473
474     private static final String JavaDoc DURABLE_SUB_JOURNAL_PROP = "DurableSubscribers";
475     private void addDurableSubscriber(DurableSubscriptionProxy p)
476     {
477         // add to memory cache
478
durableSubscribers.put(p.getName(), p);
479
480         // add to persistent cache.
481
List l = (List)journal.get(DURABLE_SUB_JOURNAL_PROP);
482         if (l == null)
483         {
484             l = new ArrayList();
485         }
486
487         l.add(p);
488         journal.put(DURABLE_SUB_JOURNAL_PROP, l);
489     }
490
491     private void removeDurableSubscriber(DurableSubscriptionProxy p)
492     {
493         // remove fm memory cache
494
durableSubscribers.remove(p.getName());
495
496         // and from persistent cache.
497
List l = (List)journal.get(DURABLE_SUB_JOURNAL_PROP);
498         if (l != null)
499         {
500             l.remove(p);
501             journal.put(DURABLE_SUB_JOURNAL_PROP, l);
502         }
503
504         // unsubscribe
505
router.removeRoutesTo(p);
506         router.removeKnownNode(p);
507     }
508
509     /**
510      * load all durables back from the persistent journal
511      */

512     private void restoreDurableSubscribers()
513     {
514         List l = (List)journal.get(DURABLE_SUB_JOURNAL_PROP);
515         if (l != null)
516         {
517             Iterator iter = l.iterator();
518             while (iter.hasNext())
519             {
520                 DurableSubscriptionProxy p = (DurableSubscriptionProxy)iter.next();
521                 durableSubscribers.put(p.getName(), p);
522                 restoreDurableSubscriber(p);
523             }
524         }
525     }
526
527     /**
528      * restore the durable by adding it back to the subscription table.
529      */

530     private void restoreDurableSubscriber(DurableSubscriptionProxy p)
531     {
532         log.info("restoring durable subscriber " + p.getName());
533
534         // subscribe the proxy to the topics.
535
router.addKnownNode(p);
536         router.addRoute(new RegexpSourceSpec(RegexpHelper.xlat(p.getSubscription())),
537                         p);
538     }
539
540     /**
541      * get a durable subscriber, or create if it doesn't exist
542      */

543     private DurableSubscriptionProxy createDurableSubscriber(DurableConnectionArbiter a,
544                                                              String JavaDoc name,
545                                                              String JavaDoc displayName,
546                                                              String JavaDoc subscription)
547         throws FileNotFoundException, IOException
548
549     {
550         // look for a durable subscriber by that name
551
// if it's there, use it.
552
DurableSubscriptionProxy p = getDurableSubscriber(name);
553
554         // if there is not one, create it.
555
if (p == null)
556         {
557             p = new DurableSubscriptionProxy(factories, a, name, displayName, subscription);
558             addDurableSubscriber(p);
559             restoreDurableSubscriber(p);
560         }
561
562         return p;
563     }
564
565     /**
566      * return a durable from the memory cache if it is there.
567      */

568     private DurableSubscriptionProxy getDurableSubscriber(String JavaDoc name)
569     {
570         return (DurableSubscriptionProxy)durableSubscribers.get(name);
571     }
572
573     /**
574      * Unsubscribes an existing durable subscriber. This will
575      * delete the durable subscription and all saved messages.
576      */

577     private void unsubscribeDurableSubscriber(String JavaDoc name)
578     {
579         DurableSubscriptionProxy proxy = getDurableSubscriber(name);
580         if (proxy != null)
581         {
582             // remove the proxy from the ack table and from
583
// the subscription table and from the list
584
// of all durables.
585
unregisterDurableSubscriber(proxy);
586             removeDurableSubscriber(proxy);
587
588             try
589             {
590                 proxy.close();
591             }
592             finally
593             {
594                 proxy.unsubscribe();
595             }
596         }
597
598     }
599
600     /**
601      * this method registers an active connection with an
602      * existing durableSubscriptionProxy.
603      */

604     private void registerDurableSubscriber(DurableSubscriptionProxy proxy,
605                                            IConnectionInfo conn)
606     {
607         // register proxy for acks.
608
ackRouter.addKnownNode(proxy);
609         ackRouter.addRoute(new StaticSourceSpec(conn.getId()),
610                            proxy);
611     }
612
613     private void unregisterDurableSubscriber(DurableSubscriptionProxy proxy)
614     {
615         ackRouter.removeKnownNode(proxy);
616     }
617
618     /////////// ADMINISTRATIVE HOOKS
619

620     public void close()
621     {
622         // noop
623
}
624
625     /**
626      * Returns an object containing a snapshot of router
627      * statistical information. This includes number of routes,
628      * a full subscription table, uptime, and overall throughput
629      * information.
630      */

631     public IRouterStatistics getRouterStatistics()
632     {
633         return this;
634     }
635
636     /**
637      * Returns a collection of all connections to the server.
638      * @return a Collection of ConnectionAdmin objects.
639      * @see com.ubermq.jms.server.admin.ConnectionAdmin
640      */

641     public Collection getConnections()
642     {
643         ArrayList al = new ArrayList();
644
645         Iterator iter = this.router.getKnownNodes().iterator();
646         while (iter.hasNext())
647         {
648             RouteDestNode rdn = (RouteDestNode)iter.next();
649             if (rdn instanceof ConnectionDestNode)
650                 al.add(new MyConnectionAdmin((ConnectionDestNode)rdn));
651         }
652
653         return al;
654     }
655
656     /**
657      * Returns the name of the server.
658      * @return a description of the server
659      */

660     public String JavaDoc getServerName()
661     {
662         return this.router.getNodeLabel();
663     }
664
665     /**
666      * Returns a collection of all active durable subscriptions
667      * registered on the server.<P>
668      *
669      * @return a Collection of DurableSubscriptionAdmin objects.
670      * @see com.ubermq.jms.server.admin.DurableSubscriptionAdmin
671      */

672     public Collection getPersistentConsumers() throws java.rmi.RemoteException JavaDoc
673     {
674         ArrayList al = new ArrayList();
675
676         Iterator iter = durableSubscribers.values().iterator();
677         while (iter.hasNext())
678         {
679             DurableSubscriptionProxy p = (DurableSubscriptionProxy)iter.next();
680             al.add(new MyDurableAdmin(p));
681         }
682
683         return al;
684     }
685
686     private class MyDurableAdmin
687         implements PersistentConsumerAdmin
688     {
689         private DurableSubscriptionProxy durable;
690
691         MyDurableAdmin(DurableSubscriptionProxy durable)
692         {
693             this.durable = durable;
694         }
695
696         /**
697          * Describes the set of topics that the subscriber is
698          * subscribed to.<P>
699          *
700          * @return a topic or queue specifier
701          */

702         public String JavaDoc getSubscription()
703         {
704             return durable.getSubscription();
705         }
706
707         /**
708          * Describes the persistent consumer in a human-readable
709          * way. This should contain information about connected
710          * subscribers, or active/inactive status.<p>
711          *
712          * @return an HTML formatted String describing
713          * the consumer's state.
714          */

715         public String JavaDoc getDescription() throws java.rmi.RemoteException JavaDoc
716         {
717             return durable.getProxyFor().toHtml();
718         }
719
720         /**
721          * Returns true if the subscriber is representing
722          * a queue.<P>
723          */

724         public boolean isQueue()
725         {
726             return durable.getName().startsWith(QUEUE_NAME_PREFIX);
727         }
728
729         /**
730          * Indicates whether a consumer is connected to this subscription.<P>
731          *
732          * @return true if at least one consumer is connected to this subscription.
733          */

734         public boolean isActive()
735         {
736             return durable.getProxyFor().isOpen();
737         }
738
739         /**
740          * Deletes the subscription from the server. This operation
741          * removes all storage associated with the subscription and any
742          * enqueued messages.
743          *
744          * @throws JMSException if the subscription is being used.
745          */

746         public void close() throws javax.jms.JMSException JavaDoc
747         {
748             if (isActive())
749                 throw new javax.jms.JMSException JavaDoc("Cannot delete a durable subscriber that is in use.");
750
751             unsubscribeDurableSubscriber(durable.getName());
752         }
753
754         /**
755          * Describes the durable subscriber. This should generally
756          * be the human-readable name of the subscriber or queue.<P>
757          *
758          * @return a name describing the connection
759          */

760         public String JavaDoc getName()
761         {
762             return durable.getDisplayName();
763         }
764     }
765
766     private class MyConnectionAdmin
767         implements ConnectionAdmin
768     {
769         private ConnectionDestNode cdn;
770
771         MyConnectionAdmin(ConnectionDestNode conn)
772         {
773             this.cdn = conn;
774         }
775
776         public String JavaDoc getName()
777         {
778             return cdn.getDisplayName();
779         }
780
781         public boolean equals(Object JavaDoc o)
782         {
783             try
784             {
785                 if (o instanceof ConnectionAdmin)
786                     return ((ConnectionAdmin)o).getName().equals(getName());
787                 else return false;
788             }
789             catch (java.rmi.RemoteException JavaDoc e)
790             {
791                 return false;
792             }
793         }
794
795         /**
796          * Determines the current subscriptions registered for this
797          * connection. Each subscription is represented by a
798          * String object.
799          *
800          * @return a Collection of String objects representing
801          * subscriptions.
802          */

803         public Collection getSubscriptions()
804         {
805             ArrayList l = new ArrayList();
806
807             Iterator iter = router.getRoutesTo(cdn).iterator();
808             while (iter.hasNext())
809             {
810                 SourceSpec ss = (SourceSpec)iter.next();
811                 l.add(ss.getDisplayName());
812             }
813
814             return l;
815         }
816
817         /**
818          * Forcibly closes the connection. This can be useful if the connection
819          * is unresponsive.
820          */

821         public void close()
822         {
823             remove(cdn.getConnection());
824         }
825
826         /**
827          * Resumes delivery of messages to the connection.
828          */

829         public void start()
830         {
831             router.addKnownNode(cdn);
832         }
833
834         /**
835          * Stops delivery to the connection. This keeps resources open, and message
836          * delivery can be restarted using the <code>start</code> method.
837          */

838         public void stop()
839         {
840             router.removeKnownNode(cdn);
841         }
842
843         /**
844          * Indicates whether the connection is active. A connection
845          * is active unless it has been stopped using the <code>stop</code>
846          * method. Closed connections are not available administratively.
847          *
848          * @return true if the connection is active, false otherwise.
849          */

850         public boolean isActive()
851         {
852             boolean a = router.getKnownNodes().contains(cdn);
853             return a;
854         }
855     }
856
857     ////////////// STATISTICS
858
public void messageIn()
859     {
860         nMessagesIn++;
861     }
862
863     public void messageOut()
864     {
865         nMessagesOut++;
866     }
867
868     public void messageDropped()
869     {
870         nMessagesDropped++;
871     }
872
873     public boolean isRunning()
874     {
875         return true;
876     }
877
878     public Date getStartTime()
879     {
880         return new Date(startTime);
881     }
882
883     public int getMessagesDropped()
884     {
885         return nMessagesDropped;
886     }
887
888     public int getMessagesIn()
889     {
890         return nMessagesIn;
891     }
892
893     public int getMessagesOut()
894     {
895         return nMessagesOut;
896     }
897
898     public String JavaDoc getStatisticsAsText()
899     {
900         java.text.NumberFormat JavaDoc nf = java.text.NumberFormat.getNumberInstance();
901         nf.setMaximumFractionDigits(1);
902         nf.setMinimumFractionDigits(0);
903         nf.setGroupingUsed(true);
904
905         java.text.NumberFormat JavaDoc pf = java.text.NumberFormat.getPercentInstance();
906         pf.setMaximumFractionDigits(1);
907         pf.setMinimumFractionDigits(1);
908
909         long time = System.currentTimeMillis();
910         long uptime = (long)((time - startTime) / 1000F);
911
912         StringBuffer JavaDoc sb = new StringBuffer JavaDoc();
913         sb.append("\n\tMessages In: " + nf.format(nMessagesIn));
914         sb.append("\n\tMessages Out: " + nf.format(nMessagesOut));
915         sb.append("\n\tMessages Drop: " + nf.format(nMessagesDropped));
916
917         if (nMessagesIn > 0)
918         {
919             sb.append("\n");
920             sb.append("\n\tLatency: " + nf.format(accumLatency / ((double)nMessagesIn)) + " ms");
921             sb.append("\n\tThroughput: " + nf.format(1000 * ((double)nMessagesIn) / accumLatency) + " msg/s");
922             sb.append("\n\tUtilization: " + pf.format( accumLatency / (double)(time - startTime) ));
923         }
924
925         sb.append("\n\n\tUptime: " + formatMillis(uptime));
926
927         return sb.toString();
928     }
929
930     private String JavaDoc formatMillis(long ms)
931     {
932         java.text.NumberFormat JavaDoc tf = java.text.NumberFormat.getNumberInstance();
933         tf.setMinimumIntegerDigits(2);
934         tf.setMaximumFractionDigits(3);
935
936         return tf.format( ms / 3600 ) + "h " + tf.format( (ms % 3600) / 60 ) + "m";
937     }
938
939 }
940
Popular Tags