KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > ejb3 > mdb > DLQHandler


1 /***************************************
2  * *
3  * JBoss: The OpenSource J2EE WebOS *
4  * *
5  * Distributable under LGPL license. *
6  * See terms of license at gnu.org. *
7  * *
8  ***************************************/

9
10 package org.jboss.ejb3.mdb;
11
12 import org.jboss.jms.jndi.JMSProviderAdapter;
13 import org.jboss.system.ServiceMBeanSupport;
14
15 import javax.jms.Destination JavaDoc;
16 import javax.jms.JMSException JavaDoc;
17 import javax.jms.Message JavaDoc;
18 import javax.jms.Queue JavaDoc;
19 import javax.jms.QueueConnection JavaDoc;
20 import javax.jms.QueueConnectionFactory JavaDoc;
21 import javax.jms.QueueSender JavaDoc;
22 import javax.jms.QueueSession JavaDoc;
23 import javax.jms.Session JavaDoc;
24 import javax.naming.Context JavaDoc;
25 import javax.transaction.Status JavaDoc;
26 import javax.transaction.Synchronization JavaDoc;
27 import javax.transaction.Transaction JavaDoc;
28 import java.util.Enumeration JavaDoc;
29 import java.util.Hashtable JavaDoc;
30
31 /**
32  * Places redeliveded messages on a Dead Letter Queue.
33  *
34  *<p>
35  *The Dead Letter Queue handler is used to not set JBoss in an endles loop
36  * when a message is resent on and on due to transaction rollback for
37  * message receipt.
38  *
39  * <p>
40  * It sends message to a dead letter queue (configurable, defaults to
41  * queue/DLQ) when the message has been resent a configurable amount of times,
42  * defaults to 10.
43  *
44  * <p>
45  * The handler is configured through the element MDBConfig in
46  * container-invoker-conf.
47  *
48  * <p>
49  * The JMS property JBOSS_ORIG_DESTINATION in the resent message is set
50  * to the name of the original destination (Destionation.toString()).
51  *
52  * <p>
53  * The JMS property JBOSS_ORIG_MESSAGEID in the resent message is set
54  * to the id of the original message.
55  *
56  * Created: Thu Aug 23 21:17:26 2001
57  *
58  * @version <tt>$Revision: 1.1.6.1 $</tt>
59  * @author ???
60  * @author <a HREF="mailto:jason@planet57.com">Jason Dillon</a>
61  */

62 public class DLQHandler
63    extends ServiceMBeanSupport
64 {
65    /** JMS property name holding original destination. */
66    public static final String JavaDoc JBOSS_ORIG_DESTINATION = "JBOSS_ORIG_DESTINATION";
67
68    /** JMS property name holding original JMS message id. */
69    public static final String JavaDoc JBOSS_ORIG_MESSAGEID = "JBOSS_ORIG_MESSAGEID";
70
71    /** Properties copied from org.jboss.mq.SpyMessage */
72    private static final String JavaDoc JMS_JBOSS_REDELIVERY_COUNT = "JMS_JBOSS_REDELIVERY_COUNT";
73    private static final String JavaDoc JMS_JBOSS_REDELIVERY_LIMIT = "JMS_JBOSS_REDELIVERY_LIMIT";
74    
75    // Configuratable stuff
76

77    // May become configurable
78

79    /** Delivery mode for message, Message.DEFAULT_DELIVERY_MODE. */
80    private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
81
82    /** Priority for the message, Message.DEFAULT_PRIORITY */
83    private int priority = Message.DEFAULT_PRIORITY;
84
85    // Private stuff
86
private QueueConnection JavaDoc connection;
87    private Queue JavaDoc dlq;
88    private JMSProviderAdapter providerAdapter;
89    private Hashtable JavaDoc resentBuffer = new Hashtable JavaDoc();
90
91    private MDBConfig config;
92
93    public DLQHandler(final JMSProviderAdapter providerAdapter, MDBConfig config)
94    {
95       this.providerAdapter = providerAdapter;
96       this.config = config;
97    }
98
99    //--- Service
100

101    /**
102     * Initalize the service.
103     *
104     * @throws Exception Service failed to initalize.
105     */

106    protected void createService() throws Exception JavaDoc
107    {
108       Context JavaDoc ctx = providerAdapter.getInitialContext();
109
110       try
111       {
112          String JavaDoc factoryName = providerAdapter.getQueueFactoryRef();
113          QueueConnectionFactory JavaDoc factory = (QueueConnectionFactory JavaDoc)
114             ctx.lookup(factoryName);
115          log.debug("Using factory: " + factory);
116
117          if (config.getDlqUser() == null)
118             connection = factory.createQueueConnection();
119          else
120             connection = factory.createQueueConnection(config.getDlqUser(), config.getDlqPassword());
121          log.debug("Created connection: " + connection);
122
123          dlq = (Queue JavaDoc) ctx.lookup(config.getDlq());
124          log.debug("Using Queue: " + dlq);
125       }
126       catch (Exception JavaDoc e)
127       {
128          if (e instanceof JMSException JavaDoc)
129             throw e;
130          else
131          {
132             JMSException JavaDoc x = new JMSException JavaDoc("Error creating the dlq connection: " + e.getMessage());
133             x.setLinkedException(e);
134             throw x;
135          }
136       }
137       finally
138       {
139          ctx.close();
140       }
141    }
142
143    protected void startService() throws Exception JavaDoc
144    {
145       connection.start();
146    }
147
148    protected void stopService() throws Exception JavaDoc
149    {
150       connection.stop();
151    }
152
153    protected void destroyService() throws Exception JavaDoc
154    {
155       // Help the GC
156
if (connection != null)
157          connection.close();
158       connection = null;
159       dlq = null;
160       providerAdapter = null;
161    }
162    
163    //--- Logic
164

165    /**
166     * Check if a message has been redelivered to many times.
167     *
168     * If message has been redelivered to many times, send it to the
169     * dead letter queue (default to queue/DLQ).
170     *
171     * @return true if message is handled (i.e resent), false if not.
172     */

173    public boolean handleRedeliveredMessage(final Message JavaDoc msg, final Transaction JavaDoc tx)
174    {
175       boolean handled = false;
176       int max = config.getDlqMaxTimesRedelivered();
177       String JavaDoc id = null;
178       boolean jbossmq = true;
179       int count = 0;
180
181       try
182       {
183
184          if (msg.propertyExists(JMS_JBOSS_REDELIVERY_LIMIT))
185             max = msg.getIntProperty(JMS_JBOSS_REDELIVERY_LIMIT);
186
187          if (msg.propertyExists(JMS_JBOSS_REDELIVERY_COUNT))
188             count = msg.getIntProperty(JMS_JBOSS_REDELIVERY_COUNT);
189          else
190          {
191             id = msg.getJMSMessageID();
192             if (id == null)
193             {
194                // if we can't get the id we are basically fucked
195
log.error("Message id is null, can't handle message");
196                return false;
197             }
198             count = incrementResentCount(id);
199             jbossmq = false;
200          }
201
202          if (count > max)
203          {
204             id = msg.getJMSMessageID();
205             log.warn("Message resent too many times; sending it to DLQ; message id=" + id);
206
207             sendMessage(msg);
208             deleteFromBuffer(id);
209
210             handled = true;
211          }
212          else if (jbossmq == false && tx != null)
213          {
214             // Register a synchronization to remove the buffer entry
215
// should the transaction commit
216
DLQSynchronization synch = new DLQSynchronization(id);
217             try
218             {
219                tx.registerSynchronization(synch);
220             }
221             catch (Exception JavaDoc e)
222             {
223                log.warn("Error registering DlQ Synchronization with transaction " + tx, e);
224             }
225          }
226       }
227       catch (JMSException JavaDoc e)
228       {
229          // If we can't send it ahead, we do not dare to just drop it...or?
230
log.error("Could not send message to Dead Letter Queue", e);
231       }
232
233       return handled;
234    }
235
236    /**
237     * Send message to the configured dead letter queue, defaults to queue/DLQ.
238     */

239    protected void sendMessage(Message JavaDoc msg) throws JMSException JavaDoc
240    {
241       boolean trace = log.isTraceEnabled();
242
243       QueueSession JavaDoc session = null;
244       QueueSender JavaDoc sender = null;
245
246       try
247       {
248          msg = makeWritable(msg, trace); // Don't know yet if we are gona clone or not
249

250          // Set the properties
251
msg.setStringProperty(JBOSS_ORIG_MESSAGEID, msg.getJMSMessageID());
252          // Some providers (say Websphere MQ) don't set this to something we can use
253
Destination JavaDoc d = msg.getJMSDestination();
254          if (d != null)
255             msg.setStringProperty(JBOSS_ORIG_DESTINATION, d.toString());
256
257          session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
258          sender = session.createSender(dlq);
259          if (trace)
260          {
261             log.trace("Sending message to DLQ; destination=" +
262                dlq + ", session=" + session + ", sender=" + sender);
263          }
264
265          sender.send(msg, deliveryMode, priority, config.getDlqTimeToLive());
266
267          if (trace)
268          {
269             log.trace("Message sent.");
270          }
271
272       }
273       finally
274       {
275          try
276          {
277             if (sender != null) sender.close();
278             if (session != null) session.close();
279          }
280          catch (Exception JavaDoc e)
281          {
282             log.warn("Failed to close sender or session; ignoring", e);
283          }
284       }
285    }
286
287    /**
288     * Increment the counter for the specific JMS message id.
289     *
290     * @return the new counter value.
291     */

292    protected int incrementResentCount(String JavaDoc id)
293    {
294       BufferEntry entry = null;
295       boolean trace = log.isTraceEnabled();
296       if (!resentBuffer.containsKey(id))
297       {
298          if (trace)
299             log.trace("Making new entry for id " + id);
300          entry = new BufferEntry();
301          entry.id = id;
302          entry.count = 1;
303          resentBuffer.put(id, entry);
304       }
305       else
306       {
307          entry = (BufferEntry) resentBuffer.get(id);
308          entry.count++;
309          if (trace)
310             log.trace("Incremented old entry for id " + id + " count " + entry.count);
311       }
312       return entry.count;
313    }
314
315    /**
316     * Delete the entry in the message counter buffer for specifyed JMS id.
317     */

318    protected void deleteFromBuffer(String JavaDoc id)
319    {
320       resentBuffer.remove(id);
321    }
322
323    /**
324     * Make the Message properties writable.
325     *
326     * @return the writable message.
327     */

328    protected Message JavaDoc makeWritable(Message JavaDoc msg, boolean trace) throws JMSException JavaDoc
329    {
330       Hashtable JavaDoc tmp = new Hashtable JavaDoc();
331
332       // Save properties
333
for (Enumeration JavaDoc en = msg.getPropertyNames(); en.hasMoreElements();)
334       {
335          String JavaDoc key = (String JavaDoc) en.nextElement();
336          tmp.put(key, msg.getObjectProperty(key));
337       }
338       
339       // Make them writable
340
msg.clearProperties();
341
342       Enumeration JavaDoc keys = tmp.keys();
343       while (keys.hasMoreElements())
344       {
345          String JavaDoc key = (String JavaDoc) keys.nextElement();
346          try
347          {
348             msg.setObjectProperty(key, tmp.get(key));
349          }
350          catch (JMSException JavaDoc ignored)
351          {
352             if (trace)
353                log.trace("Could not copy message property " + key, ignored);
354          }
355       }
356
357       return msg;
358    }
359
360    public String JavaDoc toString()
361    {
362       return super.toString() +
363          "{ destinationJNDI=" + config.getDlq() +
364          ", maxResent=" + config.getDlqMaxTimesRedelivered() +
365          ", timeToLive=" + config.getDlqTimeToLive() +
366          " }";
367    }
368
369    private class BufferEntry
370    {
371       int count;
372       String JavaDoc id;
373    }
374
375    /**
376     * Remove a redelivered message from the DLQ's buffer when it is acknowledged
377     */

378    protected class DLQSynchronization
379       implements Synchronization JavaDoc
380    {
381       /** The message id */
382       String JavaDoc id;
383
384       public DLQSynchronization(String JavaDoc id)
385       {
386          this.id = id;
387       }
388
389       public void beforeCompletion()
390       {
391       }
392
393       /**
394        * Forget the message when the transaction commits
395        */

396       public void afterCompletion(int status)
397       {
398          if (status == Status.STATUS_COMMITTED)
399             deleteFromBuffer(id);
400       }
401    }
402 }
403
Popular Tags