1 6 7 package org.jfox.jms; 8 9 import java.io.Serializable ; 10 import java.util.HashMap ; 11 import java.util.Iterator ; 12 import java.util.Map ; 13 import javax.jms.BytesMessage ; 14 import javax.jms.Destination ; 15 import javax.jms.IllegalStateException ; 16 import javax.jms.InvalidDestinationException ; 17 import javax.jms.JMSException ; 18 import javax.jms.MapMessage ; 19 import javax.jms.Message ; 20 import javax.jms.MessageConsumer ; 21 import javax.jms.MessageListener ; 22 import javax.jms.MessageProducer ; 23 import javax.jms.ObjectMessage ; 24 import javax.jms.Queue ; 25 import javax.jms.QueueBrowser ; 26 import javax.jms.QueueReceiver ; 27 import javax.jms.QueueSender ; 28 import javax.jms.QueueSession ; 29 import javax.jms.Session ; 30 import javax.jms.StreamMessage ; 31 import javax.jms.TemporaryQueue ; 32 import javax.jms.TemporaryTopic ; 33 import javax.jms.TextMessage ; 34 import javax.jms.Topic ; 35 import javax.jms.TopicPublisher ; 36 import javax.jms.TopicSession ; 37 import javax.jms.TopicSubscriber ; 38 import javax.jms.XAQueueSession ; 39 import javax.jms.XASession ; 40 import javax.jms.XATopicSession ; 41 import javax.transaction.xa.XAResource ; 42 43 import org.jfox.ioc.util.UUID; 44 import org.jfox.jms.message.BytesMessageImpl; 45 import org.jfox.jms.message.JMSMessage; 46 import org.jfox.jms.message.MapMessageImpl; 47 import org.jfox.jms.message.ObjectMessageImpl; 48 import org.jfox.jms.message.StreamMessageImpl; 49 import org.jfox.jms.message.TextMessageImpl; 50 51 54 55 public class JMSSession implements Session , 56 QueueSession , 57 TopicSession , 58 XASession , 59 XAQueueSession , 60 XATopicSession { 61 private JMSConnection conn; 62 63 private boolean transacted; 64 65 private int acknowledgeMode; 66 67 private boolean isXA; 68 69 private boolean closed = false; 70 71 private MessageListener listener; 72 73 private Map <String , JMSConsumer> consumers = new HashMap <String , JMSConsumer>(); 74 75 private String sessionId = UUID.randomUUID().toString(); 76 77 private Map <String , JMSMessage> asyncMessages = new HashMap <String , JMSMessage>(); 78 79 public JMSSession(JMSConnection conn, boolean transacted, int acknowledgeMode, boolean isXA) { 80 this.conn = conn; 81 this.transacted = transacted; 82 this.acknowledgeMode = acknowledgeMode; 83 this.isXA = isXA; 84 start(); 85 } 86 87 public BytesMessage createBytesMessage() throws JMSException { 88 checkClosed(); 89 return new BytesMessageImpl(); 90 } 91 92 public MapMessage createMapMessage() throws JMSException { 93 checkClosed(); 94 return new MapMessageImpl(); 95 } 96 97 public Message createMessage() throws JMSException { 98 checkClosed(); 99 return new JMSMessage(); 100 } 101 102 public ObjectMessage createObjectMessage() throws JMSException { 103 checkClosed(); 104 return new ObjectMessageImpl(); 105 } 106 107 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 108 checkClosed(); 109 ObjectMessageImpl om = new ObjectMessageImpl(); 110 om.setObject(object); 111 return om; 112 } 113 114 public StreamMessage createStreamMessage() throws JMSException { 115 checkClosed(); 116 return new StreamMessageImpl(); 117 } 118 119 public TextMessage createTextMessage() throws JMSException { 120 checkClosed(); 121 return new TextMessageImpl(); 122 } 123 124 public TextMessage createTextMessage(String text) throws JMSException { 125 checkClosed(); 126 TextMessageImpl tm = new TextMessageImpl(); 127 tm.setText(text); 128 return tm; 129 } 130 131 public boolean getTransacted() throws JMSException { 132 return transacted; 133 } 134 135 public int getAcknowledgeMode() throws JMSException { 136 return acknowledgeMode; 137 } 138 139 public synchronized void commit() throws JMSException { 140 checkClosed(); 141 throw new JMSException ("not support now!"); 142 } 143 144 public synchronized void rollback() throws JMSException { 145 checkClosed(); 146 throw new JMSException ("not support now!"); 147 } 148 149 public synchronized void close() throws JMSException { 150 if (closed) return; 151 this.closed = true; 152 conn.closeSession(sessionId); 153 synchronized (this) { 154 notifyAll(); 155 } 156 } 157 158 public synchronized void recover() throws JMSException { 159 throw new JMSException ("not support now!"); 160 } 161 162 public MessageListener getMessageListener() throws JMSException { 163 return listener; 164 } 165 166 public void setMessageListener(MessageListener listener) throws JMSException { 167 checkClosed(); 168 this.listener = listener; 169 start(); 170 } 171 172 public MessageProducer createProducer(Destination destination) throws JMSException { 173 if (destination == null) { 174 throw new InvalidDestinationException ("destination is null"); 175 } 176 JMSProducer producer = new JMSProducer(this, destination); 177 return producer; 178 } 179 180 public MessageConsumer createConsumer(Destination destination) throws JMSException { 181 return createConsumer(destination, null); 182 } 183 184 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { 185 return createConsumer(destination, messageSelector, false); 186 } 187 188 public synchronized MessageConsumer createConsumer(Destination destination, 189 String messageSelector, 190 boolean NoLocal) throws JMSException { 191 if (destination == null) { 192 throw new InvalidDestinationException ("destination is null"); 193 } 194 JMSConsumer consumer = new JMSConsumer(this, destination, messageSelector, NoLocal); 195 getJMSConnection().getContainer().registerConsumer(getJMSConnection().getClientID(), getSessionId(), consumer.getConsumerId(), consumer.getDestination()); 197 consumers.put(consumer.getConsumerId(), consumer); 198 return consumer; 199 } 200 201 public Queue createQueue(String queueName) throws JMSException { 202 throw new JMSException ("not support now!"); 203 } 204 205 public Topic createTopic(String topicName) throws JMSException { 206 throw new JMSException ("not support now!"); 207 } 208 209 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { 210 throw new JMSException ("not support now!"); 211 } 212 213 public TopicSubscriber createDurableSubscriber(Topic topic, 214 String name, 215 String messageSelector, 216 boolean noLocal) throws JMSException { 217 throw new JMSException ("not support now!"); 218 } 219 220 public QueueBrowser createBrowser(Queue queue) throws JMSException { 221 throw new JMSException ("not support now!"); 222 } 223 224 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { 225 throw new JMSException ("not support now!"); 226 } 227 228 public TemporaryQueue createTemporaryQueue() throws JMSException { 229 throw new JMSException ("not support now!"); 230 } 231 232 public TemporaryTopic createTemporaryTopic() throws JMSException { 233 throw new JMSException ("not support now!"); 234 } 235 236 public void unsubscribe(String name) throws JMSException { 237 throw new JMSException ("not support now!"); 238 } 239 240 public QueueReceiver createReceiver(Queue queue) throws JMSException { 241 return createReceiver(queue, null); 242 } 243 244 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { 245 return (QueueReceiver ) createConsumer(queue, messageSelector); 246 } 247 248 public QueueSender createSender(Queue queue) throws JMSException { 249 throw new JMSException ("not support now!"); 250 } 251 252 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 253 return createSubscriber(topic, null, false); 254 } 255 256 public TopicSubscriber createSubscriber(Topic topic, 257 String messageSelector, 258 boolean noLocal) throws JMSException { 259 return (TopicSubscriber ) createConsumer(topic, messageSelector, noLocal); 260 } 261 262 263 public TopicPublisher createPublisher(Topic topic) throws JMSException { 264 return (TopicPublisher ) createProducer(topic); 265 } 266 267 public Session getSession() throws JMSException { 268 return this; 269 } 270 271 public XAResource getXAResource() { 272 if (isXA == false) { 273 throw new java.lang.IllegalStateException ("current session " + this + " is not an XASession"); 274 } 275 return null; 277 } 278 279 public QueueSession getQueueSession() throws JMSException { 280 return (QueueSession ) getSession(); 281 } 282 283 public TopicSession getTopicSession() throws JMSException { 284 return (TopicSession ) getSession(); 285 } 286 287 290 public void run() { 291 while (!closed) { 292 try { 293 synchronized (this) { 294 if (asyncMessages.isEmpty()) { 295 wait(); 296 } 297 if (closed) break; 298 } 299 for (Iterator it = asyncMessages.entrySet().iterator(); it.hasNext();) { 300 Map.Entry <String , JMSMessage> entry = (Map.Entry <String , JMSMessage>) it.next(); 301 String consumerId = entry.getKey(); 302 JMSMessage message = entry.getValue(); 303 JMSConsumer consumer = consumers.get(consumerId); 304 message.setSession(this); 305 message.setConsumer(consumer); 306 consumer.getMessageListener().onMessage(message); 307 it.remove(); 308 if (this.getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE) { 309 acknowledge(consumer, message); 310 } 311 } 312 } catch (Exception e) { 313 e.printStackTrace(); 314 } 315 } 316 } 317 318 private void checkClosed() throws javax.jms.IllegalStateException { 319 if (closed) { 320 throw new javax.jms.IllegalStateException ("connection closed"); 321 } 322 } 323 324 329 protected String getSessionId() { 330 return sessionId; 331 } 332 333 JMSConnection getJMSConnection() { 334 return conn; 335 } 336 337 protected void start() { 338 new Thread (this, "JMSSession-" + sessionId).start(); 339 } 340 341 void sendMessage(Message message) throws JMSException { 342 getJMSConnection().getContainer().sendMessage((JMSMessage) message); 343 } 344 345 349 JMSMessage receiveMessage(JMSConsumer consumer, long timeout) throws JMSException { 350 if (!getJMSConnection().isStarted()) { 351 throw new IllegalStateException ("connection " + getJMSConnection().getClientID() + " not started, can't receive message."); 352 } 353 JMSMessage message = getJMSConnection().getContainer().receiveMessage(getJMSConnection().getClientID(), 354 getSessionId(), 355 consumer.getConsumerId(), 356 timeout); 357 if (getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE) { 359 getJMSConnection().getContainer().acknowledge(getJMSConnection().getClientID(), 360 getSessionId(), 361 consumer.getConsumerId(), 362 message.getJMSMessageID()); 363 } 364 return message; 365 } 366 367 protected synchronized void setConsumerAsync(JMSConsumer consumer, boolean async) throws JMSException { 368 getJMSConnection().getContainer().setConsumerAsync(getJMSConnection().getClientID(), 369 getSessionId(), 370 consumer.getConsumerId(), 371 async); 372 } 373 374 protected void onMessage(String consumerId, JMSMessage message) { 375 synchronized (this) { 376 asyncMessages.put(consumerId, message); 377 this.notifyAll(); 378 } 379 } 380 381 public void acknowledge(JMSConsumer consumer, JMSMessage message) throws JMSException { 382 getJMSConnection().getContainer().acknowledge(getJMSConnection().getClientID(), sessionId, consumer.getConsumerId(), message.getJMSMessageID()); 383 } 384 385 void closeConsumer(String consumerId) { 386 consumers.remove(consumerId); 387 } 388 389 public static void main(String [] args) { 390 391 } 392 } 393 | Popular Tags |