1 22 package org.jboss.resource.adapter.jms.inflow.dlq; 23 24 import java.util.Enumeration ; 25 import java.util.HashMap ; 26 import java.util.Iterator ; 27 28 import javax.jms.Destination ; 29 import javax.jms.ExceptionListener ; 30 import javax.jms.JMSException ; 31 import javax.jms.Message ; 32 import javax.jms.Queue ; 33 import javax.jms.QueueConnection ; 34 import javax.jms.QueueConnectionFactory ; 35 import javax.jms.QueueSender ; 36 import javax.jms.QueueSession ; 37 import javax.jms.TopicConnectionFactory ; 38 import javax.naming.Context ; 39 40 import org.jboss.jms.jndi.JMSProviderAdapter; 41 import org.jboss.logging.Logger; 42 import org.jboss.resource.adapter.jms.inflow.DLQHandler; 43 import org.jboss.resource.adapter.jms.inflow.JmsActivation; 44 import org.jboss.resource.adapter.jms.inflow.JmsActivationSpec; 45 import org.jboss.util.naming.Util; 46 47 53 public abstract class AbstractDLQHandler implements DLQHandler, ExceptionListener 54 { 55 56 protected static final Logger log = Logger.getLogger(AbstractDLQHandler.class); 57 58 59 protected JmsActivation activation; 60 61 62 protected Queue dlq; 63 64 65 protected QueueConnection connection; 66 67 public boolean handleRedeliveredMessage(Message msg) 68 { 69 boolean handled = handleDelivery(msg); 70 if (handled) 71 sendToDLQ(msg); 72 return handled; 73 } 74 75 public void messageDelivered(Message msg) 76 { 77 } 78 79 public void setup(JmsActivation activation, Context ctx) throws Exception 80 { 81 this.activation = activation; 82 setupDLQDestination(ctx); 83 setupDLQConnection(ctx); 84 } 85 86 public void teardown() 87 { 88 teardownDLQConnection(); 89 teardownDLQDestination(); 90 } 91 92 public void onException(JMSException exception) 93 { 94 activation.handleFailure(exception); 95 } 96 97 103 protected void setupDLQDestination(Context ctx) throws Exception 104 { 105 String name = activation.getActivationSpec().getDLQJNDIName(); 106 dlq = (Queue ) Util.lookup(ctx, name, Queue .class); 107 } 108 109 112 protected void teardownDLQDestination() 113 { 114 } 115 116 122 protected void setupDLQConnection(Context ctx) throws Exception 123 { 124 JmsActivationSpec spec = activation.getActivationSpec(); 125 String user = spec.getDLQUser(); 126 String pass = spec.getDLQPassword(); 127 String clientID = spec.getDLQClientID(); 128 JMSProviderAdapter adapter = activation.getProviderAdapter(); 129 String queueFactoryRef = adapter.getQueueFactoryRef(); 130 log.debug("Attempting to lookup dlq connection factory " + queueFactoryRef); 131 QueueConnectionFactory qcf = (QueueConnectionFactory ) Util.lookup(ctx, queueFactoryRef, QueueConnectionFactory .class); 132 log.debug("Got dlq connection factory " + qcf + " from " + queueFactoryRef); 133 log.debug("Attempting to create queue connection with user " + user); 134 if (user != null) 135 connection = qcf.createQueueConnection(user, pass); 136 else 137 connection = qcf.createQueueConnection(); 138 if (clientID != null) 139 connection.setClientID(clientID); 140 connection.setExceptionListener(this); 141 log.debug("Using queue connection " + connection); 142 } 143 144 147 protected void teardownDLQConnection() 148 { 149 try 150 { 151 if (connection != null) 152 { 153 log.debug("Closing the " + connection); 154 connection.close(); 155 } 156 } 157 catch (Throwable t) 158 { 159 log.debug("Error closing the connection " + connection, t); 160 } 161 } 162 163 169 protected abstract boolean handleDelivery(Message msg); 170 171 178 protected void warnDLQ(Message msg, int count, int max) 179 { 180 log.warn("Message redelivered=" + count + " max=" + max + " sending it to the dlq " + msg); 181 } 182 183 188 protected void sendToDLQ(Message msg) 189 { 190 int deliveryMode = getDeliveryMode(msg); 191 int priority = getPriority(msg); 192 long timeToLive = getTimeToLive(msg); 193 194 if (timeToLive < 0) 196 { 197 if (log.isTraceEnabled()) 198 log.trace("Not sending the message to the DLQ, it has expired " + msg); 199 return; 200 } 201 202 Message copy = makeWritable(msg); 203 if (copy != null) 204 doSend(copy, deliveryMode, priority, timeToLive); 205 } 206 207 213 protected int getDeliveryMode(Message msg) 214 { 215 try 216 { 217 return msg.getJMSDeliveryMode(); 218 } 219 catch (Throwable t) 220 { 221 return Message.DEFAULT_DELIVERY_MODE; 222 } 223 } 224 225 231 protected int getPriority(Message msg) 232 { 233 try 234 { 235 return msg.getJMSPriority(); 236 } 237 catch (Throwable t) 238 { 239 return Message.DEFAULT_PRIORITY; 240 } 241 } 242 243 249 protected long getTimeToLive(Message msg) 250 { 251 try 252 { 253 long expires = msg.getJMSExpiration(); 254 if (expires == Message.DEFAULT_TIME_TO_LIVE) 255 return Message.DEFAULT_TIME_TO_LIVE; 256 return expires - System.currentTimeMillis(); 257 } 258 catch (Throwable t) 259 { 260 return Message.DEFAULT_TIME_TO_LIVE; 261 } 262 } 263 264 270 protected Message makeWritable(Message msg) 271 { 272 boolean trace = log.isTraceEnabled(); 273 274 try 275 { 276 HashMap tmp = new HashMap (); 277 278 for (Enumeration en = msg.getPropertyNames(); en.hasMoreElements();) 280 { 281 String key = (String ) en.nextElement(); 282 tmp.put(key, msg.getObjectProperty(key)); 283 } 284 285 msg.clearProperties(); 287 288 for (Iterator i = tmp.keySet().iterator(); i.hasNext();) 289 { 290 String key = (String ) i.next(); 291 try 292 { 293 msg.setObjectProperty(key, tmp.get(key)); 294 } 295 catch (JMSException ignored) 296 { 297 if (trace) 298 log.trace("Could not copy message property " + key, ignored); 299 } 300 } 301 302 msg.setStringProperty(JBOSS_ORIG_MESSAGEID, msg.getJMSMessageID()); 303 Destination destination = msg.getJMSDestination(); 304 if (destination != null) 305 msg.setStringProperty(JBOSS_ORIG_DESTINATION, destination.toString()); 306 307 return msg; 308 } 309 catch (Throwable t) 310 { 311 log.error("Unable to make writable " + msg, t); 312 return null; 313 } 314 } 315 316 321 protected void doSend(Message msg, int deliveryMode, int priority, long timeToLive) 322 { 323 QueueSession session = null; 324 try 325 { 326 session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); 327 QueueSender sender = session.createSender(dlq); 328 sender.send(msg, deliveryMode, priority, timeToLive); 329 } 330 catch (Throwable t) 331 { 332 handleSendError(msg, t); 333 } 334 finally 335 { 336 if (session != null) 337 { 338 try 339 { 340 session.close(); 341 } 342 catch (Throwable t) 343 { 344 log.trace("Ignored ", t); 345 } 346 } 347 } 348 } 349 350 356 protected void handleSendError(Message msg, Throwable t) 357 { 358 log.error("DLQ " + dlq + " error sending message " + msg, t); 359 } 360 } | Popular Tags |