1 9 10 package org.jboss.ejb3.mdb; 11 12 import org.jboss.jms.jndi.JMSProviderAdapter; 13 import org.jboss.system.ServiceMBeanSupport; 14 15 import javax.jms.Destination ; 16 import javax.jms.JMSException ; 17 import javax.jms.Message ; 18 import javax.jms.Queue ; 19 import javax.jms.QueueConnection ; 20 import javax.jms.QueueConnectionFactory ; 21 import javax.jms.QueueSender ; 22 import javax.jms.QueueSession ; 23 import javax.jms.Session ; 24 import javax.naming.Context ; 25 import javax.transaction.Status ; 26 import javax.transaction.Synchronization ; 27 import javax.transaction.Transaction ; 28 import java.util.Enumeration ; 29 import java.util.Hashtable ; 30 31 62 public class DLQHandler 63 extends ServiceMBeanSupport 64 { 65 66 public static final String JBOSS_ORIG_DESTINATION = "JBOSS_ORIG_DESTINATION"; 67 68 69 public static final String JBOSS_ORIG_MESSAGEID = "JBOSS_ORIG_MESSAGEID"; 70 71 72 private static final String JMS_JBOSS_REDELIVERY_COUNT = "JMS_JBOSS_REDELIVERY_COUNT"; 73 private static final String JMS_JBOSS_REDELIVERY_LIMIT = "JMS_JBOSS_REDELIVERY_LIMIT"; 74 75 77 79 80 private int deliveryMode = Message.DEFAULT_DELIVERY_MODE; 81 82 83 private int priority = Message.DEFAULT_PRIORITY; 84 85 private QueueConnection connection; 87 private Queue dlq; 88 private JMSProviderAdapter providerAdapter; 89 private Hashtable resentBuffer = new Hashtable (); 90 91 private MDBConfig config; 92 93 public DLQHandler(final JMSProviderAdapter providerAdapter, MDBConfig config) 94 { 95 this.providerAdapter = providerAdapter; 96 this.config = config; 97 } 98 99 101 106 protected void createService() throws Exception 107 { 108 Context ctx = providerAdapter.getInitialContext(); 109 110 try 111 { 112 String factoryName = providerAdapter.getQueueFactoryRef(); 113 QueueConnectionFactory factory = (QueueConnectionFactory ) 114 ctx.lookup(factoryName); 115 log.debug("Using factory: " + factory); 116 117 if (config.getDlqUser() == null) 118 connection = factory.createQueueConnection(); 119 else 120 connection = factory.createQueueConnection(config.getDlqUser(), config.getDlqPassword()); 121 log.debug("Created connection: " + connection); 122 123 dlq = (Queue ) ctx.lookup(config.getDlq()); 124 log.debug("Using Queue: " + dlq); 125 } 126 catch (Exception e) 127 { 128 if (e instanceof JMSException ) 129 throw e; 130 else 131 { 132 JMSException x = new JMSException ("Error creating the dlq connection: " + e.getMessage()); 133 x.setLinkedException(e); 134 throw x; 135 } 136 } 137 finally 138 { 139 ctx.close(); 140 } 141 } 142 143 protected void startService() throws Exception 144 { 145 connection.start(); 146 } 147 148 protected void stopService() throws Exception 149 { 150 connection.stop(); 151 } 152 153 protected void destroyService() throws Exception 154 { 155 if (connection != null) 157 connection.close(); 158 connection = null; 159 dlq = null; 160 providerAdapter = null; 161 } 162 163 165 173 public boolean handleRedeliveredMessage(final Message msg, final Transaction tx) 174 { 175 boolean handled = false; 176 int max = config.getDlqMaxTimesRedelivered(); 177 String id = null; 178 boolean jbossmq = true; 179 int count = 0; 180 181 try 182 { 183 184 if (msg.propertyExists(JMS_JBOSS_REDELIVERY_LIMIT)) 185 max = msg.getIntProperty(JMS_JBOSS_REDELIVERY_LIMIT); 186 187 if (msg.propertyExists(JMS_JBOSS_REDELIVERY_COUNT)) 188 count = msg.getIntProperty(JMS_JBOSS_REDELIVERY_COUNT); 189 else 190 { 191 id = msg.getJMSMessageID(); 192 if (id == null) 193 { 194 log.error("Message id is null, can't handle message"); 196 return false; 197 } 198 count = incrementResentCount(id); 199 jbossmq = false; 200 } 201 202 if (count > max) 203 { 204 id = msg.getJMSMessageID(); 205 log.warn("Message resent too many times; sending it to DLQ; message id=" + id); 206 207 sendMessage(msg); 208 deleteFromBuffer(id); 209 210 handled = true; 211 } 212 else if (jbossmq == false && tx != null) 213 { 214 DLQSynchronization synch = new DLQSynchronization(id); 217 try 218 { 219 tx.registerSynchronization(synch); 220 } 221 catch (Exception e) 222 { 223 log.warn("Error registering DlQ Synchronization with transaction " + tx, e); 224 } 225 } 226 } 227 catch (JMSException e) 228 { 229 log.error("Could not send message to Dead Letter Queue", e); 231 } 232 233 return handled; 234 } 235 236 239 protected void sendMessage(Message msg) throws JMSException 240 { 241 boolean trace = log.isTraceEnabled(); 242 243 QueueSession session = null; 244 QueueSender sender = null; 245 246 try 247 { 248 msg = makeWritable(msg, trace); 250 msg.setStringProperty(JBOSS_ORIG_MESSAGEID, msg.getJMSMessageID()); 252 Destination d = msg.getJMSDestination(); 254 if (d != null) 255 msg.setStringProperty(JBOSS_ORIG_DESTINATION, d.toString()); 256 257 session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 258 sender = session.createSender(dlq); 259 if (trace) 260 { 261 log.trace("Sending message to DLQ; destination=" + 262 dlq + ", session=" + session + ", sender=" + sender); 263 } 264 265 sender.send(msg, deliveryMode, priority, config.getDlqTimeToLive()); 266 267 if (trace) 268 { 269 log.trace("Message sent."); 270 } 271 272 } 273 finally 274 { 275 try 276 { 277 if (sender != null) sender.close(); 278 if (session != null) session.close(); 279 } 280 catch (Exception e) 281 { 282 log.warn("Failed to close sender or session; ignoring", e); 283 } 284 } 285 } 286 287 292 protected int incrementResentCount(String id) 293 { 294 BufferEntry entry = null; 295 boolean trace = log.isTraceEnabled(); 296 if (!resentBuffer.containsKey(id)) 297 { 298 if (trace) 299 log.trace("Making new entry for id " + id); 300 entry = new BufferEntry(); 301 entry.id = id; 302 entry.count = 1; 303 resentBuffer.put(id, entry); 304 } 305 else 306 { 307 entry = (BufferEntry) resentBuffer.get(id); 308 entry.count++; 309 if (trace) 310 log.trace("Incremented old entry for id " + id + " count " + entry.count); 311 } 312 return entry.count; 313 } 314 315 318 protected void deleteFromBuffer(String id) 319 { 320 resentBuffer.remove(id); 321 } 322 323 328 protected Message makeWritable(Message msg, boolean trace) throws JMSException 329 { 330 Hashtable tmp = new Hashtable (); 331 332 for (Enumeration en = msg.getPropertyNames(); en.hasMoreElements();) 334 { 335 String key = (String ) en.nextElement(); 336 tmp.put(key, msg.getObjectProperty(key)); 337 } 338 339 msg.clearProperties(); 341 342 Enumeration keys = tmp.keys(); 343 while (keys.hasMoreElements()) 344 { 345 String key = (String ) keys.nextElement(); 346 try 347 { 348 msg.setObjectProperty(key, tmp.get(key)); 349 } 350 catch (JMSException ignored) 351 { 352 if (trace) 353 log.trace("Could not copy message property " + key, ignored); 354 } 355 } 356 357 return msg; 358 } 359 360 public String toString() 361 { 362 return super.toString() + 363 "{ destinationJNDI=" + config.getDlq() + 364 ", maxResent=" + config.getDlqMaxTimesRedelivered() + 365 ", timeToLive=" + config.getDlqTimeToLive() + 366 " }"; 367 } 368 369 private class BufferEntry 370 { 371 int count; 372 String id; 373 } 374 375 378 protected class DLQSynchronization 379 implements Synchronization 380 { 381 382 String id; 383 384 public DLQSynchronization(String id) 385 { 386 this.id = id; 387 } 388 389 public void beforeCompletion() 390 { 391 } 392 393 396 public void afterCompletion(int status) 397 { 398 if (status == Status.STATUS_COMMITTED) 399 deleteFromBuffer(id); 400 } 401 } 402 } 403 | Popular Tags |