1 28 29 package com.caucho.jms.memory; 30 31 import com.caucho.jms.message.MessageImpl; 32 import com.caucho.jms.session.MessageConsumerImpl; 33 import com.caucho.jms.session.SessionImpl; 34 import com.caucho.log.Log; 35 import com.caucho.util.L10N; 36 37 import javax.jms.JMSException ; 38 import javax.jms.Queue ; 39 import javax.jms.QueueReceiver ; 40 import java.util.logging.Logger ; 41 42 45 public class MemoryQueueConsumer extends MessageConsumerImpl 46 implements QueueReceiver { 47 static final Logger log = Log.open(MemoryQueueConsumer.class); 48 static final L10N L = new L10N(MemoryQueueConsumer.class); 49 50 private MemoryQueue _queue; 51 52 private int _consumerId; 53 54 private boolean _autoAck; 55 56 public MemoryQueueConsumer(SessionImpl session, String messageSelector, 57 MemoryQueue queue) 58 throws JMSException 59 { 60 super(session, messageSelector, queue, false); 61 62 if (queue == null) 63 throw new NullPointerException (); 64 65 _queue = queue; 66 67 _consumerId = queue.generateConsumerId(); 68 69 if (session.getAcknowledgeMode() == session.AUTO_ACKNOWLEDGE || 70 session.getAcknowledgeMode() == session.DUPS_OK_ACKNOWLEDGE) 71 _autoAck = true; 72 } 73 74 77 public Queue getQueue() 78 { 79 return _queue; 80 } 81 82 85 protected MessageImpl receiveImpl() 86 throws JMSException 87 { 88 91 return _queue.receive(_selector, _consumerId, _autoAck); 92 } 93 94 97 public void acknowledge() 98 throws JMSException 99 { 100 if (_autoAck) 101 return; 102 103 _queue.acknowledge(_consumerId, Long.MAX_VALUE); 104 } 105 106 109 public void rollback() 110 throws JMSException 111 { 112 if (_autoAck) 113 return; 114 115 _queue.rollback(_consumerId); 116 } 117 118 121 public String toString() 122 { 123 return "MemoryQueueConsumer[" + _queue + "," + _consumerId + "]"; 124 } 125 } 126 127 | Popular Tags |