KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > ejb > plugins > jms > DLQHandler


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.ejb.plugins.jms;
23
24 import java.util.Hashtable JavaDoc;
25 import java.util.HashMap JavaDoc;
26 import java.util.Map JavaDoc;
27 import java.util.Enumeration JavaDoc;
28 import java.util.Iterator JavaDoc;
29
30 import javax.naming.Context JavaDoc;
31 import javax.jms.ExceptionListener JavaDoc;
32 import javax.jms.Session JavaDoc;
33 import javax.jms.QueueConnection JavaDoc;
34 import javax.jms.QueueConnectionFactory JavaDoc;
35 import javax.jms.QueueSession JavaDoc;
36 import javax.jms.QueueSender JavaDoc;
37 import javax.jms.Queue JavaDoc;
38 import javax.jms.Message JavaDoc;
39 import javax.jms.JMSException JavaDoc;
40 import javax.jms.Destination JavaDoc;
41 import javax.transaction.Status JavaDoc;
42 import javax.transaction.Synchronization JavaDoc;
43 import javax.transaction.Transaction JavaDoc;
44
45 import org.w3c.dom.Element JavaDoc;
46
47 import org.jboss.deployment.DeploymentException;
48 import org.jboss.metadata.MetaData;
49 import org.jboss.jms.jndi.JMSProviderAdapter;
50 import org.jboss.system.ServiceMBeanSupport;
51
52 /**
53  * Places redeliveded messages on a Dead Letter Queue.
54  *
55  *<p>
56  *The Dead Letter Queue handler is used to not set JBoss in an endles loop
57  * when a message is resent on and on due to transaction rollback for
58  * message receipt.
59  *
60  * <p>
61  * It sends message to a dead letter queue (configurable, defaults to
62  * queue/DLQ) when the message has been resent a configurable amount of times,
63  * defaults to 10.
64  *
65  * <p>
66  * The handler is configured through the element MDBConfig in
67  * container-invoker-conf.
68  *
69  * <p>
70  * The JMS property JBOSS_ORIG_DESTINATION in the resent message is set
71  * to the name of the original destination (Destination.toString())
72  * if it is present.
73  *
74  * <p>
75  * The JMS property JBOSS_ORIG_MESSAGEID in the resent message is set
76  * to the id of the original message.
77  *
78  * @author <a HREF="mailto:jason@planet57.com">Jason Dillon</a>
79  * @author Scott.Stark@jboss.org
80  * @author Adrian Brock
81  * @version <tt>$Revision: 40393 $</tt>
82  */

83 public class DLQHandler extends ServiceMBeanSupport implements ExceptionListener JavaDoc
84 {
85    /** Standard property for delivery count */
86    public static final String JavaDoc PROPERTY_DELIVERY_COUNT = "JMSXDeliveryCount";
87
88    /** JMS property name holding original destination. */
89    public static final String JavaDoc JBOSS_ORIG_DESTINATION = "JBOSS_ORIG_DESTINATION";
90
91    /** JMS property name holding original JMS message id. */
92    public static final String JavaDoc JBOSS_ORIG_MESSAGEID = "JBOSS_ORIG_MESSAGEID";
93
94    /** Properties copied from org.jboss.mq.SpyMessage */
95    private static final String JavaDoc JMS_JBOSS_REDELIVERY_COUNT = "JMS_JBOSS_REDELIVERY_COUNT";
96    private static final String JavaDoc JMS_JBOSS_REDELIVERY_LIMIT = "JMS_JBOSS_REDELIVERY_LIMIT";
97
98    /**
99     * Destination to send dead letters to.
100     *
101     * <p>
102     * Defaults to <em>queue/DLQ</em>, configurable through
103     * <tt>DestinationQueue</tt> element.
104     */

105    private String JavaDoc destinationJNDI = "queue/DLQ";
106
107    /**
108     * Maximum times a message is alowed to be resent.
109     *
110     * <p>Defaults to <em>10</em>, configurable through
111     * <tt>MaxTimesRedelivered</tt> element.
112     */

113    private int maxResent = 10;
114
115    /**
116     * Time to live for the message.
117     *
118     * <p>
119     * Defaults to <em>{@link Message#DEFAULT_TIME_TO_LIVE}</em>,
120     * configurable through the <tt>TimeToLive</tt> element.
121     */

122    private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
123    
124    // May become configurable
125

126    /** Delivery mode for message, Message.DEFAULT_DELIVERY_MODE. */
127    private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
128
129    /** Priority for the message, Message.DEFAULT_PRIORITY */
130    private int priority = Message.DEFAULT_PRIORITY;
131
132    /** The dlq user for the connection */
133    private String JavaDoc dlqUser;
134
135    /** The dlq password for the connection */
136    private String JavaDoc dlqPass;
137
138    // Private stuff
139
private QueueConnection JavaDoc connection;
140    private Queue JavaDoc dlq;
141    private JMSProviderAdapter providerAdapter;
142    private JMSContainerInvoker invoker;
143    private Hashtable JavaDoc resentBuffer = new Hashtable JavaDoc();
144
145    public DLQHandler(final JMSProviderAdapter providerAdapter, final JMSContainerInvoker invoker)
146    {
147       this.providerAdapter = providerAdapter;
148       this.invoker = invoker;
149    }
150    
151    public void onException(JMSException JavaDoc e)
152    {
153       if (invoker != null && invoker.exListener != null)
154          invoker.exListener.handleFailure(e);
155       else
156       {
157          log.warn("DLQHandler got JMS Failure but there is no link to JMSContainerInvoker's exception listener.", e);
158
159          // We shouldn't get here, but if we do, we should at least close the connection
160
if (connection != null)
161          {
162             try
163             {
164                connection.close();
165             }
166             catch (Throwable JavaDoc ignored)
167             {
168                log.trace("Ignored error closing connection", ignored);
169             }
170             connection = null;
171          }
172       }
173    }
174    
175    protected void createService() throws Exception JavaDoc
176    {
177       Context JavaDoc ctx = providerAdapter.getInitialContext();
178
179       try
180       {
181          String JavaDoc factoryName = providerAdapter.getQueueFactoryRef();
182          QueueConnectionFactory JavaDoc factory = (QueueConnectionFactory JavaDoc)
183             ctx.lookup(factoryName);
184          log.debug("Using factory: " + factory);
185
186          if (dlqUser == null)
187             connection = factory.createQueueConnection();
188          else
189             connection = factory.createQueueConnection(dlqUser, dlqPass);
190          log.debug("Created connection: " + connection);
191
192          dlq = (Queue JavaDoc) ctx.lookup(destinationJNDI);
193          log.debug("Using Queue: " + dlq);
194       }
195       finally
196       {
197          ctx.close();
198       }
199    }
200    
201    protected void startService() throws Exception JavaDoc
202    {
203       connection.setExceptionListener(this);
204       connection.start();
205    }
206
207    protected void stopService() throws Exception JavaDoc
208    {
209       try
210       {
211          connection.setExceptionListener(null);
212          connection.stop();
213       }
214       catch (Throwable JavaDoc t)
215       {
216          log.trace("Ignored error stopping DLQ", t);
217       }
218    }
219
220    protected void destroyService() throws Exception JavaDoc
221    {
222       // Help the GC
223
if (connection != null)
224          connection.close();
225       connection = null;
226       dlq = null;
227       providerAdapter = null;
228    }
229    
230    /**
231     * Check if a message has been redelivered to many times.
232     *
233     * If message has been redelivered to many times, send it to the
234     * dead letter queue (default to queue/DLQ).
235     *
236     * @return true if message is handled (i.e resent), false if not.
237     */

238    public boolean handleRedeliveredMessage(final Message JavaDoc msg, final Transaction JavaDoc tx)
239    {
240       boolean handled = false;
241       int max = this.maxResent;
242       String JavaDoc id = null;
243       boolean fromMessage = true;
244       int count = 0;
245
246       try
247       {
248
249          if (msg.propertyExists(JMS_JBOSS_REDELIVERY_LIMIT))
250             max = msg.getIntProperty(JMS_JBOSS_REDELIVERY_LIMIT);
251
252          try
253          {
254             if (msg.propertyExists(PROPERTY_DELIVERY_COUNT))
255                count = msg.getIntProperty(PROPERTY_DELIVERY_COUNT);
256          }
257          catch (JMSException JavaDoc ignored)
258          {
259          }
260          if (count > 0)
261          {
262             // The delivery count is one too many
263
--count;
264          }
265          else if (msg.propertyExists(JMS_JBOSS_REDELIVERY_COUNT))
266             count = msg.getIntProperty(JMS_JBOSS_REDELIVERY_COUNT);
267          else
268          {
269             id = msg.getJMSMessageID();
270             if (id == null)
271             {
272                // if we can't get the id we are basically fucked
273
log.error("Message id is null, can't handle message");
274                return false;
275             }
276             count = incrementResentCount(id);
277             fromMessage = false;
278          }
279
280          if (count > max)
281          {
282             id = msg.getJMSMessageID();
283             log.warn("Message resent too many times; sending it to DLQ; message id=" + id);
284
285             sendMessage(msg);
286             deleteFromBuffer(id);
287
288             handled = true;
289          }
290          else if (fromMessage == false && tx != null)
291          {
292             // Register a synchronization to remove the buffer entry
293
// should the transaction commit
294
DLQSynchronization synch = new DLQSynchronization(id);
295             try
296             {
297                tx.registerSynchronization(synch);
298             }
299             catch (Exception JavaDoc e)
300             {
301                log.warn("Error registering DlQ Synchronization with transaction " + tx, e);
302             }
303          }
304       }
305       catch (JMSException JavaDoc e)
306       {
307          // If we can't send it ahead, we do not dare to just drop it...or?
308
log.error("Could not send message to Dead Letter Queue", e);
309       }
310
311       return handled;
312    }
313
314    /**
315     * Send message to the configured dead letter queue, defaults to queue/DLQ.
316     */

317    protected void sendMessage(Message JavaDoc msg) throws JMSException JavaDoc
318    {
319       boolean trace = log.isTraceEnabled();
320
321       QueueSession JavaDoc session = null;
322       QueueSender JavaDoc sender = null;
323
324       try
325       {
326          msg = makeWritable(msg, trace); // Don't know yet if we are gona clone or not
327

328          // Set the properties
329
msg.setStringProperty(JBOSS_ORIG_MESSAGEID, msg.getJMSMessageID());
330          // Some providers (say Websphere MQ) don't set this to something we can use
331
Destination JavaDoc d = msg.getJMSDestination();
332          if (d != null)
333             msg.setStringProperty(JBOSS_ORIG_DESTINATION, d.toString());
334
335          session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
336          sender = session.createSender(dlq);
337          if (trace)
338          {
339             log.trace("Sending message to DLQ; destination=" +
340                dlq + ", session=" + session + ", sender=" + sender);
341          }
342
343          sender.send(msg, deliveryMode, priority, timeToLive);
344
345          if (trace)
346          {
347             log.trace("Message sent.");
348          }
349
350       }
351       finally
352       {
353          try
354          {
355             if (sender != null) sender.close();
356             if (session != null) session.close();
357          }
358          catch (Exception JavaDoc e)
359          {
360             log.warn("Failed to close sender or session; ignoring", e);
361          }
362       }
363    }
364
365    /**
366     * Increment the counter for the specific JMS message id.
367     *
368     * @return the new counter value.
369     */

370    protected int incrementResentCount(String JavaDoc id)
371    {
372       BufferEntry entry = null;
373       boolean trace = log.isTraceEnabled();
374       if (!resentBuffer.containsKey(id))
375       {
376          if (trace)
377             log.trace("Making new entry for id " + id);
378          entry = new BufferEntry();
379          entry.id = id;
380          entry.count = 1;
381          resentBuffer.put(id, entry);
382       }
383       else
384       {
385          entry = (BufferEntry) resentBuffer.get(id);
386          entry.count++;
387          if (trace)
388             log.trace("Incremented old entry for id " + id + " count " + entry.count);
389       }
390       return entry.count;
391    }
392
393    /**
394     * Delete the entry in the message counter buffer for specifyed JMS id.
395     */

396    protected void deleteFromBuffer(String JavaDoc id)
397    {
398       resentBuffer.remove(id);
399    }
400
401    /**
402     * Make the Message properties writable.
403     *
404     * @return the writable message.
405     */

406    protected Message JavaDoc makeWritable(Message JavaDoc msg, boolean trace) throws JMSException JavaDoc
407    {
408       HashMap JavaDoc tmp = new HashMap JavaDoc();
409
410       // Save properties
411
for (Enumeration JavaDoc en = msg.getPropertyNames(); en.hasMoreElements();)
412       {
413          String JavaDoc key = (String JavaDoc) en.nextElement();
414          tmp.put(key, msg.getObjectProperty(key));
415       }
416       
417       // Make them writable
418
msg.clearProperties();
419
420       Iterator JavaDoc i = tmp.entrySet().iterator();
421       while (i.hasNext())
422       {
423          Map.Entry JavaDoc me = (Map.Entry JavaDoc)i.next();
424          String JavaDoc key = (String JavaDoc) me.getKey();
425          try
426          {
427             msg.setObjectProperty(key, me.getValue());
428          }
429          catch (JMSException JavaDoc ignored)
430          {
431             if (trace)
432                log.trace("Could not copy message property " + key, ignored);
433          }
434       }
435
436       return msg;
437    }
438
439    /**
440     * Takes an MDBConfig Element
441     */

442    public void importXml(final Element JavaDoc element) throws DeploymentException
443    {
444       destinationJNDI = MetaData.getElementContent
445          (MetaData.getUniqueChild(element, "DestinationQueue"));
446
447       try
448       {
449          String JavaDoc mr = MetaData.getElementContent
450             (MetaData.getUniqueChild(element, "MaxTimesRedelivered"));
451          maxResent = Integer.parseInt(mr);
452       }
453       catch (Exception JavaDoc ignore)
454       {
455       }
456
457       try
458       {
459          String JavaDoc ttl = MetaData.getElementContent
460             (MetaData.getUniqueChild(element, "TimeToLive"));
461          timeToLive = Long.parseLong(ttl);
462
463          if (timeToLive < 0)
464          {
465             log.warn("Invalid TimeToLive: " + timeToLive + "; using default");
466             timeToLive = Message.DEFAULT_TIME_TO_LIVE;
467          }
468       }
469       catch (Exception JavaDoc ignore)
470       {
471       }
472
473       dlqUser = MetaData.getElementContent(MetaData.getOptionalChild(element, "DLQUser"));
474       dlqPass = MetaData.getElementContent(MetaData.getOptionalChild(element, "DLQPassword"));
475    }
476
477    public String JavaDoc toString()
478    {
479       return super.toString() +
480          "{ destinationJNDI=" + destinationJNDI +
481          ", maxResent=" + maxResent +
482          ", timeToLive=" + timeToLive +
483          " }";
484    }
485
486    private static class BufferEntry
487    {
488       int count;
489       String JavaDoc id;
490    }
491
492    /**
493     * Remove a redelivered message from the DLQ's buffer when it is acknowledged
494     */

495    protected class DLQSynchronization implements Synchronization JavaDoc
496    {
497       /** The message id */
498       String JavaDoc id;
499
500       public DLQSynchronization(String JavaDoc id)
501       {
502          this.id = id;
503       }
504
505       public void beforeCompletion()
506       {
507       }
508
509       /**
510        * Forget the message when the transaction commits
511        */

512       public void afterCompletion(int status)
513       {
514          if (status == Status.STATUS_COMMITTED)
515             deleteFromBuffer(id);
516       }
517    }
518 }
519
Popular Tags