1 46 47 package org.mr.ra; 48 49 import java.util.ArrayList ; 50 import java.util.Iterator ; 51 52 import javax.jms.Connection ; 53 import javax.jms.ConnectionConsumer ; 54 import javax.jms.ConnectionMetaData ; 55 import javax.jms.Destination ; 56 import javax.jms.ExceptionListener ; 57 import javax.jms.IllegalStateException ; 58 import javax.jms.JMSException ; 59 import javax.jms.Queue ; 60 import javax.jms.QueueConnection ; 61 import javax.jms.QueueSession ; 62 import javax.jms.ServerSessionPool ; 63 import javax.jms.Session ; 64 import javax.jms.Topic ; 65 import javax.jms.TopicConnection ; 66 import javax.jms.TopicSession ; 67 68 import org.apache.commons.logging.Log; 69 import org.apache.commons.logging.LogFactory; 70 import org.mr.api.jms.MantaQueueSession; 71 import org.mr.api.jms.MantaSession; 72 import org.mr.api.jms.MantaTopicSession; 73 74 75 79 public class JMSConnectionProxy implements Connection , QueueConnection , TopicConnection , ExceptionListener { 80 81 private static final Log log = LogFactory.getLog(JMSConnectionProxy.class); 82 83 private ManagedConnectionImpl managedConnection; 84 private ArrayList sessions = new ArrayList (); 85 private ExceptionListener exceptionListener; 86 87 public JMSConnectionProxy(ManagedConnectionImpl managedConnection) { 88 this.managedConnection = managedConnection; 89 } 90 91 97 public void close() throws JMSException { 98 if( managedConnection!=null ) { 99 managedConnection.proxyClosedEvent(this); 100 } 101 } 102 103 106 public void cleanup() { 107 exceptionListener=null; 108 managedConnection = null; 109 for (Iterator iter = sessions.iterator(); iter.hasNext();) { 110 JMSSessionProxy p = (JMSSessionProxy) iter.next(); 111 try { 112 p.cleanup(); 113 } catch (JMSException ignore) { 114 } 115 iter.remove(); 116 } 117 } 118 119 122 private Connection getConnection() throws JMSException { 123 if (managedConnection == null) { 124 throw new IllegalStateException ("The Connection is closed"); 125 } 126 return managedConnection.getPhysicalConnection(); 127 } 128 129 135 public Session createSession(boolean transacted, int acknowledgeMode) 136 throws JMSException { 137 return createSessionProxy(transacted, acknowledgeMode); 138 } 139 140 146 private JMSSessionProxy createSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException { 147 MantaSession session = (MantaSession) getConnection().createSession(transacted, acknowledgeMode); 148 RATransactionContext txContext = new RATransactionContext(managedConnection.getTransactionContext()); 150 session.setTransactionContext(txContext); 151 JMSSessionProxy p = new JMSSessionProxy(session); 152 p.setUseSharedTxContext(managedConnection.isInManagedTx()); 153 sessions.add(p); 154 return p; 155 } 156 157 private JMSSessionProxy createQueueSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException { 158 MantaQueueSession session = (MantaQueueSession)((QueueConnection )getConnection()).createQueueSession(transacted, acknowledgeMode); 159 RATransactionContext txContext = new RATransactionContext(managedConnection.getTransactionContext()); 161 session.setTransactionContext(txContext); 162 JMSSessionProxy p = new JMSSessionProxy(session); 163 p.setUseSharedTxContext(managedConnection.isInManagedTx()); 164 sessions.add(p); 165 return p; 166 } 167 168 private JMSSessionProxy createTopicSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException { 169 MantaTopicSession session = (MantaTopicSession)((TopicConnection )getConnection()).createTopicSession(transacted, acknowledgeMode); 170 RATransactionContext txContext = new RATransactionContext(managedConnection.getTransactionContext()); 172 session.setTransactionContext(txContext); 173 JMSSessionProxy p = new JMSSessionProxy(session); 174 p.setUseSharedTxContext(managedConnection.isInManagedTx()); 175 sessions.add(p); 176 return p; 177 } 178 179 180 181 public void setUseSharedTxContext(boolean enable) throws JMSException { 182 for (Iterator iter = sessions.iterator(); iter.hasNext();) { 183 JMSSessionProxy p = (JMSSessionProxy) iter.next(); 184 p.setUseSharedTxContext(enable); 185 } 186 } 187 188 194 public QueueSession createQueueSession(boolean transacted, 195 int acknowledgeMode) throws JMSException { 196 return createQueueSessionProxy(transacted, acknowledgeMode); 198 } 199 200 206 public TopicSession createTopicSession(boolean transacted, 207 int acknowledgeMode) throws JMSException { 208 return createTopicSessionProxy(transacted, acknowledgeMode); 210 } 211 212 216 public String getClientID() throws JMSException { 217 return getConnection().getClientID(); 218 } 219 220 224 public ExceptionListener getExceptionListener() throws JMSException { 225 return getConnection().getExceptionListener(); 226 } 227 228 232 public ConnectionMetaData getMetaData() throws JMSException { 233 return getConnection().getMetaData(); 234 } 235 236 240 public void setClientID(String clientID) throws JMSException { 241 getConnection().setClientID(clientID); 242 } 243 244 248 public void setExceptionListener(ExceptionListener listener) 249 throws JMSException { 250 getConnection(); 251 exceptionListener = listener; 252 } 253 254 257 public void start() throws JMSException { 258 getConnection().start(); 259 } 260 261 264 public void stop() throws JMSException { 265 getConnection().stop(); 266 } 267 268 269 277 public ConnectionConsumer createConnectionConsumer(Queue queue, 278 String messageSelector, 279 ServerSessionPool sessionPool, 280 int maxMessages) 281 throws JMSException { 282 throw new JMSException ("Not Supported."); 283 } 284 285 293 public ConnectionConsumer createConnectionConsumer(Topic topic, 294 String messageSelector, 295 ServerSessionPool sessionPool, 296 int maxMessages) 297 throws JMSException { 298 throw new JMSException ("Not Supported."); 299 } 300 301 309 public ConnectionConsumer createConnectionConsumer(Destination destination, 310 String messageSelector, 311 ServerSessionPool sessionPool, 312 int maxMessages) 313 throws JMSException { 314 throw new JMSException ("Not Supported."); 315 } 316 317 326 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, 327 String subscriptionName, 328 String messageSelector, 329 ServerSessionPool sessionPool, 330 int maxMessages) 331 throws JMSException { 332 throw new JMSException ("Not Supported."); 333 } 334 335 338 public ManagedConnectionImpl getManagedConnection() { 339 return managedConnection; 340 } 341 342 public void onException(JMSException e) { 343 if(exceptionListener!=null && managedConnection!=null) { 344 try { 345 exceptionListener.onException(e); 346 } catch (Throwable ignore) { 347 } 349 } 350 } 351 352 } 353 | Popular Tags |