1 21 package com.presumo.jms.router; 22 23 24 import com.presumo.jms.message.JmsMessage; 25 import com.presumo.jms.message.MessageStateListener; 26 import com.presumo.jms.resources.Resources; 27 import com.presumo.util.log.Logger; 28 import com.presumo.util.log.LoggerFactory; 29 30 import java.util.Hashtable ; 31 import java.util.StringTokenizer ; 32 33 import javax.jms.DeliveryMode ; 34 import javax.jms.JMSException ; 35 36 40 class PersistentAckHandler 41 implements MessageStateListener 42 { 43 44 45 private final Hashtable checkDupsMap = new Hashtable (); 46 47 48 private final Hashtable sentToMap = new Hashtable (); 49 50 51 private StringBuffer msgSafeAcks = new StringBuffer (); 52 private StringBuffer msgRoutedAcks = new StringBuffer (); 53 private StringBuffer msgDeletedAcks = new StringBuffer (); 54 55 private RemoteSession session; 56 57 61 PersistentAckHandler(RemoteSession session) 62 { 63 this.session = session; 64 } 65 66 67 71 72 80 final void handleOutgoingMsg(JmsMessage msg) 81 { 82 if (logger.isDebugEnabled()) 83 logger.entry("handleOutgoingMsg: " + msg); 84 85 if (msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT) 86 { 87 sentToMap.put(msg.getJMSMessageID(), msg); 88 msg.getAckHelper().addDeletionListener(this); 89 } 90 } 91 92 93 98 final synchronized void handleIncomingMsgs(JmsMessage [] msgs) 99 { 100 for (int i=0; i < msgs.length; ++i) { 101 JmsMessage msg = msgs[i]; 102 if (msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT) 103 { 104 checkDupsMap.put(msg.getJMSMessageID(), msg); 105 addMsgSafeAck(msg.getJMSMessageID()); 106 } 107 } 108 } 109 110 111 118 final synchronized String getAckString() 119 { 120 logger.entry("getAckString"); 121 122 String retval = null; 123 if (msgSafeAcks.length() > 0 || 124 msgRoutedAcks.length() > 0 || 125 msgDeletedAcks.length() > 0 ) 126 { 127 msgSafeAcks.append('#'); 128 msgSafeAcks.append(msgRoutedAcks.toString()); 129 msgSafeAcks.append('#'); 130 msgSafeAcks.append(msgDeletedAcks); 131 132 retval = msgSafeAcks.toString(); 133 134 msgSafeAcks.setLength(0); 135 msgRoutedAcks.setLength(0); 136 msgDeletedAcks.setLength(0); 137 } 138 139 logger.exit("getAckString: "+ retval); 140 return retval; 141 } 142 143 148 final boolean acksAvailable() 149 { 150 return (msgSafeAcks.length() > 0 || 151 msgRoutedAcks.length() > 0 || 152 msgDeletedAcks.length() > 0 ); 153 } 154 155 156 165 final synchronized boolean isDuplicate(JmsMessage msg) 166 { 167 boolean dup = (checkDupsMap.get(msg.getJMSMessageID()) != null); 168 169 if (dup) { 170 addMsgSafeAck(msg.getJMSMessageID()); 171 } 172 173 return dup; 174 } 175 176 177 182 final void setOriginator(JmsMessage msg) 183 { 184 if (msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT) 185 msg.getAckHelper().setOriginator(this); 186 } 187 188 189 192 final synchronized void handleAcks(String acks) 193 { 194 logger.entry("handleAcks: " +acks); 195 196 if (acks == null) 197 return; 198 199 StringBuffer currentValue = new StringBuffer (); 200 int currentState = 0; 201 202 char [] data = acks.toCharArray(); 203 204 for (int i=0; i < data.length; ++i) { 205 char c = data[i]; 206 207 switch (c) 208 { 209 case('#'): 210 ++currentState; 211 break; 212 case(':'): 213 if (currentValue.length() != 0) { 214 JmsMessage msg = null; 215 logger.debug("--------> parsed: " + currentValue); 216 switch (currentState) { 217 case(0): 218 msg = (JmsMessage) sentToMap.get(currentValue.toString()); 219 msg.getAckHelper().safeAck(this); 220 break; 221 case(1): 222 msg = (JmsMessage) sentToMap.get(currentValue.toString()); 223 msg.getAckHelper().routedAck(this); 224 break; 225 default: 226 msg = (JmsMessage) checkDupsMap.remove(currentValue.toString()); 227 msg.getAckHelper().deleteAck(this); 228 break; 229 } 230 currentValue.setLength(0); 231 } 232 break; 233 default: 234 currentValue.append(c); 235 } 236 237 } 238 239 logger.exit("handleAcks"); 240 } 241 242 243 252 public void messageDeleted(JmsMessage msg) 253 { 254 logger.entry("messageDeleted: " + msg); 255 256 sentToMap.remove(msg.getJMSMessageID()); 257 258 msgDeletedAcks.append(msg.getJMSMessageID()); 260 msgDeletedAcks.append(':'); 261 262 session.acksAvailable(); 264 265 logger.exit("messageDeleted"); 266 } 267 268 269 276 public void messageRouted(JmsMessage msg) 277 { 278 logger.entry("messageRouted: "+msg); 279 280 msgRoutedAcks.append(msg.getJMSMessageID()); 282 msgRoutedAcks.append(':'); 283 284 session.acksAvailable(); 286 287 logger.exit("messageRouted"); 288 } 289 290 291 292 296 299 private void addMsgSafeAck(String msgID) 300 { 301 logger.entry("addMsgSafeAck: " +msgID); 302 303 msgSafeAcks.append(msgID); 304 msgSafeAcks.append(':'); 305 306 session.acksAvailable(); 308 309 logger.exit("addMsgSafeAck: " + msgSafeAcks); 310 } 311 312 314 private static Logger logger = 315 LoggerFactory.getLogger(PersistentAckHandler.class, Resources.getBundle()); 316 317 } 319 | Popular Tags |