1 22 package org.jboss.ejb.plugins.jms; 23 24 import java.util.Hashtable ; 25 import java.util.HashMap ; 26 import java.util.Map ; 27 import java.util.Enumeration ; 28 import java.util.Iterator ; 29 30 import javax.naming.Context ; 31 import javax.jms.ExceptionListener ; 32 import javax.jms.Session ; 33 import javax.jms.QueueConnection ; 34 import javax.jms.QueueConnectionFactory ; 35 import javax.jms.QueueSession ; 36 import javax.jms.QueueSender ; 37 import javax.jms.Queue ; 38 import javax.jms.Message ; 39 import javax.jms.JMSException ; 40 import javax.jms.Destination ; 41 import javax.transaction.Status ; 42 import javax.transaction.Synchronization ; 43 import javax.transaction.Transaction ; 44 45 import org.w3c.dom.Element ; 46 47 import org.jboss.deployment.DeploymentException; 48 import org.jboss.metadata.MetaData; 49 import org.jboss.jms.jndi.JMSProviderAdapter; 50 import org.jboss.system.ServiceMBeanSupport; 51 52 83 public class DLQHandler extends ServiceMBeanSupport implements ExceptionListener 84 { 85 86 public static final String PROPERTY_DELIVERY_COUNT = "JMSXDeliveryCount"; 87 88 89 public static final String JBOSS_ORIG_DESTINATION = "JBOSS_ORIG_DESTINATION"; 90 91 92 public static final String JBOSS_ORIG_MESSAGEID = "JBOSS_ORIG_MESSAGEID"; 93 94 95 private static final String JMS_JBOSS_REDELIVERY_COUNT = "JMS_JBOSS_REDELIVERY_COUNT"; 96 private static final String JMS_JBOSS_REDELIVERY_LIMIT = "JMS_JBOSS_REDELIVERY_LIMIT"; 97 98 105 private String destinationJNDI = "queue/DLQ"; 106 107 113 private int maxResent = 10; 114 115 122 private long timeToLive = Message.DEFAULT_TIME_TO_LIVE; 123 124 126 127 private int deliveryMode = Message.DEFAULT_DELIVERY_MODE; 128 129 130 private int priority = Message.DEFAULT_PRIORITY; 131 132 133 private String dlqUser; 134 135 136 private String dlqPass; 137 138 private QueueConnection connection; 140 private Queue dlq; 141 private JMSProviderAdapter providerAdapter; 142 private JMSContainerInvoker invoker; 143 private Hashtable resentBuffer = new Hashtable (); 144 145 public DLQHandler(final JMSProviderAdapter providerAdapter, final JMSContainerInvoker invoker) 146 { 147 this.providerAdapter = providerAdapter; 148 this.invoker = invoker; 149 } 150 151 public void onException(JMSException e) 152 { 153 if (invoker != null && invoker.exListener != null) 154 invoker.exListener.handleFailure(e); 155 else 156 { 157 log.warn("DLQHandler got JMS Failure but there is no link to JMSContainerInvoker's exception listener.", e); 158 159 if (connection != null) 161 { 162 try 163 { 164 connection.close(); 165 } 166 catch (Throwable ignored) 167 { 168 log.trace("Ignored error closing connection", ignored); 169 } 170 connection = null; 171 } 172 } 173 } 174 175 protected void createService() throws Exception 176 { 177 Context ctx = providerAdapter.getInitialContext(); 178 179 try 180 { 181 String factoryName = providerAdapter.getQueueFactoryRef(); 182 QueueConnectionFactory factory = (QueueConnectionFactory ) 183 ctx.lookup(factoryName); 184 log.debug("Using factory: " + factory); 185 186 if (dlqUser == null) 187 connection = factory.createQueueConnection(); 188 else 189 connection = factory.createQueueConnection(dlqUser, dlqPass); 190 log.debug("Created connection: " + connection); 191 192 dlq = (Queue ) ctx.lookup(destinationJNDI); 193 log.debug("Using Queue: " + dlq); 194 } 195 finally 196 { 197 ctx.close(); 198 } 199 } 200 201 protected void startService() throws Exception 202 { 203 connection.setExceptionListener(this); 204 connection.start(); 205 } 206 207 protected void stopService() throws Exception 208 { 209 try 210 { 211 connection.setExceptionListener(null); 212 connection.stop(); 213 } 214 catch (Throwable t) 215 { 216 log.trace("Ignored error stopping DLQ", t); 217 } 218 } 219 220 protected void destroyService() throws Exception 221 { 222 if (connection != null) 224 connection.close(); 225 connection = null; 226 dlq = null; 227 providerAdapter = null; 228 } 229 230 238 public boolean handleRedeliveredMessage(final Message msg, final Transaction tx) 239 { 240 boolean handled = false; 241 int max = this.maxResent; 242 String id = null; 243 boolean fromMessage = true; 244 int count = 0; 245 246 try 247 { 248 249 if (msg.propertyExists(JMS_JBOSS_REDELIVERY_LIMIT)) 250 max = msg.getIntProperty(JMS_JBOSS_REDELIVERY_LIMIT); 251 252 try 253 { 254 if (msg.propertyExists(PROPERTY_DELIVERY_COUNT)) 255 count = msg.getIntProperty(PROPERTY_DELIVERY_COUNT); 256 } 257 catch (JMSException ignored) 258 { 259 } 260 if (count > 0) 261 { 262 --count; 264 } 265 else if (msg.propertyExists(JMS_JBOSS_REDELIVERY_COUNT)) 266 count = msg.getIntProperty(JMS_JBOSS_REDELIVERY_COUNT); 267 else 268 { 269 id = msg.getJMSMessageID(); 270 if (id == null) 271 { 272 log.error("Message id is null, can't handle message"); 274 return false; 275 } 276 count = incrementResentCount(id); 277 fromMessage = false; 278 } 279 280 if (count > max) 281 { 282 id = msg.getJMSMessageID(); 283 log.warn("Message resent too many times; sending it to DLQ; message id=" + id); 284 285 sendMessage(msg); 286 deleteFromBuffer(id); 287 288 handled = true; 289 } 290 else if (fromMessage == false && tx != null) 291 { 292 DLQSynchronization synch = new DLQSynchronization(id); 295 try 296 { 297 tx.registerSynchronization(synch); 298 } 299 catch (Exception e) 300 { 301 log.warn("Error registering DlQ Synchronization with transaction " + tx, e); 302 } 303 } 304 } 305 catch (JMSException e) 306 { 307 log.error("Could not send message to Dead Letter Queue", e); 309 } 310 311 return handled; 312 } 313 314 317 protected void sendMessage(Message msg) throws JMSException 318 { 319 boolean trace = log.isTraceEnabled(); 320 321 QueueSession session = null; 322 QueueSender sender = null; 323 324 try 325 { 326 msg = makeWritable(msg, trace); 328 msg.setStringProperty(JBOSS_ORIG_MESSAGEID, msg.getJMSMessageID()); 330 Destination d = msg.getJMSDestination(); 332 if (d != null) 333 msg.setStringProperty(JBOSS_ORIG_DESTINATION, d.toString()); 334 335 session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 336 sender = session.createSender(dlq); 337 if (trace) 338 { 339 log.trace("Sending message to DLQ; destination=" + 340 dlq + ", session=" + session + ", sender=" + sender); 341 } 342 343 sender.send(msg, deliveryMode, priority, timeToLive); 344 345 if (trace) 346 { 347 log.trace("Message sent."); 348 } 349 350 } 351 finally 352 { 353 try 354 { 355 if (sender != null) sender.close(); 356 if (session != null) session.close(); 357 } 358 catch (Exception e) 359 { 360 log.warn("Failed to close sender or session; ignoring", e); 361 } 362 } 363 } 364 365 370 protected int incrementResentCount(String id) 371 { 372 BufferEntry entry = null; 373 boolean trace = log.isTraceEnabled(); 374 if (!resentBuffer.containsKey(id)) 375 { 376 if (trace) 377 log.trace("Making new entry for id " + id); 378 entry = new BufferEntry(); 379 entry.id = id; 380 entry.count = 1; 381 resentBuffer.put(id, entry); 382 } 383 else 384 { 385 entry = (BufferEntry) resentBuffer.get(id); 386 entry.count++; 387 if (trace) 388 log.trace("Incremented old entry for id " + id + " count " + entry.count); 389 } 390 return entry.count; 391 } 392 393 396 protected void deleteFromBuffer(String id) 397 { 398 resentBuffer.remove(id); 399 } 400 401 406 protected Message makeWritable(Message msg, boolean trace) throws JMSException 407 { 408 HashMap tmp = new HashMap (); 409 410 for (Enumeration en = msg.getPropertyNames(); en.hasMoreElements();) 412 { 413 String key = (String ) en.nextElement(); 414 tmp.put(key, msg.getObjectProperty(key)); 415 } 416 417 msg.clearProperties(); 419 420 Iterator i = tmp.entrySet().iterator(); 421 while (i.hasNext()) 422 { 423 Map.Entry me = (Map.Entry )i.next(); 424 String key = (String ) me.getKey(); 425 try 426 { 427 msg.setObjectProperty(key, me.getValue()); 428 } 429 catch (JMSException ignored) 430 { 431 if (trace) 432 log.trace("Could not copy message property " + key, ignored); 433 } 434 } 435 436 return msg; 437 } 438 439 442 public void importXml(final Element element) throws DeploymentException 443 { 444 destinationJNDI = MetaData.getElementContent 445 (MetaData.getUniqueChild(element, "DestinationQueue")); 446 447 try 448 { 449 String mr = MetaData.getElementContent 450 (MetaData.getUniqueChild(element, "MaxTimesRedelivered")); 451 maxResent = Integer.parseInt(mr); 452 } 453 catch (Exception ignore) 454 { 455 } 456 457 try 458 { 459 String ttl = MetaData.getElementContent 460 (MetaData.getUniqueChild(element, "TimeToLive")); 461 timeToLive = Long.parseLong(ttl); 462 463 if (timeToLive < 0) 464 { 465 log.warn("Invalid TimeToLive: " + timeToLive + "; using default"); 466 timeToLive = Message.DEFAULT_TIME_TO_LIVE; 467 } 468 } 469 catch (Exception ignore) 470 { 471 } 472 473 dlqUser = MetaData.getElementContent(MetaData.getOptionalChild(element, "DLQUser")); 474 dlqPass = MetaData.getElementContent(MetaData.getOptionalChild(element, "DLQPassword")); 475 } 476 477 public String toString() 478 { 479 return super.toString() + 480 "{ destinationJNDI=" + destinationJNDI + 481 ", maxResent=" + maxResent + 482 ", timeToLive=" + timeToLive + 483 " }"; 484 } 485 486 private static class BufferEntry 487 { 488 int count; 489 String id; 490 } 491 492 495 protected class DLQSynchronization implements Synchronization 496 { 497 498 String id; 499 500 public DLQSynchronization(String id) 501 { 502 this.id = id; 503 } 504 505 public void beforeCompletion() 506 { 507 } 508 509 512 public void afterCompletion(int status) 513 { 514 if (status == Status.STATUS_COMMITTED) 515 deleteFromBuffer(id); 516 } 517 } 518 } 519 | Popular Tags |