package org.jboss.resource.adapter.jms;

import java.util.HashSet;
import java.util.Iterator;

import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.naming.Reference;
import javax.resource.Referenceable;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionManager;
import javax.resource.spi.ManagedConnectionFactory;

import org.jboss.logging.Logger;

/**
 * Connection-level handle of the JMS resource adapter.
 *
 * <p>Sessions are obtained by asking the {@link ConnectionManager} to
 * allocate a connection for the managed connection factory. Every session
 * handed out, and every temporary queue/topic registered through
 * {@link #addTemporaryQueue} / {@link #addTemporaryTopic}, is tracked so
 * that {@link #close()} can release them.
 *
 * <p>Note that {@code IllegalStateException} in this file is
 * {@link javax.jms.IllegalStateException} (explicitly imported), not the
 * {@code java.lang} one. The {@code ISE} message constant is not declared
 * here; presumably it is inherited from {@code JmsSessionFactory}.
 */
public class JmsSessionFactoryImpl
   implements JmsSessionFactory, Referenceable
{
   private static final Logger log = Logger.getLogger(JmsSessionFactoryImpl.class);

   /** Set once {@link #close()} has been called. */
   private boolean closed = false;

   /** Trace-logging flag, sampled once when the instance is created. */
   private boolean trace = log.isTraceEnabled();

   /** Naming reference stored via {@link #setReference(Reference)}. */
   private Reference reference;

   /** Credentials and client id copied into every connection request. */
   private String userName;
   private String password;
   private String clientID;

   /** Connection type; compared against JmsConnectionFactory.QUEUE / TOPIC. */
   private int type;

   /** Whether delivery has been started on this connection. */
   private boolean started = false;

   /** The managed connection factory sessions are allocated for. */
   private JmsManagedConnectionFactory mcf;

   /** The connection manager used to allocate sessions. */
   private ConnectionManager cm;

   /** Sessions allocated through this factory and not yet closed. */
   private HashSet sessions = new HashSet();

   /** Temporary queues to delete on {@link #close()}. */
   private HashSet tempQueues = new HashSet();

   /** Temporary topics to delete on {@link #close()}. */
   private HashSet tempTopics = new HashSet();

   /**
    * Creates a new session factory.
    *
    * @param mcf  the managed connection factory; must be a
    *             {@code JmsManagedConnectionFactory}
    * @param cm   the connection manager, or {@code null} to fall back to a
    *             new {@code JmsConnectionManager}
    * @param type the connection type
    */
   public JmsSessionFactoryImpl(final ManagedConnectionFactory mcf,
                                final ConnectionManager cm,
                                final int type)
   {
      this.mcf = (JmsManagedConnectionFactory) mcf;

      if (cm == null)
      {
         this.cm = new JmsConnectionManager();
      }
      else
      {
         this.cm = cm;
      }

      this.type = type;

      if (trace)
      {
         log.trace("mcf=" + mcf + ", cm=" + cm + ", type=" + type);
      }
   }

   /**
    * Stores the naming reference for this object.
    *
    * @param reference the reference to store
    */
   public void setReference(final Reference reference)
   {
      this.reference = reference;
   }

   /**
    * @return the reference previously stored, or {@code null} if none
    */
   public Reference getReference()
   {
      return reference;
   }

   /**
    * Sets the user name passed along with every session request.
    *
    * @param name the user name
    */
   public void setUserName(final String name)
   {
      userName = name;
   }

   /**
    * Sets the password passed along with every session request.
    *
    * @param password the password
    */
   public void setPassword(final String password)
   {
      this.password = password;
   }

   /**
    * Creates a queue session.
    *
    * @param transacted      whether the session is transacted
    * @param acknowledgeMode the acknowledge mode
    * @return the allocated session
    * @throws JMSException if this connection is closed, is a topic
    *                      connection, or the session cannot be allocated
    */
   public QueueSession createQueueSession(final boolean transacted,
                                          final int acknowledgeMode)
      throws JMSException
   {
      checkClosed();
      if (type == JmsConnectionFactory.TOPIC)
      {
         throw new IllegalStateException("Can not get a queue session from a topic connection");
      }
      return allocateConnection(transacted, acknowledgeMode, type);
   }

   /**
    * Not supported by this implementation.
    *
    * @throws JMSException always (an {@code IllegalStateException})
    */
   public ConnectionConsumer createConnectionConsumer(Queue queue,
                                                      String messageSelector,
                                                      ServerSessionPool sessionPool,
                                                      int maxMessages)
      throws JMSException
   {
      throw new IllegalStateException(ISE);
   }

   /**
    * Creates a topic session.
    *
    * @param transacted      whether the session is transacted
    * @param acknowledgeMode the acknowledge mode
    * @return the allocated session
    * @throws JMSException if this connection is closed, is a queue
    *                      connection, or the session cannot be allocated
    */
   public TopicSession createTopicSession(final boolean transacted,
                                          final int acknowledgeMode)
      throws JMSException
   {
      checkClosed();
      if (type == JmsConnectionFactory.QUEUE)
      {
         throw new IllegalStateException("Can not get a topic session from a queue connection");
      }
      return allocateConnection(transacted, acknowledgeMode, type);
   }

   /**
    * Not supported by this implementation.
    *
    * @throws JMSException always (an {@code IllegalStateException})
    */
   public ConnectionConsumer createConnectionConsumer(Topic topic,
                                                      String messageSelector,
                                                      ServerSessionPool sessionPool,
                                                      int maxMessages)
      throws JMSException
   {
      throw new IllegalStateException(ISE);
   }

   /**
    * Not supported by this implementation.
    *
    * @throws JMSException always (an {@code IllegalStateException})
    */
   public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
                                                             String subscriptionName,
                                                             String messageSelector,
                                                             ServerSessionPool sessionPool,
                                                             int maxMessages)
      throws JMSException
   {
      throw new IllegalStateException(ISE);
   }

   /**
    * @return the client id, or {@code null} if none has been set
    * @throws JMSException if this connection is closed
    */
   public String getClientID() throws JMSException
   {
      checkClosed();
      return clientID;
   }

   /**
    * Sets the client id. It can only be set once.
    *
    * @param cID the client id
    * @throws JMSException if the managed connection factory is strict, this
    *                      connection is closed, or a client id is already set
    */
   public void setClientID(String cID) throws JMSException
   {
      if (mcf.isStrict())
      {
         throw new IllegalStateException(ISE);
      }

      checkClosed();
      if (clientID != null)
      {
         throw new IllegalStateException("Cannot change client id");
      }
      clientID = cID;
   }

   /**
    * @return the metadata reported by the managed connection factory
    * @throws JMSException if this connection is closed
    */
   public ConnectionMetaData getMetaData() throws JMSException
   {
      checkClosed();
      return mcf.getMetaData();
   }

   /**
    * Not supported by this implementation.
    *
    * @throws JMSException always (an {@code IllegalStateException})
    */
   public ExceptionListener getExceptionListener() throws JMSException
   {
      throw new IllegalStateException(ISE);
   }

   /**
    * Not supported by this implementation.
    *
    * @throws JMSException always (an {@code IllegalStateException})
    */
   public void setExceptionListener(ExceptionListener listener)
      throws JMSException
   {
      throw new IllegalStateException(ISE);
   }

   /**
    * Starts every tracked session. Does nothing if already started.
    *
    * @throws JMSException if this connection is closed or a session fails
    *                      to start
    */
   public void start() throws JMSException
   {
      checkClosed();
      if (trace)
      {
         log.trace("start() " + this);
      }
      synchronized (sessions)
      {
         if (started)
         {
            return;
         }
         started = true;
         for (Iterator i = sessions.iterator(); i.hasNext();)
         {
            JmsSession session = (JmsSession) i.next();
            session.start();
         }
      }
   }

   /**
    * Stops every tracked session. Does nothing if not started.
    *
    * @throws JMSException if the managed connection factory is strict, this
    *                      connection is closed, or a session fails to stop
    */
   public void stop() throws JMSException
   {
      if (mcf.isStrict())
      {
         throw new IllegalStateException(ISE);
      }
      checkClosed();
      if (trace)
      {
         log.trace("stop() " + this);
      }
      synchronized (sessions)
      {
         if (started == false)
         {
            return;
         }
         // Record the stopped state so that a later start() restarts the
         // sessions and newly allocated sessions are not auto-started.
         started = false;
         for (Iterator i = sessions.iterator(); i.hasNext();)
         {
            JmsSession session = (JmsSession) i.next();
            session.stop();
         }
      }
   }

   /**
    * Closes this connection: closes every tracked session and deletes every
    * registered temporary queue and topic. Failures on individual items are
    * logged at trace level and do not abort the cleanup. Calling this
    * method again after the first call has no effect.
    *
    * @throws JMSException declared by the interface; not thrown by the
    *                      cleanup itself
    */
   public void close() throws JMSException
   {
      if (closed)
      {
         return;
      }
      closed = true;

      if (trace)
      {
         log.trace("close() " + this);
      }

      synchronized (sessions)
      {
         for (Iterator i = sessions.iterator(); i.hasNext();)
         {
            JmsSession session = (JmsSession) i.next();
            try
            {
               session.closeSession();
            }
            catch (Throwable t)
            {
               log.trace("Error closing session", t);
            }
            i.remove();
         }
      }

      synchronized (tempQueues)
      {
         for (Iterator i = tempQueues.iterator(); i.hasNext();)
         {
            TemporaryQueue temp = (TemporaryQueue) i.next();
            try
            {
               if (trace)
               {
                  log.trace("Closing temporary queue " + temp + " for " + this);
               }
               temp.delete();
            }
            catch (Throwable t)
            {
               log.trace("Error deleting temporary queue", t);
            }
            i.remove();
         }
      }

      synchronized (tempTopics)
      {
         for (Iterator i = tempTopics.iterator(); i.hasNext();)
         {
            TemporaryTopic temp = (TemporaryTopic) i.next();
            try
            {
               if (trace)
               {
                  log.trace("Closing temporary topic " + temp + " for " + this);
               }
               temp.delete();
            }
            catch (Throwable t)
            {
               log.trace("Error deleting temporary topic", t);
            }
            i.remove();
         }
      }
   }

   /**
    * Stops tracking the given session.
    *
    * @param session the session to forget
    * @throws JMSException declared for callers; not thrown here
    */
   public void closeSession(JmsSession session) throws JMSException
   {
      synchronized (sessions)
      {
         sessions.remove(session);
      }
   }

   /**
    * Registers a temporary queue for deletion on {@link #close()}.
    *
    * @param temp the temporary queue
    */
   public void addTemporaryQueue(TemporaryQueue temp)
   {
      synchronized (tempQueues)
      {
         tempQueues.add(temp);
      }
   }

   /**
    * Registers a temporary topic for deletion on {@link #close()}.
    *
    * @param temp the temporary topic
    */
   public void addTemporaryTopic(TemporaryTopic temp)
   {
      synchronized (tempTopics)
      {
         tempTopics.add(temp);
      }
   }

   /**
    * Not supported by this implementation.
    *
    * @throws JMSException always (an {@code IllegalStateException})
    */
   public ConnectionConsumer createConnectionConsumer(Destination destination,
                                                      ServerSessionPool pool,
                                                      int maxMessages)
      throws JMSException
   {
      throw new IllegalStateException(ISE);
   }

   /**
    * Not supported by this implementation.
    *
    * @throws JMSException always (an {@code IllegalStateException})
    */
   public ConnectionConsumer createConnectionConsumer(Destination destination,
                                                      String name,
                                                      ServerSessionPool pool,
                                                      int maxMessages)
      throws JMSException
   {
      throw new IllegalStateException(ISE);
   }

   /**
    * Creates a session of this connection's own type.
    *
    * @param transacted      whether the session is transacted
    * @param acknowledgeMode the acknowledge mode
    * @return the allocated session
    * @throws JMSException if this connection is closed or the session cannot
    *                      be allocated
    */
   public Session createSession(boolean transacted, int acknowledgeMode)
      throws JMSException
   {
      checkClosed();
      return allocateConnection(transacted, acknowledgeMode, type);
   }

   /**
    * Allocates a session through the connection manager, attaches it to this
    * factory, starts it if this connection is started, and tracks it.
    *
    * @param transacted      whether the session is transacted; when
    *                        {@code true} the acknowledge mode is forced to
    *                        {@link Session#SESSION_TRANSACTED}
    * @param acknowledgeMode the requested acknowledge mode
    * @param sessionType     the session type put into the request info
    * @return the allocated session
    * @throws JMSException if the managed connection factory is strict and a
    *                      session is already open, or if allocation fails
    *                      (the {@link ResourceException} is linked)
    */
   protected JmsSession allocateConnection(boolean transacted, int acknowledgeMode, int sessionType)
      throws JMSException
   {
      try
      {
         synchronized (sessions)
         {
            if (mcf.isStrict() && sessions.isEmpty() == false)
            {
               throw new IllegalStateException("Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6");
            }
            if (transacted)
            {
               acknowledgeMode = Session.SESSION_TRANSACTED;
            }
            JmsConnectionRequestInfo info = new JmsConnectionRequestInfo(transacted, acknowledgeMode, sessionType);
            info.setUserName(userName);
            info.setPassword(password);
            info.setClientID(clientID);

            if (trace)
            {
               log.trace("Allocating session for " + this + " with request info=" + info);
            }
            JmsSession session = (JmsSession) cm.allocateConnection(mcf, info);
            if (trace)
            {
               log.trace("Allocated " + this + " session=" + session);
            }
            session.setJmsSessionFactory(this);
            if (started)
            {
               session.start();
            }
            sessions.add(session);
            return session;
         }
      }
      catch (ResourceException e)
      {
         log.error("could not create session", e);

         JMSException je = new JMSException("Could not create a session: " + e);
         je.setLinkedException(e);
         throw je;
      }
   }

   /**
    * Guards operations that are invalid after {@link #close()}.
    *
    * @throws IllegalStateException if this connection has been closed
    */
   protected void checkClosed() throws IllegalStateException
   {
      if (closed)
      {
         throw new IllegalStateException("The connection is closed");
      }
   }
}