package org.jboss.mq;

import java.io.Serializable;

import javax.jms.ConnectionConsumer;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;

import org.jboss.util.UnreachableStatementException;

/**
 * A connection that implements both {@link TopicConnection} and
 * {@link QueueConnection}.
 *
 * <p>The {@link #UNIFIED}, {@link #QUEUE} or {@link #TOPIC} type given at
 * construction time is consulted by the domain specific
 * <code>createConnectionConsumer</code> methods, which refuse to create a
 * consumer for the other messaging domain.
 */
public class SpyConnection extends Connection implements Serializable, TopicConnection, QueueConnection
{
   private static final long serialVersionUID = -6227193901482445607L;

   /** Connection type that is restricted to neither domain. */
   public static final int UNIFIED = 0;

   /** Connection type that rejects topic consumers. */
   public static final int QUEUE = 1;

   /** Connection type that rejects queue consumers. */
   public static final int TOPIC = 2;

   /** The type of this connection; {@link #UNIFIED} unless a constructor sets it. */
   private int type = UNIFIED;

   /**
    * Creates a {@link #UNIFIED} connection.
    *
    * @param userId the user id, forwarded to the superclass constructor
    * @param password the password, forwarded to the superclass constructor
    * @param gcf the connection factory, forwarded to the superclass constructor
    * @throws JMSException if the superclass constructor fails
    */
   public SpyConnection(String userId, String password, GenericConnectionFactory gcf) throws JMSException
   {
      super(userId, password, gcf);
   }

   /**
    * Creates a {@link #UNIFIED} connection without explicit credentials.
    *
    * @param gcf the connection factory, forwarded to the superclass constructor
    * @throws JMSException if the superclass constructor fails
    */
   public SpyConnection(GenericConnectionFactory gcf) throws JMSException
   {
      super(gcf);
   }

   /**
    * Creates a connection of the given type.
    *
    * @param type one of {@link #UNIFIED}, {@link #QUEUE} or {@link #TOPIC}
    * @param userId the user id, forwarded to the superclass constructor
    * @param password the password, forwarded to the superclass constructor
    * @param gcf the connection factory, forwarded to the superclass constructor
    * @throws JMSException if the superclass constructor fails
    */
   public SpyConnection(int type, String userId, String password, GenericConnectionFactory gcf) throws JMSException
   {
      super(userId, password, gcf);
      this.type = type;
   }

   /**
    * Creates a connection of the given type without explicit credentials.
    *
    * @param type one of {@link #UNIFIED}, {@link #QUEUE} or {@link #TOPIC}
    * @param gcf the connection factory, forwarded to the superclass constructor
    * @throws JMSException if the superclass constructor fails
    */
   public SpyConnection(int type, GenericConnectionFactory gcf) throws JMSException
   {
      super(gcf);
      this.type = type;
   }

   /**
    * Creates a connection consumer for an arbitrary destination. Unlike the
    * topic and queue variants, this method does not look at the connection
    * type.
    *
    * @param destination the destination to consume from, must not be null
    * @param messageSelector the selector handed to the consumer
    * @param sessionPool the server session pool handed to the consumer
    * @param maxMessages the maximum number of messages handed to the consumer
    * @return a new {@link SpyConnectionConsumer}
    * @throws InvalidDestinationException if the destination is null
    * @throws JMSException if <code>checkClosed()</code> or
    *         <code>checkTemporary()</code> rejects the call
    */
   public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
         ServerSessionPool sessionPool, int maxMessages) throws JMSException
   {
      checkClosed();
      if (destination == null)
      {
         throw new InvalidDestinationException("Null destination");
      }
      checkTemporary(destination);

      return new SpyConnectionConsumer(this, destination, messageSelector, sessionPool, maxMessages);
   }

   /**
    * Creates a {@link SpySession} and records it in the created sessions
    * collection.
    *
    * @param transacted whether the session is transacted; when true the
    *        supplied acknowledge mode is ignored
    * @param acknowledgeMode the acknowledge mode for a non transacted session
    * @return the new session
    * @throws JMSException if <code>checkClosed()</code> or
    *         <code>checkClientID()</code> rejects the call
    */
   public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
   {
      checkClosed();
      checkClientID();

      // Session.SESSION_TRANSACTED has the value 0 the implementation has always used
      int mode = transacted ? Session.SESSION_TRANSACTED : acknowledgeMode;
      Session session = new SpySession(this, transacted, mode, false);
      registerSession(session);
      return session;
   }

   /**
    * Creates a {@link SpyTopicSession} and records it in the created sessions
    * collection.
    *
    * @param transacted whether the session is transacted; when true the
    *        supplied acknowledge mode is ignored
    * @param acknowledgeMode the acknowledge mode for a non transacted session
    * @return the new topic session
    * @throws JMSException if <code>checkClosed()</code> or
    *         <code>checkClientID()</code> rejects the call
    */
   public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
   {
      checkClosed();
      checkClientID();

      int mode = transacted ? Session.SESSION_TRANSACTED : acknowledgeMode;
      TopicSession session = new SpyTopicSession(this, transacted, mode);
      registerSession(session);
      return session;
   }

   /**
    * Creates a connection consumer for a topic.
    *
    * @param topic the topic to consume from, must not be null
    * @param messageSelector the selector handed to the consumer
    * @param sessionPool the server session pool handed to the consumer
    * @param maxMessages the maximum number of messages handed to the consumer
    * @return a new {@link SpyConnectionConsumer}
    * @throws IllegalStateException if this connection has type {@link #QUEUE}
    * @throws InvalidDestinationException if the topic is null
    * @throws JMSException if <code>checkClosed()</code>,
    *         <code>checkClientID()</code> or <code>checkTemporary()</code>
    *         rejects the call
    */
   public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
         ServerSessionPool sessionPool, int maxMessages) throws JMSException
   {
      checkClosed();
      if (type == QUEUE)
      {
         throw new IllegalStateException("Cannot create a topic consumer on a QueueConnection");
      }
      if (topic == null)
      {
         throw new InvalidDestinationException("Null topic");
      }
      checkClientID();
      checkTemporary(topic);

      return new SpyConnectionConsumer(this, topic, messageSelector, sessionPool, maxMessages);
   }

   /**
    * Creates a connection consumer for a durable topic subscription. The
    * consumer is created on a new {@link SpyTopic} built from the given topic,
    * the result of <code>getClientID()</code>, the subscription name and the
    * selector.
    *
    * @param topic the topic to subscribe to; must be a non temporary
    *        {@link SpyTopic}
    * @param subscriptionName the subscription name, must not be null or blank
    * @param messageSelector the selector for the subscription and the consumer
    * @param sessionPool the server session pool handed to the consumer
    * @param maxMessages the maximum number of messages handed to the consumer
    * @return a new {@link SpyConnectionConsumer}
    * @throws IllegalStateException if this connection has type {@link #QUEUE}
    * @throws InvalidDestinationException if the topic is null, is a
    *         {@link TemporaryTopic} or is not a {@link SpyTopic}
    * @throws JMSException if the subscription name is null or blank, or
    *         <code>checkClosed()</code> rejects the call
    */
   public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
         String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException
   {
      checkClosed();
      if (type == QUEUE)
      {
         throw new IllegalStateException("Cannot create a topic consumer on a QueueConnection");
      }
      if (topic == null)
      {
         throw new InvalidDestinationException("Null topic");
      }
      if (topic instanceof TemporaryTopic)
      {
         throw new InvalidDestinationException("Attempt to create a durable subscription for a temporary topic");
      }

      if (subscriptionName == null || subscriptionName.trim().length() == 0)
      {
         throw new JMSException("Null or empty subscription");
      }

      // A Topic from another implementation used to fail the cast below with a
      // ClassCastException; report it as an invalid destination instead.
      if (!(topic instanceof SpyTopic))
      {
         throw new InvalidDestinationException("Not a SpyTopic: " + topic);
      }

      SpyTopic durable = new SpyTopic((SpyTopic) topic, getClientID(), subscriptionName, messageSelector);
      return new SpyConnectionConsumer(this, durable, messageSelector, sessionPool, maxMessages);
   }

   /**
    * Creates a connection consumer for a queue.
    *
    * @param queue the queue to consume from, must not be null
    * @param messageSelector the selector handed to the consumer
    * @param sessionPool the server session pool handed to the consumer
    * @param maxMessages the maximum number of messages handed to the consumer
    * @return a new {@link SpyConnectionConsumer}
    * @throws IllegalStateException if this connection has type {@link #TOPIC}
    * @throws InvalidDestinationException if the queue is null
    * @throws JMSException if <code>checkClosed()</code> or
    *         <code>checkTemporary()</code> rejects the call
    */
   public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
         ServerSessionPool sessionPool, int maxMessages) throws JMSException
   {
      checkClosed();
      if (type == TOPIC)
      {
         throw new IllegalStateException("Cannot create a queue consumer on a TopicConnection");
      }
      if (queue == null)
      {
         throw new InvalidDestinationException("Null queue");
      }
      checkTemporary(queue);

      return new SpyConnectionConsumer(this, queue, messageSelector, sessionPool, maxMessages);
   }

   /**
    * Creates a {@link SpyQueueSession} and records it in the created sessions
    * collection.
    *
    * @param transacted whether the session is transacted; when true the
    *        supplied acknowledge mode is ignored
    * @param acknowledgeMode the acknowledge mode for a non transacted session
    * @return the new queue session
    * @throws JMSException if <code>checkClosed()</code> or
    *         <code>checkClientID()</code> rejects the call
    */
   public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
   {
      checkClosed();
      checkClientID();

      int mode = transacted ? Session.SESSION_TRANSACTED : acknowledgeMode;
      QueueSession session = new SpyQueueSession(this, transacted, mode);
      registerSession(session);
      return session;
   }

   /**
    * Obtains a temporary topic from the server, binds it to this connection
    * and records it in the temporary destinations collection.
    *
    * @return the temporary topic
    * @throws JMSException if the connection checks fail or the request fails
    */
   TemporaryTopic getTemporaryTopic() throws JMSException
   {
      checkClosed();
      checkClientID();
      try
      {
         SpyTemporaryTopic temp = (SpyTemporaryTopic) serverIL.getTemporaryTopic(connectionToken);
         temp.setConnection(this);
         registerTemporary(temp);
         return temp;
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot create a Temporary Topic", t);
         // Presumably never reached: the helper above is expected to always throw
         throw new UnreachableStatementException();
      }
   }

   /**
    * Asks the server for the topic with the given name.
    *
    * @param name the topic name passed to the server
    * @return the topic returned by the server
    * @throws JMSException if the connection checks fail or the request fails
    */
   Topic createTopic(String name) throws JMSException
   {
      checkClosed();
      checkClientID();
      try
      {
         return serverIL.createTopic(connectionToken, name);
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot get the Topic from the provider", t);
         // Presumably never reached: the helper above is expected to always throw
         throw new UnreachableStatementException();
      }
   }

   /**
    * Obtains a temporary queue from the server, binds it to this connection
    * and records it in the temporary destinations collection.
    *
    * @return the temporary queue
    * @throws JMSException if the connection checks fail or the request fails
    */
   TemporaryQueue getTemporaryQueue() throws JMSException
   {
      checkClosed();
      checkClientID();
      try
      {
         SpyTemporaryQueue temp = (SpyTemporaryQueue) serverIL.getTemporaryQueue(connectionToken);
         temp.setConnection(this);
         registerTemporary(temp);
         return temp;
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot create a Temporary Queue", t);
         // Presumably never reached: the helper above is expected to always throw
         throw new UnreachableStatementException();
      }
   }

   /**
    * Asks the server for the queue with the given name.
    *
    * @param name the queue name passed to the server
    * @return the queue returned by the server
    * @throws JMSException if the connection checks fail or the request fails
    */
   Queue createQueue(String name) throws JMSException
   {
      checkClosed();
      checkClientID();
      try
      {
         return serverIL.createQueue(connectionToken, name);
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot get the Queue from the provider", t);
         // Presumably never reached: the helper above is expected to always throw
         throw new UnreachableStatementException();
      }
   }

   /**
    * Adds a newly created session to the created sessions collection while
    * holding that collection's monitor.
    *
    * @param session the session to record
    */
   private void registerSession(Session session)
   {
      synchronized (createdSessions)
      {
         createdSessions.add(session);
      }
   }

   /**
    * Adds a temporary destination to the temporary destinations collection
    * while holding that collection's monitor.
    *
    * @param temp the temporary destination to record
    */
   private void registerTemporary(Object temp)
   {
      synchronized (temps)
      {
         temps.add(temp);
      }
   }
}