1 25 26 package org.objectweb.jonas_jms; 27 28 import java.io.Serializable ; 29 30 import javax.jms.BytesMessage ; 31 import javax.jms.Destination ; 32 import javax.jms.JMSException ; 33 import javax.jms.MapMessage ; 34 import javax.jms.Message ; 35 import javax.jms.MessageConsumer ; 36 import javax.jms.MessageListener ; 37 import javax.jms.MessageProducer ; 38 import javax.jms.ObjectMessage ; 39 import javax.jms.Queue ; 40 import javax.jms.QueueBrowser ; 41 import javax.jms.Session ; 42 import javax.jms.StreamMessage ; 43 import javax.jms.TemporaryQueue ; 44 import javax.jms.TemporaryTopic ; 45 import javax.jms.TextMessage ; 46 import javax.jms.Topic ; 47 import javax.jms.TopicSubscriber ; 48 import javax.jms.XAConnection ; 49 import javax.jms.XASession ; 50 import javax.transaction.RollbackException ; 51 import javax.transaction.Synchronization ; 52 import javax.transaction.SystemException ; 53 import javax.transaction.Transaction ; 54 import javax.transaction.xa.XAResource ; 55 56 import org.objectweb.transaction.jta.TransactionManager; 57 import org.objectweb.util.monolog.api.BasicLevel; 58 59 60 67 68 public class JSession implements Session , Synchronization { 69 70 protected XAResource xares = null; protected boolean txover = true; 72 protected Transaction currtx = null; 73 protected boolean closed = false; 74 protected JConnection jconn; 75 protected static TransactionManager tm = null; 76 77 protected XAConnection xac; 78 protected Session sess = null; 79 protected XASession xasess = null; 80 81 85 protected JSession(JConnection jconn) { 86 this.jconn = jconn; 87 if (tm == null) { 88 tm = JmsManagerImpl.getTransactionManager(); 89 } 90 } 91 92 95 public JSession(JConnection jconn, XAConnection xac) { 96 this(jconn); 97 this.xac = xac; 98 TraceJms.logger.log(BasicLevel.DEBUG,""); 99 } 100 101 105 109 protected XAResource getXAResource() { 110 return xares; 111 } 112 113 117 protected Session getMOMSession() throws JMSException { 118 Transaction tx = null; 119 try { 120 tx = tm.getTransaction(); 121 } catch (SystemException e) { 122 TraceJms.logger.log(BasicLevel.ERROR,"cannot get Transaction"); 123 } 124 if (tx == null) { 125 if (sess == null) { 126 sess = xac.createSession(false, Session.AUTO_ACKNOWLEDGE); 127 jconn.sessionOpen(this); 128 } 129 return sess; 130 } else { 131 if (xasess == null) { 132 xasess = xac.createXASession(); 133 if (currtx != null) { 134 TraceJms.logger.log(BasicLevel.ERROR,"mixed transactions"); 135 } 136 currtx = tx; 137 xares = xasess.getXAResource(); 138 try { 139 tx.enlistResource(xares); 140 txover = false; 141 } catch (SystemException e) { 142 TraceJms.logger.log(BasicLevel.ERROR,"cannot enlist session:"+e); 143 throw new JMSException (e.toString()); 144 } catch (RollbackException e) { 145 TraceJms.logger.log(BasicLevel.ERROR,"transaction rolled back"); 146 throw new JMSException (e.toString()); 147 } 148 } 149 return xasess.getSession(); 150 } 151 } 152 153 protected void MOMSessionClose() { 154 try { 155 if (xasess != null) { 156 xasess.close(); 157 xasess = null; 158 } 159 if (sess != null) { 160 sess.close(); 161 sess = null; 162 jconn.sessionClose(this); 163 } 164 } catch (JMSException e) { 165 TraceJms.logger.log(BasicLevel.ERROR,"exception:"+e); 166 } 167 } 168 169 172 protected void PhysicalClose() { 173 MOMSessionClose(); 174 } 175 176 180 183 public void close() throws JMSException { 184 TraceJms.logger.log(BasicLevel.DEBUG, ""); 185 closed = true; 186 if (txover) { 187 PhysicalClose(); 188 } else { 189 if (currtx == null) { 191 TraceJms.logger.log(BasicLevel.ERROR, "should be in a tx"); 192 } else { 193 try { 194 currtx.delistResource(xares, XAResource.TMSUCCESS); 195 } catch (SystemException e) { 196 TraceJms.logger.log(BasicLevel.ERROR, "cannot delist session:"+e); 197 throw new JMSException (e.toString()); 198 } 199 } 200 } 201 } 202 203 206 public void commit() throws JMSException { 207 TraceJms.logger.log(BasicLevel.DEBUG, ""); 208 throw new JMSException ("JSession: commit Operation Not Allowed"); 209 } 210 211 214 public QueueBrowser createBrowser(Queue queue) 215 throws JMSException { 216 TraceJms.logger.log(BasicLevel.DEBUG, ""); 217 return getMOMSession().createBrowser(queue); 218 } 219 220 223 public QueueBrowser createBrowser(Queue queue, java.lang.String messageSelector) 224 throws JMSException { 225 TraceJms.logger.log(BasicLevel.DEBUG, ""); 226 return getMOMSession().createBrowser(queue, messageSelector); 227 } 228 229 232 public BytesMessage createBytesMessage() throws JMSException { 233 TraceJms.logger.log(BasicLevel.DEBUG, ""); 234 return getMOMSession().createBytesMessage(); 235 } 236 237 238 241 public MessageConsumer createConsumer(Destination destination) 242 throws JMSException { 243 TraceJms.logger.log(BasicLevel.DEBUG, ""); 244 return getMOMSession().createConsumer(destination); 245 } 246 247 248 251 public MessageConsumer createConsumer(Destination destination, 252 String messageSelector) 253 throws JMSException { 254 TraceJms.logger.log(BasicLevel.DEBUG, ""); 255 return getMOMSession().createConsumer(destination, messageSelector); 256 } 257 258 261 public MessageConsumer createConsumer(Destination destination, 262 String messageSelector, 263 boolean NoLocal) 264 throws JMSException { 265 TraceJms.logger.log(BasicLevel.DEBUG, ""); 266 return getMOMSession().createConsumer(destination, messageSelector,NoLocal ); 267 } 268 269 272 public TopicSubscriber createDurableSubscriber(Topic topic, 273 String name) 274 throws JMSException { 275 TraceJms.logger.log(BasicLevel.DEBUG, ""); 276 return getMOMSession().createDurableSubscriber(topic, name); 277 } 278 279 282 public TopicSubscriber createDurableSubscriber(Topic topic, 283 String name, 284 String messageSelector, 285 boolean noLocal) 286 throws JMSException { 287 TraceJms.logger.log(BasicLevel.DEBUG, ""); 288 return getMOMSession().createDurableSubscriber(topic, name, messageSelector, noLocal); 289 } 290 291 294 public MapMessage createMapMessage() throws JMSException { 295 TraceJms.logger.log(BasicLevel.DEBUG, ""); 296 return getMOMSession().createMapMessage(); 297 } 298 299 302 public Message createMessage() throws JMSException { 303 TraceJms.logger.log(BasicLevel.DEBUG, ""); 304 return getMOMSession().createMessage(); 305 } 306 307 310 public ObjectMessage createObjectMessage() throws JMSException { 311 TraceJms.logger.log(BasicLevel.DEBUG, ""); 312 return getMOMSession().createObjectMessage(); 313 } 314 315 318 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 319 TraceJms.logger.log(BasicLevel.DEBUG, ""); 320 return getMOMSession().createObjectMessage(object); 321 } 322 323 326 public MessageProducer createProducer(Destination destination) 327 throws JMSException { 328 TraceJms.logger.log(BasicLevel.DEBUG, ""); 329 return getMOMSession().createProducer(destination); 330 } 331 332 335 336 public Queue createQueue(String queueName) 337 throws JMSException { 338 TraceJms.logger.log(BasicLevel.DEBUG, ""); 339 return getMOMSession().createQueue(queueName); 340 } 341 342 345 public StreamMessage createStreamMessage() throws JMSException { 346 TraceJms.logger.log(BasicLevel.DEBUG, ""); 347 return getMOMSession().createStreamMessage(); 348 } 349 350 353 public TemporaryQueue createTemporaryQueue() 354 throws JMSException { 355 TraceJms.logger.log(BasicLevel.DEBUG, ""); 356 return getMOMSession().createTemporaryQueue(); 357 } 358 359 362 public TemporaryTopic createTemporaryTopic() 363 throws JMSException { 364 TraceJms.logger.log(BasicLevel.DEBUG, ""); 365 return getMOMSession().createTemporaryTopic(); 366 } 367 368 371 public TextMessage createTextMessage() throws JMSException { 372 TraceJms.logger.log(BasicLevel.DEBUG, ""); 373 return getMOMSession().createTextMessage(); 374 } 375 376 379 public TextMessage createTextMessage(String text) throws JMSException { 380 TraceJms.logger.log(BasicLevel.DEBUG, ""); 381 return getMOMSession().createTextMessage(text); 382 } 383 384 387 public Topic createTopic(String topicName) 388 throws JMSException { 389 TraceJms.logger.log(BasicLevel.DEBUG, ""); 390 return getMOMSession().createTopic(topicName); 391 } 392 393 396 public MessageListener getMessageListener() throws JMSException { 397 TraceJms.logger.log(BasicLevel.DEBUG, ""); 398 return getMOMSession().getMessageListener(); 399 } 400 401 404 public boolean getTransacted() throws JMSException { 405 TraceJms.logger.log(BasicLevel.DEBUG, ""); 406 return getMOMSession().getTransacted(); 407 } 408 409 412 public int getAcknowledgeMode() throws JMSException { 413 TraceJms.logger.log(BasicLevel.DEBUG, ""); 414 return getMOMSession().getAcknowledgeMode(); 415 } 416 417 418 421 public void recover() throws JMSException { 422 TraceJms.logger.log(BasicLevel.DEBUG, ""); 423 throw new JMSException ("JSession: recover Operation Not Allowed"); 424 } 425 426 429 public void rollback() throws JMSException { 430 TraceJms.logger.log(BasicLevel.DEBUG, ""); 431 throw new JMSException ("JSession: rollback Operation Not Allowed"); 432 } 433 434 437 public void run() { 438 TraceJms.logger.log(BasicLevel.DEBUG, ""); 439 try { 440 getMOMSession().run(); 441 } catch (JMSException e) { 442 TraceJms.logger.log(BasicLevel.ERROR, "exception: "+e); 443 } 444 } 445 446 449 public void setMessageListener(MessageListener listener) throws JMSException { 450 TraceJms.logger.log(BasicLevel.DEBUG, ""); 451 getMOMSession().setMessageListener(listener); 452 } 453 454 455 458 public void unsubscribe(String name) 459 throws JMSException { 460 TraceJms.logger.log(BasicLevel.DEBUG, ""); 461 getMOMSession().unsubscribe(name); 462 } 463 464 465 469 473 public void beforeCompletion() { 474 if(TraceJms.isDebug()){ 475 TraceJms.logger.log(BasicLevel.DEBUG, ""); 476 } 477 } 478 479 483 484 public void afterCompletion(int status) { 485 if(TraceJms.isDebug()){ 486 TraceJms.logger.log(BasicLevel.DEBUG, ""); 487 } 488 txover = true; 489 if (closed) { 490 PhysicalClose(); 491 } 492 } 493 494 495 } 496 497 | Popular Tags |