1 28 29 package com.caucho.jms.session; 30 31 import com.caucho.jms.AbstractDestination; 32 import com.caucho.jms.message.MessageImpl; 33 import com.caucho.jms.selector.Selector; 34 import com.caucho.jms.selector.SelectorParser; 35 import com.caucho.log.Log; 36 import com.caucho.util.Alarm; 37 import com.caucho.util.L10N; 38 39 import javax.jms.JMSException ; 40 import javax.jms.Message ; 41 import javax.jms.MessageConsumer ; 42 import javax.jms.MessageListener ; 43 import javax.jms.Session ; 44 import java.util.logging.Logger ; 45 46 49 public class MessageConsumerImpl 50 implements MessageConsumer , MessageAvailableListener { 51 static final Logger log = Log.open(MessageConsumerImpl.class); 52 static final L10N L = new L10N(MessageConsumerImpl.class); 53 54 private final Object _consumerLock = new Object (); 55 56 protected SessionImpl _session; 57 private AbstractDestination _queue; 58 private MessageListener _messageListener; 59 private String _messageSelector; 60 protected Selector _selector; 61 private boolean _noLocal; 62 63 private volatile boolean _isClosed; 64 65 protected MessageConsumerImpl(SessionImpl session, 66 String messageSelector, 67 AbstractDestination queue, 68 boolean noLocal) 69 throws JMSException 70 { 71 _session = session; 72 _queue = queue; 73 _messageSelector = messageSelector; 74 if (_messageSelector != null) { 75 SelectorParser parser = new SelectorParser(); 76 _selector = parser.parse(messageSelector); 77 } 78 _noLocal = noLocal; 79 80 _queue.addListener(this); 81 } 82 83 86 public boolean getNoLocal() 87 { 88 return _noLocal; 89 } 90 91 94 public MessageListener getMessageListener() 95 { 96 return _messageListener; 97 } 98 99 102 public void setMessageListener(MessageListener listener) 103 { 104 _messageListener = listener; 105 _session.setAsynchronous(); 106 } 107 108 111 public String getMessageSelector() 112 { 113 return _messageSelector; 114 } 115 116 119 public Selector getSelector() 120 { 121 return _selector; 122 } 123 124 127 public boolean isActive() 128 { 129 return _session.isActive() && ! _isClosed; 130 } 131 132 135 public boolean isClosed() 136 { 137 return _isClosed; 138 } 139 140 143 public Message receive() 144 throws JMSException 145 { 146 return receive(Long.MAX_VALUE); 147 } 148 149 152 public Message receive(long timeout) 153 throws JMSException 154 { 155 _session.checkOpen(); 156 157 if (Long.MAX_VALUE / 2 < timeout || timeout < 0) 158 timeout = Long.MAX_VALUE / 2; 159 160 long now = Alarm.getCurrentTime(); 161 long expireTime = Alarm.getCurrentTime() + timeout; 162 163 166 while (! isClosed()) { 167 Message msg = receiveNoWait(); 168 if (msg != null) 169 return msg; 170 171 long delta = expireTime - Alarm.getCurrentTime(); 172 173 if (delta <= 0 || _isClosed || Alarm.isTest()) 174 return null; 175 176 synchronized (_consumerLock) { 177 try { 178 _consumerLock.wait(delta); 179 } catch (Throwable e) { 180 } 181 } 182 } 183 184 return null; 185 } 186 187 190 public Message receiveNoWait() 191 throws JMSException 192 { 193 if (_isClosed) 194 throw new javax.jms.IllegalStateException (L.l("can't receive when consumer is closed")); 195 196 if (! _session.isActive()) 197 return null; 198 199 MessageImpl msg = receiveImpl(); 200 201 if (msg == null) 202 return null; 203 204 switch (_session.getAcknowledgeMode()) { 205 case Session.CLIENT_ACKNOWLEDGE: 206 msg.setSession(_session); 207 break; 208 209 case Session.AUTO_ACKNOWLEDGE: 210 case Session.DUPS_OK_ACKNOWLEDGE: 211 acknowledge(); 212 break; 213 214 default: 215 break; 217 } 218 219 return msg; 220 } 221 222 225 protected MessageImpl receiveImpl() 226 throws JMSException 227 { 228 throw new UnsupportedOperationException (); 229 } 230 231 234 public void acknowledge() 235 throws JMSException 236 { 237 } 238 239 242 public void rollback() 243 throws JMSException 244 { 245 } 246 247 250 public void close() 251 throws JMSException 252 { 253 _isClosed = true; 254 _queue.removeListener(this); 255 } 258 259 262 public void messageAvailable() 263 { 264 _session.notifyListener(); 265 synchronized (_consumerLock) { 266 try { 267 _consumerLock.notify(); 268 } catch (Throwable e) { 269 } 270 } 271 } 272 } 273 274 | Popular Tags |