KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > SpyMessageProducer


/*
* JBoss, Home of Professional Open Source
* Copyright 2005, JBoss Inc., and individual contributors as indicated
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.jboss.mq;

import java.util.Enumeration;

import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageEOFException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;

import org.jboss.logging.Logger;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;

46 /**
47  * This class implements javax.jms.MessageProducer
48  *
49  * @author Norbert Lataille (Norbert.Lataille@m4x.org)
50  * @author <a HREF="mailto:adrian@jboss.org">Adrian Brock</a>
51  * @version $Revision: 37830 $
52  */

53 public class SpyMessageProducer implements MessageProducer JavaDoc
54 {
55    /** The log */
56    static Logger log = Logger.getLogger(SpyMessageProducer.class);
57
58    /** Is trace enabled */
59    static boolean trace = log.isTraceEnabled();
60
61    /** The session */
62    protected SpySession session;
63    /** The destination */
64    protected Destination JavaDoc destination;
65    /** The defaultDeliveryMode */
66    protected int defaultDeliveryMode = SpyMessage.DEFAULT_DELIVERY_MODE;
67    /** The defaultPriority */
68    protected int defaultPriority = SpyMessage.DEFAULT_PRIORITY;
69    /** The defaultTTL */
70    protected long defaultTTL = SpyMessage.DEFAULT_TIME_TO_LIVE;
71    /** Whether we are closed */
72    private SynchronizedBoolean closed = new SynchronizedBoolean(false);
73    /** Whether to disable MessageID generation */
74    private boolean disableMessageID = false;
75    /** Whether to disable timestamp generation */
76    private boolean disableTS = false;
77
78    /**
79      * Create a new SpyMessageProducer
80      *
81      * @param session the session
82      * @param destination the destination
83      */

84    SpyMessageProducer(SpySession session, Destination JavaDoc destination)
85    {
86       trace = log.isTraceEnabled();
87       
88       this.session = session;
89       this.destination = destination;
90       try
91       {
92          if (destination instanceof TemporaryQueue JavaDoc || destination instanceof TemporaryTopic JavaDoc)
93             setDeliveryMode(DeliveryMode.NON_PERSISTENT);
94          else
95             setDeliveryMode(DeliveryMode.PERSISTENT);
96       }
97       catch (JMSException JavaDoc ignored)
98       {
99          log.debug("Ignored error during setDeliveryMode", ignored);
100       }
101       
102       if (trace)
103          log.trace("New message producer " + this);
104    }
105
106    public void setDisableMessageID(boolean value) throws JMSException JavaDoc
107    {
108       checkClosed();
109       disableMessageID = value;
110    }
111
112    public void setDisableMessageTimestamp(boolean value) throws JMSException JavaDoc
113    {
114       checkClosed();
115       disableTS = value;
116    }
117
118    public void setDeliveryMode(int deli) throws JMSException JavaDoc
119    {
120       checkClosed();
121       if (deli != DeliveryMode.NON_PERSISTENT && deli != DeliveryMode.PERSISTENT)
122          throw new JMSException JavaDoc("Bad DeliveryMode value");
123       else
124          defaultDeliveryMode = deli;
125    }
126
127    public void setPriority(int pri) throws JMSException JavaDoc
128    {
129       checkClosed();
130       if (pri < 0 || pri > 9)
131          throw new JMSException JavaDoc("Bad priority value");
132       else
133          defaultPriority = pri;
134    }
135
136    public void setTimeToLive(int timeToLive) throws JMSException JavaDoc
137    {
138       checkClosed();
139       if (timeToLive < 0)
140          throw new JMSException JavaDoc("Bad TimeToLive value");
141       else
142          defaultTTL = timeToLive;
143    }
144
145    public void setTimeToLive(long timeToLive) throws JMSException JavaDoc
146    {
147       checkClosed();
148       if (timeToLive < 0)
149          throw new JMSException JavaDoc("Bad TimeToLive value");
150       else
151          defaultTTL = timeToLive;
152    }
153
154    public boolean getDisableMessageID() throws JMSException JavaDoc
155    {
156       checkClosed();
157       return disableMessageID;
158    }
159
160    public boolean getDisableMessageTimestamp() throws JMSException JavaDoc
161    {
162       checkClosed();
163       return disableTS;
164    }
165
166    public int getDeliveryMode() throws JMSException JavaDoc
167    {
168       checkClosed();
169       return defaultDeliveryMode;
170    }
171
172    public int getPriority() throws JMSException JavaDoc
173    {
174       checkClosed();
175       return defaultPriority;
176    }
177
178    public long getTimeToLive() throws JMSException JavaDoc
179    {
180       checkClosed();
181       return defaultTTL;
182    }
183
184    public void close() throws JMSException JavaDoc
185    {
186       if (closed.set(true))
187          return;
188
189       session.removeProducer(this);
190
191       if (trace)
192          log.trace("Closed " + this);
193    }
194
195    public Destination JavaDoc getDestination() throws JMSException JavaDoc
196    {
197       checkClosed();
198       return destination;
199    }
200
201    public void send(Message JavaDoc message) throws JMSException JavaDoc
202    {
203       if (destination == null)
204          throw new UnsupportedOperationException JavaDoc(
205          "Not constructed with identifyed destination. Usage of method not allowed");
206       send(destination, message, defaultDeliveryMode, defaultPriority, defaultTTL);
207    }
208
209    public void send(Destination JavaDoc destination, Message JavaDoc message) throws JMSException JavaDoc
210    {
211       send(destination, message, defaultDeliveryMode, defaultPriority, defaultTTL);
212    }
213
214    public void send(Message JavaDoc message, int deliveryMode, int priority, long ttl) throws JMSException JavaDoc
215    {
216       if (destination == null)
217          throw new UnsupportedOperationException JavaDoc(
218          "Not constructed with identifyed destination. Usage of method not allowed");
219       send(destination, message, deliveryMode, priority, ttl);
220    }
221
222    public void send(Destination JavaDoc destination, Message JavaDoc message, int deliveryMode, int priority, long ttl)
223       throws JMSException JavaDoc
224    {
225       checkClosed();
226
227       if (this.destination != null && this.destination.equals(destination) == false)
228          throw new UnsupportedOperationException JavaDoc("Sending to " + destination
229                + " not allowed when producer created with " + this.destination);
230
231       if (destination == null || (destination instanceof SpyDestination) == false)
232          throw new InvalidDestinationException JavaDoc("Destination is not an instance of SpyDestination " + destination);
233
234       // Encapsulate the message if not a SpyMessage
235
SpyMessage sendMessage;
236       if ((message instanceof SpyMessage) == false)
237          sendMessage = encapsulateMessage(message);
238       else
239          sendMessage = (SpyMessage) message;
240
241       //Set the header fields
242
sendMessage.setJMSDestination(destination);
243       sendMessage.setJMSDeliveryMode(deliveryMode);
244       long ts = System.currentTimeMillis();
245       sendMessage.setJMSTimestamp(ts);
246       if (ttl == 0)
247          sendMessage.setJMSExpiration(0);
248       else
249          sendMessage.setJMSExpiration(ttl + ts);
250       sendMessage.setJMSPriority(priority);
251       String JavaDoc id = session.getNewMessageID();
252       sendMessage.setJMSMessageID(id);
253
254       // If we encapsulated the message, update the original message
255
if (message != sendMessage)
256       {
257          message.setJMSDestination(destination);
258          message.setJMSDeliveryMode(deliveryMode);
259          message.setJMSTimestamp(ts);
260          if (ttl == 0)
261             message.setJMSExpiration(0);
262          else
263             message.setJMSExpiration(ttl + ts);
264          message.setJMSPriority(priority);
265          message.setJMSMessageID(id);
266       }
267
268       if (trace)
269          log.trace("Sending message " + this + " \n" + sendMessage);
270       
271       //Send the message.
272
session.sendMessage(sendMessage);
273    }
274    
275    public String JavaDoc toString()
276    {
277       StringBuffer JavaDoc buffer = new StringBuffer JavaDoc(100);
278       buffer.append("SpyMessageProducer@").append(System.identityHashCode(this));
279       buffer.append("[ dest=").append(destination);
280       if (defaultDeliveryMode == DeliveryMode.PERSISTENT)
281          buffer.append(" delivery=").append("persist");
282       else
283          buffer.append(" delivery=").append("besteffort");
284       buffer.append(" priority=").append(defaultPriority);
285       buffer.append(" ttl=").append(defaultTTL);
286       buffer.append(" disableMessageID=").append(disableMessageID);
287       buffer.append(" disableTS=").append(disableTS);
288       buffer.append(" session=").append(session);
289       buffer.append(']');
290       return buffer.toString();
291    }
292
293    protected SpyMessage encapsulateMessage(Message JavaDoc message) throws JMSException JavaDoc
294    {
295       SpyMessage result;
296       if (message instanceof BytesMessage JavaDoc)
297       {
298          result = MessagePool.getBytesMessage();
299          BytesMessage JavaDoc original = (BytesMessage JavaDoc) message;
300          original.reset();
301          byte[] temp = new byte[1024];
302          int bytes = original.readBytes(temp);
303          while (bytes != -1)
304          {
305             ((BytesMessage JavaDoc) result).writeBytes(temp, 0, bytes);
306             bytes = original.readBytes(temp);
307          }
308       }
309       else if (message instanceof MapMessage JavaDoc)
310       {
311          result = MessagePool.getMapMessage();
312          MapMessage JavaDoc original = (MapMessage JavaDoc) message;
313          for (Enumeration JavaDoc en=original.getMapNames(); en.hasMoreElements();)
314          {
315             String JavaDoc key = (String JavaDoc) en.nextElement();
316             try
317             {
318                ((MapMessage JavaDoc) result).setObject(key, original.getObject(key));
319             }
320             catch (JMSException JavaDoc ignored)
321             {
322                if (trace)
323                   log.trace("Unable to copy map entry " + key, ignored);
324             }
325          }
326       }
327       else if (message instanceof StreamMessage JavaDoc)
328       {
329          result = MessagePool.getStreamMessage();
330          StreamMessage JavaDoc original = (StreamMessage JavaDoc) message;
331          original.reset();
332          try
333          {
334             while (true)
335             {
336                ((StreamMessage JavaDoc) result).writeObject(original.readObject());
337             }
338          }
339          catch (MessageEOFException JavaDoc expected)
340          {
341          }
342       }
343       else if (message instanceof ObjectMessage JavaDoc)
344       {
345          result = MessagePool.getObjectMessage();
346          ((ObjectMessage JavaDoc) result).setObject(((ObjectMessage JavaDoc) message).getObject());
347       }
348       else if (message instanceof TextMessage JavaDoc)
349       {
350          result = MessagePool.getTextMessage();
351          ((TextMessage JavaDoc) result).setText(((TextMessage JavaDoc) message).getText());
352       }
353       else
354          result = MessagePool.getMessage();
355
356       // Copy headers
357
try
358       {
359          result.setJMSCorrelationID(message.getJMSCorrelationID());
360       }
361       catch (JMSException JavaDoc e)
362       {
363          //must be as bytes
364
result.setJMSCorrelationIDAsBytes(message.getJMSCorrelationIDAsBytes());
365       }
366       result.setJMSReplyTo(message.getJMSReplyTo());
367       result.setJMSType(message.getJMSType());
368
369       // Copy properties
370
for (Enumeration JavaDoc en=message.getPropertyNames(); en.hasMoreElements();)
371       {
372          String JavaDoc key = (String JavaDoc) en.nextElement();
373          try
374          {
375             result.setObjectProperty(key, message.getObjectProperty(key));
376          }
377          catch (JMSException JavaDoc ignored)
378          {
379             if (trace)
380                log.trace("Unable to copy property " + key, ignored);
381          }
382       }
383
384       return result;
385    }
386
387    /**
388     * Check whether we are closed
389     *
390     * @throws IllegalStateException when the session is closed
391     */

392    protected void checkClosed() throws JMSException JavaDoc
393    {
394       if (closed.get())
395          throw new IllegalStateException JavaDoc("Message producer is closed");
396    }
397 }
398
Popular Tags