1 22 package org.jboss.resource.adapter.jms.inflow; 23 24 import java.util.ArrayList ; 25 26 import javax.jms.Connection ; 27 import javax.jms.ConnectionConsumer ; 28 import javax.jms.JMSException ; 29 import javax.jms.Queue ; 30 import javax.jms.ServerSession ; 31 import javax.jms.ServerSessionPool ; 32 import javax.jms.Topic ; 33 34 import org.jboss.logging.Logger; 35 36 42 public class JmsServerSessionPool implements ServerSessionPool 43 { 44 45 private static final Logger log = Logger.getLogger(JmsServerSessionPool.class); 46 47 48 JmsActivation activation; 49 50 51 ConnectionConsumer consumer; 52 53 54 ArrayList serverSessions = new ArrayList (); 55 56 57 boolean stopped = false; 58 59 60 int sessionCount = 0; 61 62 63 68 public JmsServerSessionPool(JmsActivation activation) 69 { 70 this.activation = activation; 71 } 72 73 76 public JmsActivation getActivation() 77 { 78 return activation; 79 } 80 81 86 public void start() throws Exception 87 { 88 setupSessions(); 89 setupConsumer(); 90 } 91 92 95 public void stop() 96 { 97 teardownConsumer(); 98 teardownSessions(); 99 } 100 101 public ServerSession getServerSession() throws JMSException 102 { 103 boolean trace = log.isTraceEnabled(); 104 if (trace) 105 log.trace("getServerSession"); 106 107 ServerSession result = null; 108 109 try 110 { 111 synchronized (serverSessions) 112 { 113 while (true) 114 { 115 int sessionsSize = serverSessions.size(); 116 117 if (stopped) 118 throw new Exception ("Cannot get a server session after the pool is stopped"); 119 120 else if (sessionsSize > 0) 121 { 122 result = (ServerSession ) serverSessions.remove(sessionsSize-1); 123 break; 124 } 125 126 else 127 { 128 try 129 { 130 serverSessions.wait(); 131 } 132 catch (InterruptedException ignored) 133 { 134 } 135 } 136 } 137 } 138 } 139 catch (Throwable t) 140 { 141 log.error("Unable to get a server session", t); 142 throw new JMSException ("Unable to get a server session " + t); 143 } 144 145 if (trace) 146 log.trace("Returning server session " + result); 147 148 return result; 149 } 150 151 156 protected void returnServerSession(JmsServerSession session) 157 { 158 synchronized (serverSessions) 159 { 160 if (stopped) 161 { 162 session.teardown(); 163 --sessionCount; 164 } 165 else 166 serverSessions.add(session); 167 serverSessions.notifyAll(); 168 } 169 } 170 171 176 protected void setupSessions() throws Exception 177 { 178 JmsActivationSpec spec = activation.getActivationSpec(); 179 180 synchronized (serverSessions) 182 { 183 for (int i = 0; i < spec.getMaxSessionInt(); ++i) 184 { 185 JmsServerSession session = new JmsServerSession(this); 186 serverSessions.add(session); 187 } 188 sessionCount = serverSessions.size(); 189 } 190 191 ArrayList clonedSessions = (ArrayList ) serverSessions.clone(); 193 for (int i = 0; i < clonedSessions.size(); ++ i) 194 { 195 JmsServerSession session = (JmsServerSession) serverSessions.get(i); 196 session.setup(); 197 } 198 } 199 200 203 protected void teardownSessions() 204 { 205 synchronized (serverSessions) 206 { 207 stopped = true; 209 serverSessions.notifyAll(); 210 211 for (int i = 0; i < serverSessions.size(); ++i) 213 { 214 JmsServerSession session = (JmsServerSession) serverSessions.get(i); 215 session.teardown(); 216 } 217 218 sessionCount -= serverSessions.size(); 219 serverSessions.clear(); 220 221 while (sessionCount > 0) 223 { 224 try 225 { 226 serverSessions.wait(); 227 } 228 catch (InterruptedException ignore) 229 { 230 } 231 } 232 } 233 } 234 235 240 protected void setupConsumer() throws Exception 241 { 242 Connection connection = activation.getConnection(); 243 JmsActivationSpec spec = activation.getActivationSpec(); 244 String selector = spec.getMessageSelector(); 245 int maxMessages = spec.getMaxMessagesInt(); 246 if (spec.isTopic()) 247 { 248 Topic topic = (Topic ) activation.getDestination(); 249 String subscriptionName = spec.getSubscriptionName(); 250 if (spec.isDurable()) 251 consumer = connection.createDurableConnectionConsumer(topic, subscriptionName, selector, this, maxMessages); 252 else 253 consumer = connection.createConnectionConsumer(topic, selector, this, maxMessages); 254 } 255 else 256 { 257 Queue queue = (Queue ) activation.getDestination(); 258 consumer = connection.createConnectionConsumer(queue, selector, this, maxMessages); 259 } 260 log.debug("Created consumer " + consumer); 261 } 262 263 266 protected void teardownConsumer() 267 { 268 try 269 { 270 if (consumer != null) 271 { 272 log.debug("Closing the " + consumer); 273 consumer.close(); 274 } 275 } 276 catch (Throwable t) 277 { 278 log.debug("Error closing the consumer " + consumer, t); 279 } 280 } 281 282 } | Popular Tags |