KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > client > impl > LocalTopicSubscriber


1 package com.ubermq.jms.client.impl;
2
3 import EDU.oswego.cs.dl.util.concurrent.*;
4 import com.ubermq.jms.client.*;
5 import com.ubermq.jms.common.datagram.*;
6 import com.ubermq.jms.common.routing.*;
7 import com.ubermq.jms.common.routing.impl.*;
8 import com.ubermq.kernel.*;
9 import com.ubermq.kernel.overflow.*;
10 import com.ubermq.util.*;
11 import com.ubermq.*;
12 import java.io.*;
13 import java.util.*;
14 import javax.jms.*;
15
16 /**
17  * A client uses a TopicSubscriber for receiving messages that have been
18  * published to a topic. TopicSubscriber is the Pub/Sub variant of a JMS
19  * message consumer.
20  *
21  * <P>Regular TopicSubscriber's are not durable. They only receive messages
22  * that are published while they are active.
23  *
24  * <P>Messages filtered out by a subscriber's message selector will never
25  * be delivered to the subscriber. From the subscriber's perspective they
26  * simply don't exist.
27  *
28  * <P>In some cases, a connection may both publish and subscribe to a topic.
29  * The subscriber NoLocal attribute allows a subscriber to inhibit the
30  * delivery of messages published by its own connection.
31  *
32  * <P>If a client needs to receive all the messages published on a topic
33  * including the ones published while the subscriber is inactive, it uses
34  * a durable TopicSubscriber. JMS retains a record of this durable
35  * subscription and insures that all messages from the topic's publishers
36  * are retained until they are either acknowledged by this durable
37  * subscriber or they have expired.
38  *
39  * <P>Sessions with durable subscribers must always provide the same client
40  * identifier. In addition, each client must specify a name which uniquely
41  * identifies (within client identifier) each durable subscription it creates.
42  * Only one session at a time can have a TopicSubscriber for a particular
43  * durable subscription.
44  *
45  * <P>A client can change an existing durable subscription by creating a
46  * durable TopicSubscriber with the same name and a new topic and/or message
47  * selector. Changing a durable subscription is equivalent to deleting and
48  * recreating it.
49  *
50  * <P>TopicSessions provide the unsubscribe method for deleting a durable
51  * subscription created by their client. This deletes the state being
52  * maintained on behalf of the subscriber by its provider.
53  *
54  */

55 final class LocalTopicSubscriber
56     extends AbstractConsumer
57     implements javax.jms.TopicSubscriber JavaDoc
58 {
59     private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(LocalTopicSubscriber.class);
60     
61     // the maximum number of messages we buffer for sync delivery
62
private static int BOUNDED_BUFFER_SIZE = Integer.valueOf(Configurator.getProperty(ClientConfig.SUB_BOUNDED_BUFFER_SIZE, "50")).intValue();
63
64     /**
65      * JMS properties
66      */

67     private Topic t;
68     private boolean noLocal;
69     private String JavaDoc name; // for durables.
70
private boolean durable;
71
72     /**
73      * Construct a non-durable TopicSubscriber to the specified topic
74      *
75      * @param t the topic to subscribe to
76      * @param selector the message selector to use
77      * @param noLocal set to true to inhibit delivery of messages published
78      * on this connection.
79      * @param ts the session
80      * @param delivery the delivery manager
81      *
82      * @exception JMSException if a session fails to create a subscriber
83      * due to some JMS error.
84      */

85     LocalTopicSubscriber(Topic t,
86                          String JavaDoc selector,
87                          boolean noLocal,
88                          Session ts,
89                          IDeliveryManager delivery)
90         throws JMSException
91     {
92         super(ts,
93               selector,
94               delivery,
95               new BoundedPriorityQueue(BOUNDED_BUFFER_SIZE));
96
97         // set properties
98
this.t = t;
99         this.noLocal = noLocal;
100         this.durable = false;
101
102         // register myself with the subscription router.
103
ts.conn.getClientProcessor().registerSubscription(t.getTopicName(),
104                                                           selector,
105                                                           this);
106     }
107
108     /**
109      * This constructor creates (or restores) a durable subscription
110      * with the given name.
111      * @param t the topic to subscribe to
112      * @param selector the message selector to use
113      * @param noLocal set to true to inhibit delivery of messages published
114      * on this connection.
115      * @param name the durable subscription's unique name
116      * @param ts the session
117      * @param delivery the delivery manager
118      */

119     LocalTopicSubscriber(Topic t,
120                          String JavaDoc selector,
121                          boolean noLocal,
122                          String JavaDoc name,
123                          Session ts,
124                          IDeliveryManager delivery)
125         throws JMSException
126     {
127         super(ts,
128               selector,
129               delivery,
130               new LinkedQueue());
131
132         this.t = t;
133         this.noLocal = noLocal;
134         this.name = name;
135         this.durable = true;
136
137         // register myself with the subscription router.
138
ts.conn.getClientProcessor().registerDurableSubscription(t.getTopicName(),
139                                                                  name,
140                                                                  selector,
141                                                                  this);
142     }
143
144     //
145
// Implementation of TopicSubscriber Interface.
146
//
147
public Topic getTopic() {return this.t;}
148     public boolean getNoLocal() {return this.noLocal;}
149
150     public void deliver(IDatagram d)
151     {
152         // if this is a local delivery, check our
153
// special flag and possibly ignore
154
if (noLocal &&
155             getConnection().isSenderLocal(((IMessageDatagram)d).getSenderId()))
156         {
157             log.debug("msg dropped: sender " + ((IMessageDatagram)d).getSenderId() + " is local for topic " + ((IMessageDatagram)d).getTopicName());
158             return;
159         }
160
161         super.deliver(d);
162     }
163
164     protected void internalAcknowledge(IMessageDatagram md)
165         throws IOException
166     {
167         if (durable) {
168             super.internalAcknowledge(md);
169         } else {
170             // we don't use acknolwedgements for non durable subscribers
171
// because it is not meaningful
172
}
173     }
174
175     /**
176      * Since a provider may allocate some resources on behalf of a
177      * MessageConsumer outside the JVM, clients should close them when they
178      * are not needed. Relying on garbage collection to eventually reclaim
179      * these resources may not be timely enough.
180      *
181      * <P>This call blocks until a receive or message listener in progress
182      * has completed. A blocked message consumer receive call
183      * returns null when this message consumer is closed.
184      *
185      * @exception JMSException if JMS fails to close the consumer
186      * due to some error.
187      */

188     public void close() throws JMSException
189     {
190         super.close();
191         getClientProcessor().unregisterSubscription(t.getTopicName(),
192                                                     this);
193
194         if (durable)
195         {
196             getClientProcessor().durableGoingAway(this.name);
197         }
198     }
199
200 }
201
Popular Tags