1 45 package org.exolab.jms.messagemgr; 46 47 import java.sql.Connection ; 48 import java.util.Iterator ; 49 import java.util.List ; 50 import javax.jms.InvalidSelectorException ; 51 import javax.jms.JMSException ; 52 53 import org.apache.commons.logging.Log; 54 import org.apache.commons.logging.LogFactory; 55 56 import org.exolab.jms.client.JmsTopic; 57 import org.exolab.jms.persistence.DatabaseService; 58 import org.exolab.jms.persistence.SQLHelper; 59 import org.exolab.jms.scheduler.Scheduler; 60 import org.exolab.jms.server.JmsServerSession; 61 import org.exolab.jms.server.JmsServerSession; 62 63 64 73 public class DurableConsumerEndpoint 74 extends AbstractTopicConsumerEndpoint { 75 76 79 private final String _name; 80 81 84 private static final Log _log 85 = LogFactory.getLog(DurableConsumerEndpoint.class); 86 87 108 public DurableConsumerEndpoint(long consumerId, JmsServerSession session, 109 JmsTopic topic, String name, String selector, 110 boolean noLocal, Scheduler scheduler) 111 throws InvalidSelectorException , JMSException { 112 super(consumerId, session, topic, selector, noLocal, scheduler); 113 _name = name; 114 115 Connection connection = null; 117 try { 118 connection = DatabaseService.getConnection(); 119 120 DatabaseService.getAdapter().removeExpiredMessageHandles( 122 connection, _name); 123 124 TopicDestinationCache cache = (TopicDestinationCache) 125 DestinationManager.instance().getDestinationCache(topic); 126 List handles = cache.getDurableMessageHandles(_name, connection); 130 131 connection.commit(); 132 133 Iterator iterator = handles.iterator(); 136 while (iterator.hasNext()) { 137 MessageHandle handle = (MessageHandle) iterator.next(); 138 TopicConsumerMessageHandle consumer = 139 new TopicConsumerMessageHandle(handle, this); 140 addMessage(consumer); 141 } 142 } catch (Exception exception) { 143 SQLHelper.rollback(connection); 144 String msg = "Failed to create durable consumer, name=" + name 145 + ", for topic=" + topic.getName(); 146 _log.error(msg, exception); 147 throw new JMSException (msg + ": " + exception.getMessage()); 148 } finally { 149 SQLHelper.close(connection); 150 } 151 152 init(); 153 } 154 155 163 public boolean isPersistent() { 164 return true; 165 } 166 167 177 public String getPersistentId() { 178 return _name; 179 } 180 181 182 } 183 | Popular Tags |