1 16 17 package org.springframework.jms.listener.serversession; 18 19 import javax.jms.Connection ; 20 import javax.jms.ConnectionConsumer ; 21 import javax.jms.Destination ; 22 import javax.jms.JMSException ; 23 import javax.jms.Message ; 24 import javax.jms.MessageListener ; 25 import javax.jms.ServerSession ; 26 import javax.jms.ServerSessionPool ; 27 import javax.jms.Session ; 28 import javax.jms.Topic ; 29 30 import org.springframework.jms.listener.AbstractMessageListenerContainer; 31 import org.springframework.jms.support.JmsUtils; 32 33 62 public class ServerSessionMessageListenerContainer extends AbstractMessageListenerContainer 63 implements ListenerSessionManager { 64 65 private ServerSessionFactory serverSessionFactory = new SimpleServerSessionFactory(); 66 67 private int maxMessagesPerTask = 1; 68 69 private ConnectionConsumer consumer; 70 71 72 80 public void setServerSessionFactory(ServerSessionFactory serverSessionFactory) { 81 this.serverSessionFactory = 82 (serverSessionFactory != null ? serverSessionFactory : new SimpleServerSessionFactory()); 83 } 84 85 88 protected ServerSessionFactory getServerSessionFactory() { 89 return this.serverSessionFactory; 90 } 91 92 99 public void setMaxMessagesPerTask(int maxMessagesPerTask) { 100 this.maxMessagesPerTask = maxMessagesPerTask; 101 } 102 103 106 protected int getMaxMessagesPerTask() { 107 return this.maxMessagesPerTask; 108 } 109 110 111 115 118 protected final boolean sharedConnectionEnabled() { 119 return true; 120 } 121 122 128 protected void doInitialize() throws JMSException { 129 Connection con = getSharedConnection(); 130 Destination destination = getDestination(); 131 if (destination == null) { 132 Session session = createSession(con); 133 try { 134 destination = resolveDestinationName(session, getDestinationName()); 135 } 136 finally { 137 JmsUtils.closeSession(session); 138 } 139 } 140 ServerSessionPool pool = createServerSessionPool(); 141 this.consumer = createConsumer(con, destination, pool); 142 } 143 144 152 protected ServerSessionPool createServerSessionPool() throws JMSException { 153 return new ServerSessionPool () { 154 public ServerSession getServerSession() throws JMSException { 155 logger.debug("JMS ConnectionConsumer requests ServerSession"); 156 return getServerSessionFactory().getServerSession(ServerSessionMessageListenerContainer.this); 157 } 158 }; 159 } 160 161 165 protected final ConnectionConsumer getConsumer() { 166 return this.consumer; 167 } 168 169 178 protected void doShutdown() throws JMSException { 179 logger.debug("Closing ServerSessionFactory"); 180 getServerSessionFactory().close(this); 181 logger.debug("Closing JMS ConnectionConsumer"); 182 this.consumer.close(); 183 } 184 185 186 190 200 public Session createListenerSession() throws JMSException { 201 final Session session = createSession(getSharedConnection()); 202 203 session.setMessageListener(new MessageListener () { 204 public void onMessage(Message message) { 205 executeListener(session, message); 206 } 207 }); 208 209 return session; 210 } 211 212 220 public void executeListenerSession(Session session) { 221 session.run(); 222 } 223 224 225 229 236 protected ConnectionConsumer createConsumer(Connection con, Destination destination, ServerSessionPool pool) 237 throws JMSException { 238 239 if (isSubscriptionDurable() && destination instanceof Topic ) { 240 return con.createDurableConnectionConsumer( 241 (Topic ) destination, getDurableSubscriptionName(), getMessageSelector(), pool, getMaxMessagesPerTask()); 242 } 243 else { 244 return con.createConnectionConsumer(destination, getMessageSelector(), pool, getMaxMessagesPerTask()); 245 } 246 } 247 248 } 249 | Popular Tags |