1 25 26 package org.objectweb.jonas_jms; 27 28 import java.util.LinkedList ; 29 30 import javax.jms.Connection ; 31 import javax.jms.ConnectionConsumer ; 32 import javax.jms.ConnectionMetaData ; 33 import javax.jms.Destination ; 34 import javax.jms.ExceptionListener ; 35 import javax.jms.JMSException ; 36 import javax.jms.ServerSessionPool ; 37 import javax.jms.Session ; 38 import javax.jms.Topic ; 39 import javax.jms.XAConnection ; 40 import javax.jms.XAConnectionFactory ; 41 42 import org.objectweb.transaction.jta.TransactionManager; 43 import org.objectweb.util.monolog.api.BasicLevel; 44 45 54 55 public class JConnection implements Connection { 56 57 protected XAConnection xac; 59 60 protected boolean closed; 61 protected String user; 62 protected boolean globaltx = false; 63 protected static TransactionManager tm; 64 protected JConnectionFactory jcf; 65 protected LinkedList sessionlist = new LinkedList (); 66 67 protected static final String INTERNAL_USER_NAME = "anInternalNameUsedOnlyByJOnAS"; 70 71 74 protected JConnection(JConnectionFactory jcf, String user) throws JMSException { 75 this.user = user; 76 this.jcf = jcf; 77 closed = false; 78 if (tm == null) { 79 tm = JmsManagerImpl.getTransactionManager(); 80 } 81 try { 84 globaltx = (tm.getTransaction() != null); 85 } catch (Exception e) { 86 } 87 } 88 89 90 96 public JConnection(JConnectionFactory jcf, XAConnectionFactory xacf, String user, String passwd) 97 throws JMSException { 98 this(jcf, user); 99 xac = xacf.createXAConnection(user, passwd); 101 } 102 103 106 public JConnection(JConnectionFactory jcf, XAConnectionFactory xacf) throws JMSException { 107 this(jcf, INTERNAL_USER_NAME); 108 xac = xacf.createXAConnection(); 110 } 111 112 116 119 protected synchronized boolean sessionOpen(Session s) { 120 if (!closed) { 121 sessionlist.add(s); 122 return true; 123 } else { 124 return false; 125 } 126 } 127 128 131 protected synchronized void sessionClose(Session s) { 132 sessionlist.remove(s); 133 if (sessionlist.size() == 0 && closed) { 134 notify(); 135 } 136 } 137 138 141 public String getUser() { 142 return user; 143 } 144 145 149 158 public void close() throws JMSException { 159 TraceJms.logger.log(BasicLevel.DEBUG, ""); 160 if (globaltx) { 161 jcf.freeJConnection(this); 166 } else { 167 synchronized(this) { 170 while (sessionlist.size() > 0) { 171 try { 172 wait(); 173 } catch (InterruptedException e) { 174 TraceJms.logger.log(BasicLevel.ERROR, "interrupted"); 175 } 176 } 177 } 178 closed = true; 179 xac.close(); 180 } 181 } 182 public void finalClose() throws JMSException { 183 if (!closed) { 184 xac.close(); 185 } 186 } 187 188 200 201 public ConnectionConsumer createConnectionConsumer(Destination destination, 202 java.lang.String messageSelector, 203 ServerSessionPool sessionPool, 204 int maxMessages) 205 throws JMSException { 206 TraceJms.logger.log(BasicLevel.DEBUG, ""); 207 return xac.createConnectionConsumer(destination, 208 messageSelector, 209 sessionPool, 210 maxMessages); 211 } 212 213 226 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, 227 java.lang.String subscriptionName, 228 java.lang.String messageSelector, 229 ServerSessionPool sessionPool, 230 int maxMessages) 231 throws JMSException { 232 TraceJms.logger.log(BasicLevel.DEBUG, ""); 233 return xac.createDurableConnectionConsumer(topic, 234 subscriptionName, 235 messageSelector, 236 sessionPool, 237 maxMessages); 238 } 239 240 241 250 251 public Session createSession(boolean transacted,int acknowledgeMode) throws JMSException { 252 TraceJms.logger.log(BasicLevel.DEBUG, ""); 253 return new JSession(this, xac); 254 } 255 256 257 265 public String getClientID() throws JMSException { 266 TraceJms.logger.log(BasicLevel.DEBUG, ""); 267 return xac.getClientID(); 268 } 269 270 281 public void setClientID(String clientID) throws JMSException { 282 TraceJms.logger.log(BasicLevel.DEBUG, ""); 283 xac.setClientID(clientID); 284 } 285 286 292 public ConnectionMetaData getMetaData() throws JMSException { 293 TraceJms.logger.log(BasicLevel.DEBUG, ""); 294 return xac.getMetaData(); 295 } 296 297 303 public ExceptionListener getExceptionListener() throws JMSException { 304 TraceJms.logger.log(BasicLevel.DEBUG, ""); 305 return xac.getExceptionListener(); 306 } 307 308 314 public void setExceptionListener(ExceptionListener listener) throws JMSException { 315 TraceJms.logger.log(BasicLevel.DEBUG, ""); 316 xac.setExceptionListener(listener); 317 } 318 319 324 public void start() throws JMSException { 325 TraceJms.logger.log(BasicLevel.DEBUG, ""); 326 xac.start(); 327 } 328 329 337 public void stop() throws JMSException { 338 TraceJms.logger.log(BasicLevel.DEBUG, ""); 339 xac.stop(); 340 } 341 } 342 | Popular Tags |