1 45 package org.exolab.jms.server; 46 47 import java.sql.Connection ; 48 import java.util.Collections ; 49 import java.util.Iterator ; 50 import java.util.LinkedList ; 51 import java.util.List ; 52 53 import javax.jms.Session ; 54 import javax.jms.JMSException ; 55 56 import org.apache.commons.logging.Log; 57 import org.apache.commons.logging.LogFactory; 58 59 import org.exolab.jms.messagemgr.MessageHandle; 60 import org.exolab.jms.messagemgr.DestinationManager; 61 import org.exolab.jms.messagemgr.DestinationCache; 62 import org.exolab.jms.persistence.DatabaseService; 63 import org.exolab.jms.persistence.PersistenceException; 64 import org.exolab.jms.persistence.SQLHelper; 65 66 67 80 class SentMessageCache { 81 82 85 private JmsServerSession _session; 86 87 90 private List _unackedMessages = Collections.synchronizedList( 91 new LinkedList ()); 92 93 96 private static final Log _log = LogFactory.getLog(SentMessageCache.class); 97 98 99 104 public SentMessageCache(JmsServerSession session) { 105 _session = session; 106 } 107 108 122 public void process(MessageHandle handle) throws JMSException { 123 if (handle.isPersistent()) { 124 Connection connection = null; 125 try { 126 connection = DatabaseService.getConnection(); 127 128 handle.setDelivered(true); 129 if (_session.isTransacted() || 130 _session.getAckMode() == Session.CLIENT_ACKNOWLEDGE) { 131 _unackedMessages.add(handle); 132 handle.update(connection); 133 } else { 134 handle.destroy(connection); 136 } 137 connection.commit(); 138 } catch (PersistenceException exception) { 139 SQLHelper.rollback(connection); 140 _log.error("Error in SentMessageCache.process", exception); 141 } catch (Exception exception) { 142 _log.error("Error in SentMessageCache.process", exception); 143 } finally { 144 SQLHelper.close(connection); 145 } 146 } else { 147 handle.setDelivered(true); 148 if (_session.isTransacted() || 149 _session.getAckMode() == Session.CLIENT_ACKNOWLEDGE) { 150 _unackedMessages.add(handle); 151 } else { 152 handle.destroy(); 153 } 154 } 155 } 156 157 164 public void acknowledgeMessage(String messageId, long consumerId) { 165 boolean exists = false; 168 Iterator iterator = _unackedMessages.iterator(); 169 while (iterator.hasNext()) { 170 MessageHandle handle = (MessageHandle) iterator.next(); 171 if (handle.getConsumerId() == consumerId 172 && handle.getMessageId().equals(messageId)) { 173 exists = true; 174 break; 175 } 176 } 177 178 if (exists) { 179 boolean intransaction = false; 180 Connection connection = null; 181 182 try { 183 while (!_unackedMessages.isEmpty()) { 187 MessageHandle handle = 188 (MessageHandle) _unackedMessages.remove(0); 189 if (handle.isPersistent()) { 190 if (!intransaction) { 191 connection = DatabaseService.getConnection(); 192 193 intransaction = true; 195 } 196 handle.destroy(connection); 197 } else { 198 handle.destroy(); 199 } 200 201 if (handle.getConsumerId() == consumerId 204 && handle.getMessageId().equals(messageId)) { 205 if (intransaction) { 206 connection.commit(); 207 intransaction = false; 208 } 209 break; 210 } 211 } 212 } catch (PersistenceException exception) { 213 SQLHelper.rollback(connection); 214 _log.error("Error in SentMessageCache.acknowledgeMessage", 215 exception); 216 } catch (Exception exception) { 217 _log.error("Error in SentMessageCache.acknowledgeMessage", 218 exception); 219 } finally { 220 SQLHelper.close(connection); 221 } 222 } 223 } 224 225 228 public void acknowledgeAllMessages() { 229 boolean intransaction = false; 230 Connection connection = null; 231 232 try { 233 while (!_unackedMessages.isEmpty()) { 237 MessageHandle handle = 238 (MessageHandle) _unackedMessages.remove(0); 239 if (handle.isPersistent()) { 240 if (!intransaction) { 241 connection = DatabaseService.getConnection(); 242 intransaction = true; 243 } 244 handle.destroy(connection); 245 } else { 246 handle.destroy(); 247 } 248 } 249 250 if (intransaction) { 251 connection.commit(); 252 intransaction = false; 253 } 254 } catch (PersistenceException exception) { 255 SQLHelper.rollback(connection); 256 _log.error("Error in SentMessageCache.acknowledgeMessage", 257 exception); 258 } catch (Exception exception) { 259 _log.error("Error in SentMessageCache.acknowledgeMessage", 260 exception); 261 } finally { 262 SQLHelper.close(connection); 263 } 264 } 265 266 279 public void clear() throws JMSException { 280 Object [] unacked = _unackedMessages.toArray(); 282 _unackedMessages.clear(); 283 284 int count = unacked.length; 285 for (int index = 0; index < count; index++) { 286 MessageHandle handle = (MessageHandle) unacked[index]; 287 DestinationCache cache = 288 DestinationManager.instance().getDestinationCache(handle.getDestination()); 289 cache.returnMessageHandle(handle); 290 } 291 } 292 293 300 public boolean handleInCache(MessageHandle handle) { 301 return _unackedMessages.contains(handle); 302 } 303 304 309 public void removeHandle(MessageHandle handle) { 310 _unackedMessages.remove(handle); 311 } 312 313 } 314 | Popular Tags |