1 45 package org.exolab.jms.client; 46 47 import javax.jms.Connection ; 48 import javax.jms.ConnectionConsumer ; 49 import javax.jms.Destination ; 50 import javax.jms.JMSException ; 51 import javax.jms.Message ; 52 import javax.jms.MessageConsumer ; 53 import javax.jms.MessageListener ; 54 import javax.jms.ServerSession ; 55 import javax.jms.ServerSessionPool ; 56 import javax.jms.Session ; 57 import javax.jms.Topic ; 58 59 import org.apache.commons.logging.Log; 60 import org.apache.commons.logging.LogFactory; 61 62 63 70 class JmsConnectionConsumer 71 implements ConnectionConsumer , MessageListener { 72 73 76 private Session _session; 77 78 81 private MessageConsumer _consumer; 82 83 86 private ServerSessionPool _pool; 87 88 91 private static final Log _log = 92 LogFactory.getLog(JmsConnectionConsumer.class); 93 94 95 106 public JmsConnectionConsumer(Connection connection, Destination destination, 107 ServerSessionPool pool, String selector, 108 int maxMessages) 109 throws JMSException { 110 this(connection, destination, null, pool, selector, maxMessages); 111 } 112 113 126 public JmsConnectionConsumer(Connection connection, Destination destination, 127 String subscriptionName, 128 ServerSessionPool pool, String selector, 129 int maxMessages) 130 throws JMSException { 131 if (connection == null) { 132 throw new IllegalArgumentException ("Argument 'connection' is null"); 133 } 134 if (destination == null) { 135 throw new IllegalArgumentException ( 136 "Argument 'destination' is null"); 137 } 138 if (pool == null) { 139 throw new IllegalArgumentException ("Argument 'pool' is null"); 140 } 141 if (maxMessages <= 0) { 142 throw new IllegalArgumentException ( 143 "Argument 'maxMessages' must be > 0"); 144 } 145 146 _pool = pool; 147 148 _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 149 if (subscriptionName == null) { 150 _consumer = _session.createConsumer(destination, selector, false); 151 } else { 152 _consumer = _session.createDurableSubscriber((Topic ) destination, 153 subscriptionName, 154 selector, false); 155 } 156 157 _consumer.setMessageListener(this); 158 } 159 160 165 public ServerSessionPool getServerSessionPool() { 166 return _pool; 167 } 168 169 174 public void close() throws JMSException { 175 try { 176 _consumer.close(); 177 _session.close(); 178 } finally { 179 _pool = null; 180 _consumer = null; 181 _session = null; 182 } 183 } 184 185 192 public void onMessage(Message message) { 193 try { 194 ServerSession serverSession = _pool.getServerSession(); 197 JmsSession session = (JmsSession) serverSession.getSession(); 198 message.acknowledge(); 199 session.addMessage(message); 200 serverSession.start(); 201 } catch (Exception exception) { 202 _log.error(exception, exception); 203 } 204 } 205 206 } 207 | Popular Tags |