// Source listing from KickJava ("Java API By Example, From Geeks To Geeks"):
// Java > Open Source Codes > org > apache > activemq > ActiveMQMessageProducer


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq;

import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageFormatException;
import javax.jms.MessageProducer;

import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.management.JMSProducerStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.util.IntrospectionSupport;

42 /**
43  * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
44  * destination. A <CODE>MessageProducer</CODE> object is created by passing a
45  * <CODE>Destination</CODE> object to a message-producer creation method
46  * supplied by a session.
47  * <P>
48  * <CODE>MessageProducer</CODE> is the parent interface for all message
49  * producers.
50  * <P>
51  * A client also has the option of creating a message producer without
52  * supplying a destination. In this case, a destination must be provided with
53  * every send operation. A typical use for this kind of message producer is to
54  * send replies to requests using the request's <CODE>JMSReplyTo</CODE>
55  * destination.
56  * <P>
57  * A client can specify a default delivery mode, priority, and time to live for
58  * messages sent by a message producer. It can also specify the delivery mode,
59  * priority, and time to live for an individual message.
60  * <P>
61  * A client can specify a time-to-live value in milliseconds for each message
62  * it sends. This value defines a message expiration time that is the sum of
63  * the message's time-to-live and the GMT when it is sent (for transacted
64  * sends, this is the time the client sends the message, not the time the
65  * transaction is committed).
66  * <P>
67  * A JMS provider should do its best to expire messages accurately; however,
68  * the JMS API does not define the accuracy provided.
69  *
70  * @version $Revision: 1.14 $
71  * @see javax.jms.TopicPublisher
72  * @see javax.jms.QueueSender
73  * @see javax.jms.Session#createProducer
74  */

75 public class ActiveMQMessageProducer implements MessageProducer JavaDoc, StatsCapable, Closeable, Disposable {
76
77     protected ActiveMQSession session;
78     protected ProducerInfo info;
79     private JMSProducerStatsImpl stats;
80     private AtomicLong JavaDoc messageSequence;
81
82     protected boolean closed;
83     private boolean disableMessageID;
84     private boolean disableMessageTimestamp;
85     private int defaultDeliveryMode;
86     private int defaultPriority;
87     private long defaultTimeToLive;
88     private long startTime;
89     private MessageTransformer transformer;
90     private UsageManager producerWindow;
91
92     protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination)
93             throws JMSException JavaDoc {
94         this.session = session;
95         this.info = new ProducerInfo(producerId);
96         this.info.setWindowSize(session.connection.getProducerWindowSize());
97         if (destination!=null && destination.getOptions() != null) {
98             HashMap JavaDoc options = new HashMap JavaDoc(destination.getOptions());
99             IntrospectionSupport.setProperties(this.info, options, "producer.");
100         }
101         this.info.setDestination(destination);
102         
103         // Enable producer window flow control if protocol > 3 and the window size > 0
104
if( session.connection.getProtocolVersion()>=3 && this.info.getWindowSize()>0 ) {
105             producerWindow = new UsageManager("Producer Window: "+producerId);
106             producerWindow.setLimit(this.info.getWindowSize());
107         }
108         
109         this.disableMessageID = false;
110         this.disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault();
111         this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
112         this.defaultPriority = Message.DEFAULT_PRIORITY;
113         this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
114         this.startTime = System.currentTimeMillis();
115         this.messageSequence = new AtomicLong JavaDoc(0);
116         this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
117         this.session.addProducer(this);
118         this.session.asyncSendPacket(info);
119         setTransformer(session.getTransformer());
120     }
121
122     public StatsImpl getStats() {
123         return stats;
124     }
125
126     public JMSProducerStatsImpl getProducerStats() {
127         return stats;
128     }
129
130     /**
131      * Sets whether message IDs are disabled.
132      * <P>
133      * Since message IDs take some effort to create and increase a message's
134      * size, some JMS providers may be able to optimize message overhead if
135      * they are given a hint that the message ID is not used by an application.
136      * By calling the <CODE>setDisableMessageID</CODE> method on this message
137      * producer, a JMS client enables this potential optimization for all
138      * messages sent by this message producer. If the JMS provider accepts this
139      * hint, these messages must have the message ID set to null; if the
140      * provider ignores the hint, the message ID must be set to its normal
141      * unique value.
142      * <P>
143      * Message IDs are enabled by default.
144      *
145      * @param value indicates if message IDs are disabled
146      * @throws JMSException if the JMS provider fails to close the producer due to
147      * some internal error.
148      */

149     public void setDisableMessageID(boolean value) throws JMSException JavaDoc {
150         checkClosed();
151         this.disableMessageID = value;
152     }
153
154     /**
155      * Gets an indication of whether message IDs are disabled.
156      *
157      * @return an indication of whether message IDs are disabled
158      * @throws JMSException if the JMS provider fails to determine if message IDs are
159      * disabled due to some internal error.
160      */

161     public boolean getDisableMessageID() throws JMSException JavaDoc {
162         checkClosed();
163         return this.disableMessageID;
164     }
165
166     /**
167      * Sets whether message timestamps are disabled.
168      * <P>
169      * Since timestamps take some effort to create and increase a message's
170      * size, some JMS providers may be able to optimize message overhead if
171      * they are given a hint that the timestamp is not used by an application.
172      * By calling the <CODE>setDisableMessageTimestamp</CODE> method on this
173      * message producer, a JMS client enables this potential optimization for
174      * all messages sent by this message producer. If the JMS provider accepts
175      * this hint, these messages must have the timestamp set to zero; if the
176      * provider ignores the hint, the timestamp must be set to its normal
177      * value.
178      * <P>
179      * Message timestamps are enabled by default.
180      *
181      * @param value indicates if message timestamps are disabled
182      * @throws JMSException if the JMS provider fails to close the producer due to
183      * some internal error.
184      */

185     public void setDisableMessageTimestamp(boolean value) throws JMSException JavaDoc {
186         checkClosed();
187         this.disableMessageTimestamp = value;
188     }
189
190     /**
191      * Gets an indication of whether message timestamps are disabled.
192      *
193      * @return an indication of whether message timestamps are disabled
194      * @throws JMSException if the JMS provider fails to close the producer due to
195      * some internal error.
196      */

197     public boolean getDisableMessageTimestamp() throws JMSException JavaDoc {
198         checkClosed();
199         return this.disableMessageTimestamp;
200     }
201
202     /**
203      * Sets the producer's default delivery mode.
204      * <P>
205      * Delivery mode is set to <CODE>PERSISTENT</CODE> by default.
206      *
207      * @param newDeliveryMode the message delivery mode for this message producer; legal
208      * values are <code>DeliveryMode.NON_PERSISTENT</code> and
209      * <code>DeliveryMode.PERSISTENT</code>
210      * @throws JMSException if the JMS provider fails to set the delivery mode due to
211      * some internal error.
212      * @see javax.jms.MessageProducer#getDeliveryMode
213      * @see javax.jms.DeliveryMode#NON_PERSISTENT
214      * @see javax.jms.DeliveryMode#PERSISTENT
215      * @see javax.jms.Message#DEFAULT_DELIVERY_MODE
216      */

217     public void setDeliveryMode(int newDeliveryMode) throws JMSException JavaDoc {
218         if (newDeliveryMode != DeliveryMode.PERSISTENT && newDeliveryMode != DeliveryMode.NON_PERSISTENT) {
219             throw new IllegalStateException JavaDoc("unkown delivery mode: " + newDeliveryMode);
220         }
221         checkClosed();
222         this.defaultDeliveryMode = newDeliveryMode;
223     }
224
225     /**
226      * Gets the producer's default delivery mode.
227      *
228      * @return the message delivery mode for this message producer
229      * @throws JMSException if the JMS provider fails to close the producer due to
230      * some internal error.
231      */

232     public int getDeliveryMode() throws JMSException JavaDoc {
233         checkClosed();
234         return this.defaultDeliveryMode;
235     }
236
237     /**
238      * Sets the producer's default priority.
239      * <P>
240      * The JMS API defines ten levels of priority value, with 0 as the lowest
241      * priority and 9 as the highest. Clients should consider priorities 0-4 as
242      * gradations of normal priority and priorities 5-9 as gradations of
243      * expedited priority. Priority is set to 4 by default.
244      *
245      * @param newDefaultPriority the message priority for this message producer; must be a
246      * value between 0 and 9
247      * @throws JMSException if the JMS provider fails to set the delivery mode due to
248      * some internal error.
249      * @see javax.jms.MessageProducer#getPriority
250      * @see javax.jms.Message#DEFAULT_PRIORITY
251      */

252     public void setPriority(int newDefaultPriority) throws JMSException JavaDoc {
253         if (newDefaultPriority < 0 || newDefaultPriority > 9) {
254             throw new IllegalStateException JavaDoc("default priority must be a value between 0 and 9");
255         }
256         checkClosed();
257         this.defaultPriority = newDefaultPriority;
258     }
259
260     /**
261      * Gets the producer's default priority.
262      *
263      * @return the message priority for this message producer
264      * @throws JMSException if the JMS provider fails to close the producer due to
265      * some internal error.
266      * @see javax.jms.MessageProducer#setPriority
267      */

268     public int getPriority() throws JMSException JavaDoc {
269         checkClosed();
270         return this.defaultPriority;
271     }
272
273     /**
274      * Sets the default length of time in milliseconds from its dispatch time
275      * that a produced message should be retained by the message system.
276      * <P>
277      * Time to live is set to zero by default.
278      *
279      * @param timeToLive the message time to live in milliseconds; zero is unlimited
280      * @throws JMSException if the JMS provider fails to set the time to live due to
281      * some internal error.
282      * @see javax.jms.MessageProducer#getTimeToLive
283      * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
284      */

285     public void setTimeToLive(long timeToLive) throws JMSException JavaDoc {
286         if (timeToLive < 0l) {
287             throw new IllegalStateException JavaDoc("cannot set a negative timeToLive");
288         }
289         checkClosed();
290         this.defaultTimeToLive = timeToLive;
291     }
292
293     /**
294      * Gets the default length of time in milliseconds from its dispatch time
295      * that a produced message should be retained by the message system.
296      *
297      * @return the message time to live in milliseconds; zero is unlimited
298      * @throws JMSException if the JMS provider fails to get the time to live due to
299      * some internal error.
300      * @see javax.jms.MessageProducer#setTimeToLive
301      */

302     public long getTimeToLive() throws JMSException JavaDoc {
303         checkClosed();
304         return this.defaultTimeToLive;
305     }
306
307     /**
308      * Gets the destination associated with this <CODE>MessageProducer</CODE>.
309      *
310      * @return this producer's <CODE>Destination/ <CODE>
311      * @throws JMSException if the JMS provider fails to close the producer due to
312      * some internal error.
313      * @since 1.1
314      */

315     public Destination JavaDoc getDestination() throws JMSException JavaDoc {
316         checkClosed();
317         return this.info.getDestination();
318     }
319
320     /**
321      * Closes the message producer.
322      * <P>
323      * Since a provider may allocate some resources on behalf of a <CODE>
324      * MessageProducer</CODE> outside the Java virtual machine, clients should
325      * close them when they are not needed. Relying on garbage collection to
326      * eventually reclaim these resources may not be timely enough.
327      *
328      * @throws JMSException if the JMS provider fails to close the producer due to
329      * some internal error.
330      */

331     public void close() throws JMSException JavaDoc {
332         if( closed==false ) {
333             dispose();
334             this.session.asyncSendPacket(info.createRemoveCommand());
335         }
336     }
337
338     public void dispose() {
339         if( closed==false ) {
340             this.session.removeProducer(this);
341             closed = true;
342         }
343     }
344
345
346     /**
347      * Check if the instance of this producer has been closed.
348      * @throws IllegalStateException
349      */

350     protected void checkClosed() throws IllegalStateException JavaDoc {
351         if (closed) {
352             throw new IllegalStateException JavaDoc("The producer is closed");
353         }
354     }
355
356     /**
357      * Sends a message using the <CODE>MessageProducer</CODE>'s default
358      * delivery mode, priority, and time to live.
359      *
360      * @param message the message to send
361      * @throws JMSException if the JMS provider fails to send the message due to some
362      * internal error.
363      * @throws MessageFormatException if an invalid message is specified.
364      * @throws InvalidDestinationException if a client uses this method with a <CODE>
365      * MessageProducer</CODE> with an invalid destination.
366      * @throws java.lang.UnsupportedOperationException
367      * if a client uses this method with a <CODE>
368      * MessageProducer</CODE> that did not specify a
369      * destination at creation time.
370      * @see javax.jms.Session#createProducer
371      * @see javax.jms.MessageProducer
372      * @since 1.1
373      */

374     public void send(Message message) throws JMSException JavaDoc {
375         this.send(this.getDestination(),
376                   message,
377                   this.defaultDeliveryMode,
378                   this.defaultPriority,
379                   this.defaultTimeToLive);
380     }
381
382     /**
383      * Sends a message to the destination, specifying delivery mode, priority,
384      * and time to live.
385      *
386      * @param message the message to send
387      * @param deliveryMode the delivery mode to use
388      * @param priority the priority for this message
389      * @param timeToLive the message's lifetime (in milliseconds)
390      * @throws JMSException if the JMS provider fails to send the message due to some
391      * internal error.
392      * @throws MessageFormatException if an invalid message is specified.
393      * @throws InvalidDestinationException if a client uses this method with a <CODE>
394      * MessageProducer</CODE> with an invalid destination.
395      * @throws java.lang.UnsupportedOperationException
396      * if a client uses this method with a <CODE>
397      * MessageProducer</CODE> that did not specify a
398      * destination at creation time.
399      * @see javax.jms.Session#createProducer
400      * @since 1.1
401      */

402     public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException JavaDoc {
403         this.send(this.getDestination(),
404                   message,
405                   deliveryMode,
406                   priority,
407                   timeToLive);
408     }
409
410     /**
411      * Sends a message to a destination for an unidentified message producer.
412      * Uses the <CODE>MessageProducer</CODE>'s default delivery mode,
413      * priority, and time to live.
414      * <P>
415      * Typically, a message producer is assigned a destination at creation
416      * time; however, the JMS API also supports unidentified message producers,
417      * which require that the destination be supplied every time a message is
418      * sent.
419      *
420      * @param destination the destination to send this message to
421      * @param message the message to send
422      * @throws JMSException if the JMS provider fails to send the message due to some
423      * internal error.
424      * @throws MessageFormatException if an invalid message is specified.
425      * @throws InvalidDestinationException if a client uses this method with an invalid destination.
426      * @throws java.lang.UnsupportedOperationException
427      * if a client uses this method with a <CODE>
428      * MessageProducer</CODE> that specified a destination at
429      * creation time.
430      * @see javax.jms.Session#createProducer
431      * @see javax.jms.MessageProducer
432      */

433     public void send(Destination JavaDoc destination, Message message) throws JMSException JavaDoc {
434         this.send(destination,
435                   message,
436                   this.defaultDeliveryMode,
437                   this.defaultPriority,
438                   this.defaultTimeToLive);
439     }
440
441     /**
442      * Sends a message to a destination for an unidentified message producer,
443      * specifying delivery mode, priority and time to live.
444      * <P>
445      * Typically, a message producer is assigned a destination at creation
446      * time; however, the JMS API also supports unidentified message producers,
447      * which require that the destination be supplied every time a message is
448      * sent.
449      *
450      * @param destination the destination to send this message to
451      * @param message the message to send
452      * @param deliveryMode the delivery mode to use
453      * @param priority the priority for this message
454      * @param timeToLive the message's lifetime (in milliseconds)
455      * @throws JMSException if the JMS provider fails to send the message due to some
456      * internal error.
457      * @throws UnsupportedOperationException if an invalid destination is specified.
458      * @throws InvalidDestinationException if a client uses this method with an invalid destination.
459      * @see javax.jms.Session#createProducer
460      * @since 1.1
461      */

462     public void send(Destination JavaDoc destination, Message message, int deliveryMode, int priority, long timeToLive)
463             throws JMSException JavaDoc {
464         checkClosed();
465         if (destination == null) {
466             if( info.getDestination() == null ) {
467                 throw new UnsupportedOperationException JavaDoc("A destination must be specified.");
468             }
469             throw new InvalidDestinationException JavaDoc("Don't understand null destinations");
470         }
471
472         ActiveMQDestination dest;
473         if( destination == info.getDestination() ) {
474             dest = (ActiveMQDestination) destination;
475         } else if ( info.getDestination() == null ) {
476             dest = ActiveMQDestination.transform(destination);
477         } else {
478             throw new UnsupportedOperationException JavaDoc("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
479         }
480         if (dest == null) {
481             throw new JMSException JavaDoc("No destination specified");
482         }
483
484         if (transformer != null) {
485             Message transformedMessage = transformer.producerTransform(session, this, message);
486             if (transformedMessage != null) {
487                 message = transformedMessage;
488             }
489         }
490         
491         if( producerWindow!=null ) {
492             try {
493                 producerWindow.waitForSpace();
494             } catch (InterruptedException JavaDoc e) {
495                 throw new JMSException JavaDoc("Send aborted due to thread interrupt.");
496             }
497         }
498         
499         this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow);
500         
501         stats.onMessage();
502     }
503
504
505     public MessageTransformer getTransformer() {
506         return transformer;
507     }
508
509     /**
510      * Sets the transformer used to transform messages before they are sent on to the JMS bus
511      */

512     public void setTransformer(MessageTransformer transformer) {
513         this.transformer = transformer;
514     }
515
516     /**
517      * @return the time in milli second when this object was created.
518      */

519     protected long getStartTime() {
520         return this.startTime;
521     }
522
523     /**
524      * @return Returns the messageSequence.
525      */

526     protected long getMessageSequence() {
527         return messageSequence.incrementAndGet();
528     }
529
530     /**
531      * @param messageSequence The messageSequence to set.
532      */

533     protected void setMessageSequence(AtomicLong JavaDoc messageSequence) {
534         this.messageSequence = messageSequence;
535     }
536
537     /**
538      * @return Returns the info.
539      */

540     protected ProducerInfo getProducerInfo(){
541         return this.info!=null?this.info:null;
542     }
543
544     /**
545      * @param info The info to set
546      */

547     protected void setProducerInfo(ProducerInfo info){
548         this.info = info;
549     }
550
551     public String JavaDoc toString() {
552         return "ActiveMQMessageProducer { value=" +info.getProducerId()+" }";
553     }
554
555     public void onProducerAck(ProducerAck pa) {
556         if( this.producerWindow!=null ) {
557             this.producerWindow.decreaseUsage(pa.getSize());
558         }
559     }
560
561 }
562
// Popular Tags