1 43 package org.exolab.jms.messagemgr; 44 45 import javax.jms.InvalidSelectorException ; 46 47 import org.apache.commons.logging.Log; 48 import org.apache.commons.logging.LogFactory; 49 50 import org.exolab.jms.client.JmsDestination; 51 import org.exolab.jms.message.MessageImpl; 52 import org.exolab.jms.scheduler.Scheduler; 53 import org.exolab.jms.selector.Selector; 54 import org.exolab.jms.server.ServerConnection; 55 import org.exolab.jms.server.JmsServerSession; 56 57 58 65 public abstract class AbstractConsumerEndpoint implements ConsumerEndpoint { 66 67 70 private final ConsumerEndpointListener _session; 71 72 75 private final long _id; 76 77 80 private final long _connectionId; 81 82 85 private final JmsDestination _destination; 86 87 91 private final Selector _selector; 92 93 97 private final boolean _noLocal; 98 99 103 private volatile boolean _stopped = true; 104 105 108 private volatile boolean _closed = false; 109 110 114 private boolean _scheduled = false; 115 116 119 private final Object _lock = new Object (); 120 121 124 private boolean _waitingForMessage = false;; 125 126 130 protected ConsumerEndpointListener _listener = null; 131 132 135 protected int _size = 1000; 136 137 141 protected transient Scheduler _scheduler = null; 142 143 146 private static final Log _log = LogFactory.getLog( 147 AbstractConsumerEndpoint.class); 148 149 150 166 public AbstractConsumerEndpoint(long consumerId, JmsServerSession session, 167 JmsDestination destination, 168 String selector, boolean noLocal, 169 Scheduler scheduler) 170 throws InvalidSelectorException { 171 if (session == null) { 172 throw new IllegalArgumentException ("Argument 'session' is null"); 173 } 174 if (destination == null) { 175 throw new IllegalArgumentException ( 176 "Argument 'destination' is null"); 177 } 178 if (scheduler == null) { 179 throw new IllegalArgumentException ("Argument 'scheduler' is null"); 180 } 181 182 _id = consumerId; 183 _connectionId = session.getConnectionId(); 184 _destination = destination; 185 _selector = (selector != null) ? new Selector(selector) : null; 186 _noLocal = noLocal; 187 _session = session; 188 _scheduler = scheduler; 189 } 190 191 196 public long getId() { 197 return _id; 198 } 199 200 208 public boolean isPersistent() { 209 return false; 210 } 211 212 219 public String getPersistentId() { 220 return null; 221 } 222 223 228 public JmsDestination getDestination() { 229 return _destination; 230 } 231 232 238 public long getConnectionId() { 239 return _connectionId; 240 } 241 242 248 public Selector getSelector() { 249 return _selector; 250 } 251 252 253 259 public boolean getNoLocal() { 260 return _noLocal; 261 } 262 263 269 public synchronized void setStopped(boolean stop) { 270 if (stop) { 271 _stopped = true; 272 } else { 273 _stopped = false; 274 schedule(); 276 } 277 } 278 279 287 public synchronized void setMessageListener( 288 ConsumerEndpointListener listener) { 289 _listener = listener; 290 if (listener == null) { 291 _scheduler.remove(this); 293 _scheduled = false; 294 } else { 295 schedule(); 297 } 298 } 299 300 306 public void run() { 307 synchronized (_lock) { 308 if (!_closed) { 309 boolean reschedule = deliverMessages(); 310 _scheduled = false; 311 if (reschedule) { 312 schedule(); 313 } 314 } 315 } 316 } 317 318 319 325 public final void close() { 326 _stopped = true; 327 328 synchronized (_lock) { 329 _scheduler.remove(this); _scheduled = false; 332 333 } 334 335 synchronized (this) { 336 doClose(); 337 _closed = true; 338 } 339 } 340 341 346 public String toString() { 347 return _id + ":" + getDestination(); 348 } 349 350 351 356 protected abstract boolean deliverMessages(); 357 358 361 protected abstract void doClose(); 362 363 366 protected void schedule() { 367 if (!_stopped && !_closed && _listener != null && !_scheduled) { 368 _scheduled = true; 369 _scheduler.add(this); 370 } 371 } 372 373 378 protected final boolean isStopped() { 379 return _stopped; 380 } 381 382 386 protected void notifyMessageAvailable() { 387 if (isWaitingForMessage()) { 389 clearWaitingForMessage(); 390 391 try { 392 _session.onMessageAvailable(getId()); 393 } catch (Exception exception) { 394 if (_log.isDebugEnabled()) { 395 _log.debug("Failed to notify consumer of available message", 396 exception); 397 } 398 } 399 } 400 } 401 402 407 protected final boolean isWaitingForMessage() { 408 return _waitingForMessage; 409 } 410 411 414 protected final void setWaitingForMessage() { 415 _waitingForMessage = true; 416 } 417 418 421 protected final void clearWaitingForMessage() { 422 _waitingForMessage = false; 423 } 424 425 432 protected boolean stopDelivery() { 433 return (_stopped || getMessageCount() == 0 || _listener == null); 434 } 435 436 443 protected boolean selects(MessageImpl message) { 444 return (_selector == null || _selector.selects(message)); 445 } 446 447 } 448 | Popular Tags |