1 7 package org.jboss.jms.client.p2p; 8 9 import java.io.Serializable ; 10 import java.util.ArrayList ; 11 import java.util.Collection ; 12 import java.util.Iterator ; 13 import java.util.List ; 14 import java.util.Map ; 15 import java.util.TreeMap ; 16 17 import javax.jms.BytesMessage ; 18 import javax.jms.Destination ; 19 import javax.jms.JMSException ; 20 import javax.jms.MapMessage ; 21 import javax.jms.Message ; 22 import javax.jms.MessageListener ; 23 import javax.jms.ObjectMessage ; 24 import javax.jms.Queue ; 25 import javax.jms.Session ; 26 import javax.jms.StreamMessage ; 27 import javax.jms.TextMessage ; 28 import javax.transaction.xa.XAResource ; 29 30 import org.jboss.jms.BytesMessageImpl; 31 import org.jboss.jms.MapMessageImpl; 32 import org.jboss.jms.MessageImpl; 33 import org.jboss.jms.ObjectMessageImpl; 34 import org.jboss.jms.StreamMessageImpl; 35 import org.jboss.jms.TextMessageImpl; 36 import org.jboss.jms.client.BrowserDelegate; 37 import org.jboss.jms.client.ConsumerDelegate; 38 import org.jboss.jms.client.ProducerDelegate; 39 import org.jboss.jms.client.SessionDelegate; 40 41 48 public class P2PSessionDelegate 49 implements SessionDelegate 50 { 51 53 55 private P2PConnectionDelegate connection = null; 56 private int acknowledgeMode; 57 private boolean closed = false; private MessageListener messageListener = null; 59 private boolean transacted = false; 60 private List messageConsumers = new ArrayList (); 63 private List messageProducers = new ArrayList (); 64 private List queueBrowsers = new ArrayList (); 65 66 private Map unacknowledgedMessageMap = new TreeMap (); 67 private long nextDeliveryId = 0; 68 private boolean recovering = false; 69 private Object recoveryLock = new Object (); 70 private List uncommittedMessages = new ArrayList (); 71 72 74 76 public P2PSessionDelegate(P2PConnectionDelegate connection, boolean transaction, int acknowledgeMode) 77 throws JMSException 78 { 79 this.connection = connection; 80 this.transacted = transaction; 81 this.acknowledgeMode = acknowledgeMode; 82 } 83 84 86 88 public void close() throws JMSException 89 { 90 if (!this.closed) 91 { 92 if (this.transacted) 93 { 94 this.rollback(); 95 } 96 Iterator iterator = this.messageConsumers.iterator(); 97 while (iterator.hasNext()) 98 { 99 ((ConsumerDelegate) iterator.next()).close(); 100 iterator.remove(); 101 } 102 iterator = this.messageProducers.iterator(); 103 while (iterator.hasNext()) 104 { 105 ((ProducerDelegate) iterator.next()).close(); 106 } 107 iterator = this.queueBrowsers.iterator(); 108 while (iterator.hasNext()) 109 { 110 ((BrowserDelegate) iterator.next()).close(); 111 } 112 this.closed = true; 113 } 114 } 115 116 public void closing() throws JMSException 117 { 118 } 119 120 public void commit() throws JMSException 121 { 122 this.throwExceptionIfClosed(); 123 if (this.transacted) 124 { 125 this.recovering = true; 126 if (this.uncommittedMessages.size() > 0) 127 { 128 this.connection.send((Collection ) ((ArrayList ) this.uncommittedMessages).clone()); 129 } 130 this.unacknowledgedMessageMap.clear(); 131 this.uncommittedMessages.clear(); 132 this.recovering = false; 133 synchronized (this.recoveryLock) 134 { 135 this.recoveryLock.notify(); 136 } 137 } 138 else 139 { 140 throw new IllegalStateException ("Illegal Operation: This is not a transacted Session."); 141 } 142 } 143 144 public BrowserDelegate createBrowser(Queue queue, String selector) throws JMSException 145 { 146 this.throwExceptionIfClosed(); 147 return new P2PBrowserDelegate(this, queue, selector); 148 } 149 150 public BytesMessage createBytesMessage() throws JMSException 151 { 152 this.throwExceptionIfClosed(); 153 return new BytesMessageImpl(); 154 } 155 156 public ConsumerDelegate createConsumer( 157 Destination destination, 158 String subscription, 159 String selector, 160 boolean noLocal) 161 throws JMSException 162 { 163 this.throwExceptionIfClosed(); 164 ConsumerDelegate messageConsumer = new P2PConsumerDelegate(this, destination, selector, noLocal); 165 this.messageConsumers.add(messageConsumer); 166 return messageConsumer; 167 } 168 169 public MapMessage createMapMessage() throws JMSException 170 { 171 this.throwExceptionIfClosed(); 172 return new MapMessageImpl(); 173 } 174 175 public javax.jms.Message createMessage() throws JMSException 176 { 177 this.throwExceptionIfClosed(); 178 return new MessageImpl(); 179 } 180 181 public ObjectMessage createObjectMessage(Serializable object) throws JMSException 182 { 183 this.throwExceptionIfClosed(); 184 return new ObjectMessageImpl(object); 185 } 186 187 public ProducerDelegate createProducer(Destination destination) throws JMSException 188 { 189 this.throwExceptionIfClosed(); 190 ProducerDelegate messageProducer = new P2PProducerDelegate(this, destination); 191 this.messageProducers.add(messageProducer); 192 return messageProducer; 193 } 194 195 public StreamMessage createStreamMessage() throws JMSException 196 { 197 this.throwExceptionIfClosed(); 198 return new StreamMessageImpl(); 199 } 200 201 public Destination createTempDestination(int type) throws JMSException 202 { 203 return null; 205 } 206 207 public TextMessage createTextMessage(String text) throws JMSException 208 { 209 this.throwExceptionIfClosed(); 210 return new TextMessageImpl(text); 211 } 212 213 public Destination getDestination(String name) throws JMSException 214 { 215 return null; 217 } 218 219 public XAResource getXAResource() 220 { 221 return null; 223 } 224 225 public void recover() throws JMSException 226 { 227 this.throwExceptionIfClosed(); 228 if (this.transacted) 229 { 230 throw new IllegalStateException ("Illegal Operation: This is a transacted Session. Use rollback instead."); 231 } 232 synchronized (this.unacknowledgedMessageMap) 233 { 234 this.recovering = true; 235 Map clone = (Map ) ((TreeMap ) this.unacknowledgedMessageMap).clone(); 236 this.unacknowledgedMessageMap.clear(); 237 this.restart(clone); 238 } 239 } 240 241 public void rollback() throws JMSException 242 { 243 this.throwExceptionIfClosed(); 244 if (this.transacted) 245 { 246 synchronized (this.unacknowledgedMessageMap) 247 { 248 this.recovering = true; 249 Map clone = (Map ) ((TreeMap ) this.unacknowledgedMessageMap).clone(); 250 this.unacknowledgedMessageMap.clear(); 251 this.restart(clone); 252 } 253 this.uncommittedMessages.clear(); 254 } 255 else 256 { 257 throw new IllegalStateException ("Illegal Operation: This is not a transacted Session."); 258 } 259 } 260 261 public void run() 262 { 263 } 265 266 public void setMessageListener(MessageListener listener) throws JMSException 267 { 268 this.throwExceptionIfClosed(); 269 this.messageListener = listener; 270 } 271 272 public void unsubscribe(String name) throws JMSException 273 { 274 this.throwExceptionIfClosed(); 275 } 276 277 279 281 synchronized void send(MessageImpl message) throws JMSException 282 { 283 if (this.transacted) 284 { 285 this.uncommittedMessages.add(message.clone()); 286 } 287 else 288 { 289 this.connection.send(message); 290 } 291 } 292 293 public void acknowledge(Message message, boolean acknowledge) 294 { 295 if (!this.transacted) 296 { 297 synchronized (this.unacknowledgedMessageMap) 298 { 299 Iterator iterator = this.unacknowledgedMessageMap.keySet().iterator(); 300 while (iterator.hasNext()) 301 { 302 Long currentKey = (Long ) iterator.next(); 303 if (currentKey.longValue() <= ((MessageImpl) message).deliveryId) 304 { 305 iterator.remove(); 306 } 307 } 308 } 309 } 310 } 311 void deliver(MessageImpl message) 312 { 313 this.deliver(message, false); 314 } 315 316 private void deliver(MessageImpl message, boolean recoveryOperation) 317 { 318 if (this.recovering && !recoveryOperation) 319 { 320 synchronized (this.recoveryLock) 321 { 322 try 323 { 324 this.recoveryLock.wait(); 325 } 326 catch (InterruptedException e) 327 { 328 } 329 } 330 } 331 message.setSession(this); 332 message.setDeliveryId(++this.nextDeliveryId); 333 Iterator iterator = this.messageConsumers.iterator(); 334 if (this.acknowledgeMode != Session.AUTO_ACKNOWLEDGE) 335 { 336 synchronized (unacknowledgedMessageMap) 337 { 338 this.unacknowledgedMessageMap.put(new Long (this.nextDeliveryId), message); 339 } 340 } 341 while (iterator.hasNext()) 342 { 343 ((P2PConsumerDelegate) iterator.next()).deliver(message); 344 } 345 } 346 348 private void throwExceptionIfClosed() throws IllegalStateException 349 { 350 if (this.closed) 351 { 352 throw new IllegalStateException ("The session is closed."); 353 } 354 } 355 356 private void restart(final Map unacknowledgedMessage) 357 { 358 Thread thread = new Thread (new Runnable () 359 { 360 public void run() 361 { 362 Iterator iterator = unacknowledgedMessage.keySet().iterator(); 363 while (iterator.hasNext()) 364 { 365 MessageImpl message = (MessageImpl) unacknowledgedMessage.get(iterator.next()); 366 message.setJMSRedelivered(true); 367 deliver(message, true); 368 } 369 recovering = false; 370 synchronized (recoveryLock) 371 { 372 recoveryLock.notify(); 373 } 374 } 375 }); 376 thread.start(); 377 } 378 379 381 } 382 | Popular Tags |