1 29 30 package com.caucho.jms.memory; 31 32 import com.caucho.jms.AbstractDestination; 33 import com.caucho.jms.message.MessageImpl; 34 import com.caucho.jms.selector.Selector; 35 import com.caucho.jms.session.SessionImpl; 36 import com.caucho.log.Log; 37 import com.caucho.util.L10N; 38 39 import javax.jms.JMSException ; 40 import javax.jms.Message ; 41 import javax.jms.MessageConsumer ; 42 import javax.jms.Queue ; 43 import java.util.ArrayList ; 44 import java.util.Enumeration ; 45 import java.util.logging.Level ; 46 import java.util.logging.Logger ; 47 48 51 public class MemoryQueue extends AbstractDestination 52 implements Queue { 53 static final Logger log = Log.open(MemoryQueue.class); 54 static final L10N L = new L10N(MemoryQueue.class); 55 56 ArrayList <Item> _queue = new ArrayList <Item>(); 57 58 private String _queueName; 59 private Selector _selector; 60 61 private long _queueId; 62 63 private int _consumerId; 64 65 public MemoryQueue() 66 { 67 } 68 69 72 public String getQueueName() 73 { 74 return _queueName; 75 } 76 77 80 public void setQueueName(String name) 81 { 82 _queueName = name; 83 } 84 85 public void setSelector(Selector selector) 86 { 87 _selector = selector; 88 } 89 90 public Selector getSelector() 91 { 92 return _selector; 93 } 94 95 98 public long generateQueueId() 99 { 100 return ++_queueId; 101 } 102 103 106 public int generateConsumerId() 107 { 108 return ++_consumerId; 109 } 110 111 public void send(Message message) 112 throws JMSException 113 { 114 if (_selector != null && ! _selector.isMatch(message)) 115 return; 116 117 long sequenceId = nextConsumerSequenceId(); 118 119 if (log.isLoggable(Level.FINE)) 120 log.fine("MemoryQueue[" + _queueName + "] send " + sequenceId); 121 122 synchronized (_queue) { 123 _queue.add(new Item(generateQueueId(), (MessageImpl) message)); 124 _queue.notify(); 125 } 126 127 messageAvailable(); 128 } 129 130 137 public MessageConsumer createConsumer(SessionImpl session, 138 String selector, 139 boolean noLocal) 140 throws JMSException 141 { 142 return new MemoryQueueConsumer(session, selector, this); 143 } 144 145 148 MessageImpl receive(Selector selector, long consumerId, boolean autoAck) 149 throws JMSException 150 { 151 synchronized (_queue) { 152 int i; 153 int size = _queue.size(); 154 155 for (i = 0; i < size; i++) { 156 Item item = _queue.get(i); 157 158 if (item.getConsumerId() >= 0) 159 continue; 160 161 MessageImpl message = item.getMessage(); 162 163 if (selector == null || selector.isMatch(message)) { 164 message.setJMSRedelivered(item.getDelivered()); 165 166 if (autoAck) 167 _queue.remove(i); 168 else 169 item.setConsumerId(consumerId); 170 171 return message; 172 } 173 } 174 } 175 176 return null; 177 } 178 179 182 void rollback(long consumerId) 183 throws JMSException 184 { 185 synchronized (_queue) { 186 for (int i = _queue.size() -1; i >= 0; i--) { 187 Item item = _queue.get(i); 188 189 if (item.getConsumerId() == consumerId) 190 item.setConsumerId(-1); 191 } 192 } 193 } 194 195 198 void acknowledge(long consumerId, long messageId) 199 throws JMSException 200 { 201 synchronized (_queue) { 202 for (int i = _queue.size() -1; i >= 0; i--) { 203 Item item = _queue.get(i); 204 205 if (item.getConsumerId() == consumerId) 206 _queue.remove(i); 207 } 208 } 209 } 210 211 214 public MemoryQueueBrowser createBrowser(SessionImpl session, 215 String selector) 216 throws JMSException 217 { 218 return new MemoryQueueBrowser(session, this, selector); 219 } 220 221 224 public Enumeration getEnumeration(Selector selector) 225 { 226 return new BrowserEnumeration(this, selector); 227 } 228 229 232 private boolean hasMessage(Selector selector) 233 throws JMSException 234 { 235 synchronized (_queue) { 236 int i; 237 int size = _queue.size(); 238 239 for (i = 0; i < size; i++) { 240 Item item = _queue.get(i); 241 242 if (item.getConsumerId() >= 0) 243 continue; 244 245 Message message = item.getMessage(); 246 247 if (selector == null || selector.isMatch(message)) 248 return true; 249 } 250 } 251 252 return false; 253 } 254 255 258 private long nextId(Selector selector, long id) 259 throws JMSException 260 { 261 synchronized (_queue) { 262 int i; 263 int size = _queue.size(); 264 265 for (i = 0; i < size; i++) { 266 Item item = _queue.get(i); 267 268 if (item.getConsumerId() >= 0) 269 continue; 270 271 else if (item.getId() < id) 272 continue; 273 274 Message message = item.getMessage(); 275 276 if (selector == null || selector.isMatch(message)) 277 return item.getId(); 278 } 279 } 280 281 return Long.MAX_VALUE; 282 } 283 284 287 private Message nextValue(Selector selector, long id) 288 throws JMSException 289 { 290 synchronized (_queue) { 291 int i; 292 int size = _queue.size(); 293 294 for (i = 0; i < size; i++) { 295 Item item = _queue.get(i); 296 297 if (item.getConsumerId() >= 0) 298 continue; 299 300 else if (item.getId() < id) 301 continue; 302 303 Message message = item.getMessage(); 304 305 if (selector == null || selector.isMatch(message)) 306 return message; 307 } 308 } 309 310 return null; 311 } 312 313 316 public String toString() 317 { 318 return "MemoryQueue[" + _queueName + "]"; 319 } 320 321 static class BrowserEnumeration implements Enumeration { 322 private MemoryQueue _queue; 323 private Selector _selector; 324 private long _id = -1; 325 326 BrowserEnumeration(MemoryQueue queue, Selector selector) 327 { 328 _queue = queue; 329 _selector = selector; 330 } 331 332 public boolean hasMoreElements() 333 { 334 try { 335 if (_id < 0) 336 _id = _queue.nextId(_selector, _id); 337 338 return (_id < Long.MAX_VALUE); 339 } catch (Exception e) { 340 throw new RuntimeException (e); 341 } 342 } 343 344 public Object nextElement() 345 { 346 try { 347 if (_id < 0) 349 _id = _queue.nextId(_selector, _id); 350 351 Object value = _queue.nextValue(_selector, _id); 352 353 _id = _queue.nextId(_selector, _id + 1); 354 355 return value; 356 } catch (Exception e) { 357 throw new RuntimeException (e); 358 } 359 } 360 } 361 362 static class Item { 363 private MessageImpl _msg; 364 private long _id; 365 private long _consumerId = -1; 366 private boolean _delivered; 367 368 Item(long id, MessageImpl msg) 369 { 370 _id = id; 371 372 _msg = msg; 373 } 374 375 MessageImpl getMessage() 376 { 377 return _msg; 378 } 379 380 long getId() 381 { 382 return _id; 383 } 384 385 long getConsumerId() 386 { 387 return _consumerId; 388 } 389 390 void setConsumerId(long consumerId) 391 { 392 _consumerId = consumerId; 393 _delivered = true; 394 } 395 396 boolean getDelivered() 397 { 398 return _delivered; 399 } 400 } 401 } 402 403 | Popular Tags |