KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > resource > adapter > jms > inflow > dlq > AbstractDLQHandler


/*
 * JBoss, Home of Professional Open Source
 * Copyright 2005, JBoss Inc., and individual contributors as indicated
 * by the @authors tag. See the copyright.txt in the distribution for a
 * full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */

package org.jboss.resource.adapter.jms.inflow.dlq;

import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;

import org.jboss.jms.jndi.JMSProviderAdapter;
import org.jboss.logging.Logger;
import org.jboss.resource.adapter.jms.inflow.DLQHandler;
import org.jboss.resource.adapter.jms.inflow.JmsActivation;
import org.jboss.resource.adapter.jms.inflow.JmsActivationSpec;
import org.jboss.util.naming.Util;

47 /**
48  * An abstract DLQ handler.
49  *
50  * @author <a HREF="adrian@jboss.com">Adrian Brock</a>
51  * @version $Revision: 41243 $
52  */

53 public abstract class AbstractDLQHandler implements DLQHandler, ExceptionListener JavaDoc
54 {
55    /** The logger */
56    protected static final Logger log = Logger.getLogger(AbstractDLQHandler.class);
57
58    /** The activation */
59    protected JmsActivation activation;
60    
61    /** The DLQ */
62    protected Queue JavaDoc dlq;
63    
64    /** The DLQ Connection*/
65    protected QueueConnection JavaDoc connection;
66
67    public boolean handleRedeliveredMessage(Message JavaDoc msg)
68    {
69       boolean handled = handleDelivery(msg);
70       if (handled)
71          sendToDLQ(msg);
72       return handled;
73    }
74
75    public void messageDelivered(Message JavaDoc msg)
76    {
77    }
78    
79    public void setup(JmsActivation activation, Context JavaDoc ctx) throws Exception JavaDoc
80    {
81       this.activation = activation;
82       setupDLQDestination(ctx);
83       setupDLQConnection(ctx);
84    }
85
86    public void teardown()
87    {
88       teardownDLQConnection();
89       teardownDLQDestination();
90    }
91
92    public void onException(JMSException JavaDoc exception)
93    {
94       activation.handleFailure(exception);
95    }
96
97    /**
98     * Setup the DLQ Destination
99     *
100     * @param ctx the naming context
101     * @throws Exception for any error
102     */

103    protected void setupDLQDestination(Context JavaDoc ctx) throws Exception JavaDoc
104    {
105       String JavaDoc name = activation.getActivationSpec().getDLQJNDIName();
106       dlq = (Queue JavaDoc) Util.lookup(ctx, name, Queue JavaDoc.class);
107    }
108
109    /**
110     * Teardown the DLQ Destination
111     */

112    protected void teardownDLQDestination()
113    {
114    }
115
116    /**
117     * Setup the DLQ Connection
118     *
119     * @param ctx the naming context
120     * @throws Exception for any error
121     */

122    protected void setupDLQConnection(Context JavaDoc ctx) throws Exception JavaDoc
123    {
124       JmsActivationSpec spec = activation.getActivationSpec();
125       String JavaDoc user = spec.getDLQUser();
126       String JavaDoc pass = spec.getDLQPassword();
127       String JavaDoc clientID = spec.getDLQClientID();
128       JMSProviderAdapter adapter = activation.getProviderAdapter();
129       String JavaDoc queueFactoryRef = adapter.getQueueFactoryRef();
130       log.debug("Attempting to lookup dlq connection factory " + queueFactoryRef);
131       QueueConnectionFactory JavaDoc qcf = (QueueConnectionFactory JavaDoc) Util.lookup(ctx, queueFactoryRef, QueueConnectionFactory JavaDoc.class);
132       log.debug("Got dlq connection factory " + qcf + " from " + queueFactoryRef);
133       log.debug("Attempting to create queue connection with user " + user);
134       if (user != null)
135          connection = qcf.createQueueConnection(user, pass);
136       else
137          connection = qcf.createQueueConnection();
138       if (clientID != null)
139          connection.setClientID(clientID);
140       connection.setExceptionListener(this);
141       log.debug("Using queue connection " + connection);
142    }
143
144    /**
145     * Teardown the DLQ Connection
146     */

147    protected void teardownDLQConnection()
148    {
149       try
150       {
151          if (connection != null)
152          {
153             log.debug("Closing the " + connection);
154             connection.close();
155          }
156       }
157       catch (Throwable JavaDoc t)
158       {
159          log.debug("Error closing the connection " + connection, t);
160       }
161    }
162    
163    /**
164     * Do we handle the message?
165     *
166     * @param msg the message to handle
167     * @return true when we handle it
168     */

169    protected abstract boolean handleDelivery(Message JavaDoc msg);
170    
171    /**
172     * Warn that a message is being handled by the DLQ
173     *
174     * @param msg
175     * @param count the number of redelivers
176     * @param max the maximum number of redeliveries
177     */

178    protected void warnDLQ(Message JavaDoc msg, int count, int max)
179    {
180       log.warn("Message redelivered=" + count + " max=" + max + " sending it to the dlq " + msg);
181    }
182    
183    /**
184     * Send the message to the dlq
185     *
186     * @param msg message to send
187     */

188    protected void sendToDLQ(Message JavaDoc msg)
189    {
190       int deliveryMode = getDeliveryMode(msg);
191       int priority = getPriority(msg);
192       long timeToLive = getTimeToLive(msg);
193       
194       // If we get a negative time to live that means the message has expired
195
if (timeToLive < 0)
196       {
197          if (log.isTraceEnabled())
198             log.trace("Not sending the message to the DLQ, it has expired " + msg);
199          return;
200       }
201       
202       Message JavaDoc copy = makeWritable(msg);
203       if (copy != null)
204          doSend(copy, deliveryMode, priority, timeToLive);
205    }
206    
207    /**
208     * Get the delivery mode for the DLQ message
209     *
210     * @param msg the message
211     * @return the delivery mode
212     */

213    protected int getDeliveryMode(Message JavaDoc msg)
214    {
215       try
216       {
217          return msg.getJMSDeliveryMode();
218       }
219       catch (Throwable JavaDoc t)
220       {
221          return Message.DEFAULT_DELIVERY_MODE;
222       }
223    }
224    
225    /**
226     * Get the priority for the DLQ message
227     *
228     * @param msg the message
229     * @return the priority
230     */

231    protected int getPriority(Message JavaDoc msg)
232    {
233       try
234       {
235          return msg.getJMSPriority();
236       }
237       catch (Throwable JavaDoc t)
238       {
239          return Message.DEFAULT_PRIORITY;
240       }
241    }
242    
243    /**
244     * Get the time to live for the DLQ message
245     *
246     * @param msg the message
247     * @return the time to live
248     */

249    protected long getTimeToLive(Message JavaDoc msg)
250    {
251       try
252       {
253          long expires = msg.getJMSExpiration();
254          if (expires == Message.DEFAULT_TIME_TO_LIVE)
255             return Message.DEFAULT_TIME_TO_LIVE;
256          return expires - System.currentTimeMillis();
257       }
258       catch (Throwable JavaDoc t)
259       {
260          return Message.DEFAULT_TIME_TO_LIVE;
261       }
262    }
263    
264    /**
265     * Make a writable copy of the message
266     *
267     * @param msg the message
268     * @return the copied message
269     */

270    protected Message JavaDoc makeWritable(Message JavaDoc msg)
271    {
272       boolean trace = log.isTraceEnabled();
273       
274       try
275       {
276          HashMap JavaDoc tmp = new HashMap JavaDoc();
277
278          // Save properties
279
for (Enumeration JavaDoc en = msg.getPropertyNames(); en.hasMoreElements();)
280          {
281             String JavaDoc key = (String JavaDoc) en.nextElement();
282             tmp.put(key, msg.getObjectProperty(key));
283          }
284          
285          // Make them writable
286
msg.clearProperties();
287
288          for (Iterator JavaDoc i = tmp.keySet().iterator(); i.hasNext();)
289          {
290             String JavaDoc key = (String JavaDoc) i.next();
291             try
292             {
293                msg.setObjectProperty(key, tmp.get(key));
294             }
295             catch (JMSException JavaDoc ignored)
296             {
297                if (trace)
298                   log.trace("Could not copy message property " + key, ignored);
299             }
300          }
301
302          msg.setStringProperty(JBOSS_ORIG_MESSAGEID, msg.getJMSMessageID());
303          Destination JavaDoc destination = msg.getJMSDestination();
304          if (destination != null)
305             msg.setStringProperty(JBOSS_ORIG_DESTINATION, destination.toString());
306
307          return msg;
308       }
309       catch (Throwable JavaDoc t)
310       {
311          log.error("Unable to make writable " + msg, t);
312          return null;
313       }
314    }
315    
316    /**
317     * Do the message send
318     *
319     * @param msg the message
320     */

321    protected void doSend(Message JavaDoc msg, int deliveryMode, int priority, long timeToLive)
322    {
323       QueueSession JavaDoc session = null;
324       try
325       {
326          session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
327          QueueSender JavaDoc sender = session.createSender(dlq);
328          sender.send(msg, deliveryMode, priority, timeToLive);
329       }
330       catch (Throwable JavaDoc t)
331       {
332          handleSendError(msg, t);
333       }
334       finally
335       {
336          if (session != null)
337          {
338             try
339             {
340                session.close();
341             }
342             catch (Throwable JavaDoc t)
343             {
344                log.trace("Ignored ", t);
345             }
346          }
347       }
348    }
349    
350    /**
351     * Handle a failure to send the message to the dlq
352     *
353     * @param msg the message
354     * @param t the error
355     */

356    protected void handleSendError(Message JavaDoc msg, Throwable JavaDoc t)
357    {
358       log.error("DLQ " + dlq + " error sending message " + msg, t);
359    }
360 }
Popular Tags