1 7 package org.jboss.jms.serverless; 8 9 import org.jboss.logging.Logger; 10 import java.io.Serializable ; 11 import javax.jms.Session ; 12 import javax.jms.BytesMessage ; 13 import javax.jms.MapMessage ; 14 import javax.jms.Message ; 15 import javax.jms.ObjectMessage ; 16 import javax.jms.StreamMessage ; 17 import javax.jms.JMSException ; 18 import javax.jms.TextMessage ; 19 import javax.jms.MessageListener ; 20 import javax.jms.MessageProducer ; 21 import javax.jms.MessageConsumer ; 22 import javax.jms.Destination ; 23 import javax.jms.Queue ; 24 import javax.jms.Topic ; 25 import javax.jms.TopicSubscriber ; 26 import javax.jms.QueueBrowser ; 27 import javax.jms.TemporaryQueue ; 28 import javax.jms.TemporaryTopic ; 29 import java.util.List ; 30 import java.util.ArrayList ; 31 import java.util.Iterator ; 32 33 39 class SessionImpl implements Session { 40 41 private static final Logger log = Logger.getLogger(SessionImpl.class); 42 43 private SessionManager sessionManager; 44 private String id; 45 private List subscribers; 46 private List receivers; 47 private boolean transacted; 48 private int acknowledgeMode; 49 private int receiverCounter = 0; 50 51 55 SessionImpl(SessionManager sessionManager, 56 String id, 57 boolean transacted, 58 int acknowledgeMode) { 59 60 this.sessionManager = sessionManager; 61 this.id = id; 62 subscribers = new ArrayList (); 63 receivers = new ArrayList (); 64 this.transacted = transacted; 65 this.acknowledgeMode = acknowledgeMode; 66 67 if (transacted) { 68 throw new NotImplementedException("Transacted sessions not supported"); 69 } 70 } 71 72 public String getID() { 73 return id; 74 } 75 76 void send(Message m) throws JMSException { 77 sessionManager.getConnection().send(m); 78 } 79 80 83 void deliver(Message m) { 85 86 92 Destination destination = null; 93 try { 94 destination = m.getJMSDestination(); 95 } 96 catch(JMSException e) { 97 log.error("Unhandled failure", e); 99 return; 100 } 101 102 104 for(Iterator i = subscribers.iterator(); i.hasNext(); ) { 105 106 TopicSubscriberImpl sub = (TopicSubscriberImpl)i.next(); 107 if (destination.equals(sub.getDestination())) { 108 MessageListener l = null; 109 try { 110 l = sub.getMessageListener(); 111 } 112 catch(JMSException e) { 113 log.error("Unhandled failure", e); 115 continue; 116 } 117 if (l == null) { 118 continue; 119 } 120 l.onMessage(m); 121 } 122 } 123 } 124 125 128 void deliver(Message m, String receiverID) { 130 131 137 QueueReceiverImpl receiver = null; 138 for(Iterator i = receivers.iterator(); i.hasNext(); ) { 139 140 QueueReceiverImpl crtRec = (QueueReceiverImpl)i.next(); 141 if (crtRec.getID().equals(receiverID)) { 142 receiver = crtRec; 143 break; 144 } 145 } 146 147 if (receiver == null) { 148 log.error("No such receiver: "+receiverID+". Delivery failed!"); 149 return; 150 } 151 MessageListener l = null; 152 try { 153 l = receiver.getMessageListener(); 154 } 155 catch(JMSException e) { 156 log.error("Unhandled failure", e); 158 return; 159 } 160 if (l == null) { 161 log.warn("No message listener for receiver "+receiverID+". Delivery failed!"); 162 } 163 else { 164 l.onMessage(m); 165 } 166 } 167 168 169 173 public BytesMessage createBytesMessage() throws JMSException { 174 throw new NotImplementedException(); 175 } 176 177 public MapMessage createMapMessage() throws JMSException { 178 throw new NotImplementedException(); 179 } 180 181 public Message createMessage() throws JMSException { 182 throw new NotImplementedException(); 183 } 184 185 public ObjectMessage createObjectMessage() throws JMSException { 186 throw new NotImplementedException(); 187 } 188 189 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 190 throw new NotImplementedException(); 191 } 192 193 public StreamMessage createStreamMessage() throws JMSException { 194 throw new NotImplementedException(); 195 } 196 197 public TextMessage createTextMessage() throws JMSException { 198 return new TextMessageImpl(); 199 } 200 201 public TextMessage createTextMessage(String text) throws JMSException { 202 throw new NotImplementedException(); 203 } 204 205 public boolean getTransacted() throws JMSException { 206 return transacted; 207 } 208 209 public int getAcknowledgeMode() throws JMSException { 210 return acknowledgeMode; 211 } 212 213 public void commit() throws JMSException { 214 throw new NotImplementedException(); 215 } 216 217 public void rollback() throws JMSException { 218 throw new NotImplementedException(); 219 } 220 221 public void close() throws JMSException { 222 throw new NotImplementedException(); 223 } 224 225 public void recover() throws JMSException { 226 throw new NotImplementedException(); 227 } 228 229 public MessageListener getMessageListener() throws JMSException { 230 throw new NotImplementedException(); 231 } 232 233 public void setMessageListener(MessageListener listener) throws JMSException { 234 throw new NotImplementedException(); 235 } 236 237 public void run() { 238 throw new NotImplementedException(); 239 } 240 241 public MessageProducer createProducer(Destination destination) throws JMSException { 242 243 if (destination instanceof Topic ) { 244 return new TopicPublisherImpl(this, (Topic )destination); 245 } 246 else if (destination instanceof Queue ) { 247 return new QueueSenderImpl(this, (Queue )destination); 248 } 249 throw new JMSException ("Destination not a Topic or Queue"); 250 } 251 252 public MessageConsumer createConsumer(Destination destination) throws JMSException { 253 254 if (destination instanceof Topic ) { 255 TopicSubscriberImpl ts = new TopicSubscriberImpl(this, (Topic )destination); 256 subscribers.add(ts); 257 return ts; 258 } 259 else if (destination instanceof Queue ) { 260 QueueReceiverImpl qr = 261 new QueueReceiverImpl(this, generateReceiverID(), (Queue )destination); 262 sessionManager.advertiseQueueReceiver(getID(), qr, true); 263 receivers.add(qr); 264 return qr; 265 } 266 throw new JMSException ("Destination not a Topic or Queue"); 267 } 268 269 270 public MessageConsumer createConsumer(Destination destination, String messageSelector) 271 throws JMSException { 272 throw new NotImplementedException(); 273 } 274 275 public MessageConsumer createConsumer(Destination destination, 276 String messageSelector, 277 boolean NoLocal) 278 throws JMSException { 279 throw new NotImplementedException(); 280 } 281 282 public Queue createQueue(String queueName) throws JMSException { 283 throw new NotImplementedException(); 284 } 285 286 public Topic createTopic(String topicName) throws JMSException { 287 throw new NotImplementedException(); 288 } 289 290 public TopicSubscriber createDurableSubscriber(Topic topic, 291 String name) throws JMSException { 292 throw new NotImplementedException(); 293 } 294 295 public TopicSubscriber createDurableSubscriber(Topic topic, 296 String name, 297 String messageSelector, 298 boolean noLocal) throws JMSException { 299 throw new NotImplementedException(); 300 } 301 302 public QueueBrowser createBrowser(Queue queue) throws JMSException { 303 throw new NotImplementedException(); 304 } 305 306 public QueueBrowser createBrowser(Queue queue, 307 String messageSelector) throws JMSException { 308 throw new NotImplementedException(); 309 } 310 311 public TemporaryQueue createTemporaryQueue() throws JMSException { 312 throw new NotImplementedException(); 313 } 314 315 public TemporaryTopic createTemporaryTopic() throws JMSException { 316 throw new NotImplementedException(); 317 } 318 319 public void unsubscribe(String name) throws JMSException { 320 throw new NotImplementedException(); 321 } 322 323 327 330 void removeConsumer(MessageConsumer consumer) throws JMSException { 331 332 if (consumer instanceof QueueReceiverImpl) { 333 if (!receivers.contains(consumer)) { 334 throw new JMSException ("No such QueueReceiver: "+consumer); 335 } 336 sessionManager.advertiseQueueReceiver(getID(), (QueueReceiverImpl)consumer, false); 337 receivers.remove(consumer); 338 } 339 else if (consumer instanceof TopicSubscriberImpl) { 340 throw new NotImplementedException(); 341 } 342 else { 343 throw new JMSException ("MessageConsumer not a TopicSubscriber or a QueueReceiver"); 344 } 345 } 346 347 348 352 private synchronized String generateReceiverID() { 353 return Integer.toString(receiverCounter++); 354 } 355 356 360 362 366 } 367 | Popular Tags |