1 18 package org.apache.activemq.ra; 19 20 import java.util.ArrayList ; 21 import java.util.Iterator ; 22 23 import javax.jms.Connection ; 24 import javax.jms.ConnectionConsumer ; 25 import javax.jms.ConnectionMetaData ; 26 import javax.jms.Destination ; 27 import javax.jms.ExceptionListener ; 28 import javax.jms.IllegalStateException ; 29 import javax.jms.JMSException ; 30 import javax.jms.Queue ; 31 import javax.jms.QueueConnection ; 32 import javax.jms.QueueSession ; 33 import javax.jms.ServerSessionPool ; 34 import javax.jms.Session ; 35 import javax.jms.Topic ; 36 import javax.jms.TopicConnection ; 37 import javax.jms.TopicSession ; 38 39 import org.apache.activemq.ActiveMQQueueSession; 40 import org.apache.activemq.ActiveMQSession; 41 import org.apache.activemq.ActiveMQTopicSession; 42 43 44 50 public class ManagedConnectionProxy implements Connection , QueueConnection , TopicConnection , ExceptionListener { 51 52 private ActiveMQManagedConnection managedConnection; 53 private ArrayList sessions = new ArrayList (); 54 private ExceptionListener exceptionListener; 55 56 public ManagedConnectionProxy(ActiveMQManagedConnection managedConnection) { 57 this.managedConnection = managedConnection; 58 } 59 60 66 public void close() throws JMSException { 67 if( managedConnection!=null ) { 68 managedConnection.proxyClosedEvent(this); 69 } 70 } 71 72 75 public void cleanup() { 76 exceptionListener=null; 77 managedConnection = null; 78 for (Iterator iter = sessions.iterator(); iter.hasNext();) { 79 ManagedSessionProxy p = (ManagedSessionProxy) iter.next(); 80 try { 81 p.cleanup(); 82 } catch (JMSException ignore) { 83 } 84 iter.remove(); 85 } 86 } 87 88 91 private Connection getConnection() throws JMSException { 92 if (managedConnection == null) { 93 throw new IllegalStateException ("The Connection is closed"); 94 } 95 return managedConnection.getPhysicalConnection(); 96 } 97 98 104 public Session createSession(boolean transacted, int acknowledgeMode) 105 throws JMSException { 106 return createSessionProxy(transacted, acknowledgeMode); 107 } 108 109 115 private ManagedSessionProxy createSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException { 116 ActiveMQSession session = (ActiveMQSession) getConnection().createSession(transacted, acknowledgeMode); 117 ManagedTransactionContext txContext = new ManagedTransactionContext(managedConnection.getTransactionContext()); 118 session.setTransactionContext(txContext); 119 ManagedSessionProxy p = new ManagedSessionProxy(session); 120 p.setUseSharedTxContext(managedConnection.isInManagedTx()); 121 sessions.add(p); 122 return p; 123 } 124 125 public void setUseSharedTxContext(boolean enable) throws JMSException { 126 for (Iterator iter = sessions.iterator(); iter.hasNext();) { 127 ManagedSessionProxy p = (ManagedSessionProxy) iter.next(); 128 p.setUseSharedTxContext(enable); 129 } 130 } 131 132 138 public QueueSession createQueueSession(boolean transacted, 139 int acknowledgeMode) throws JMSException { 140 return new ActiveMQQueueSession(createSessionProxy(transacted, acknowledgeMode)); 141 } 142 143 149 public TopicSession createTopicSession(boolean transacted, 150 int acknowledgeMode) throws JMSException { 151 return new ActiveMQTopicSession(createSessionProxy(transacted, acknowledgeMode)); 152 } 153 154 158 public String getClientID() throws JMSException { 159 return getConnection().getClientID(); 160 } 161 162 166 public ExceptionListener getExceptionListener() throws JMSException { 167 return getConnection().getExceptionListener(); 168 } 169 170 174 public ConnectionMetaData getMetaData() throws JMSException { 175 return getConnection().getMetaData(); 176 } 177 178 182 public void setClientID(String clientID) throws JMSException { 183 getConnection().setClientID(clientID); 184 } 185 186 190 public void setExceptionListener(ExceptionListener listener) 191 throws JMSException { 192 getConnection(); 193 exceptionListener = listener; 194 } 195 196 199 public void start() throws JMSException { 200 getConnection().start(); 201 } 202 203 206 public void stop() throws JMSException { 207 getConnection().stop(); 208 } 209 210 211 219 public ConnectionConsumer createConnectionConsumer(Queue queue, 220 String messageSelector, ServerSessionPool sessionPool, 221 int maxMessages) throws JMSException { 222 throw new JMSException ("Not Supported."); 223 } 224 225 233 public ConnectionConsumer createConnectionConsumer(Topic topic, 234 String messageSelector, ServerSessionPool sessionPool, 235 int maxMessages) throws JMSException { 236 throw new JMSException ("Not Supported."); 237 } 238 239 247 public ConnectionConsumer createConnectionConsumer(Destination destination, 248 String messageSelector, ServerSessionPool sessionPool, 249 int maxMessages) throws JMSException { 250 throw new JMSException ("Not Supported."); 251 } 252 253 262 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, 263 String subscriptionName, String messageSelector, 264 ServerSessionPool sessionPool, int maxMessages) throws JMSException { 265 throw new JMSException ("Not Supported."); 266 } 267 268 271 public ActiveMQManagedConnection getManagedConnection() { 272 return managedConnection; 273 } 274 275 public void onException(JMSException e) { 276 if(exceptionListener!=null && managedConnection!=null) { 277 try { 278 exceptionListener.onException(e); 279 } catch (Throwable ignore) { 280 } 282 } 283 } 284 285 } 286 | Popular Tags |