1 18 package org.apache.activemq; 19 20 import java.util.ArrayList ; 21 import java.util.LinkedList ; 22 import java.util.List ; 23 24 import javax.jms.JMSException ; 25 26 import org.apache.activemq.command.MessageDispatch; 27 28 public class MessageDispatchChannel { 29 30 private final Object mutex = new Object (); 31 private final LinkedList list; 32 private boolean closed; 33 private boolean running; 34 35 public MessageDispatchChannel() { 36 this.list = new LinkedList (); 37 } 38 39 public void enqueue(MessageDispatch message) { 40 synchronized(mutex) { 41 list.addLast(message); 42 mutex.notify(); 43 } 44 } 45 46 public void enqueueFirst(MessageDispatch message) { 47 synchronized(mutex) { 48 list.addFirst(message); 49 mutex.notify(); 50 } 51 } 52 53 public boolean isEmpty() { 54 synchronized(mutex) { 55 return list.isEmpty(); 56 } 57 } 58 59 73 public MessageDispatch dequeue(long timeout) throws InterruptedException { 74 synchronized (mutex) { 75 while(timeout != 0 && !closed && (list.isEmpty() || !running)) { 77 if (timeout == -1) { 78 mutex.wait(); 79 } else { 80 mutex.wait(timeout); 81 break; 82 } 83 } 84 if (closed || !running || list.isEmpty()) { 85 return null; 86 } 87 return (MessageDispatch) list.removeFirst(); 88 } 89 } 90 91 public MessageDispatch dequeueNoWait() { 92 synchronized (mutex) { 93 if (closed || !running || list.isEmpty()) { 94 return null; 95 } 96 return (MessageDispatch) list.removeFirst(); 97 } 98 } 99 100 public MessageDispatch peek() { 101 synchronized (mutex) { 102 if (closed || !running || list.isEmpty()) { 103 return null; 104 } 105 return (MessageDispatch) list.getFirst(); 106 } 107 } 108 109 public void start() { 110 synchronized (mutex) { 111 running = true; 112 mutex.notifyAll(); 113 } 114 } 115 116 public void stop() { 117 synchronized (mutex) { 118 running = false; 119 mutex.notifyAll(); 120 } 121 } 122 123 public void close() { 124 synchronized (mutex) { 125 if (!closed) { 126 running = false; 127 closed = true; 128 } 129 mutex.notifyAll(); 130 } 131 } 132 133 public void clear() { 134 synchronized(mutex) { 135 list.clear(); 136 } 137 } 138 139 public boolean isClosed() { 140 return closed; 141 } 142 143 public int size() { 144 synchronized(mutex) { 145 return list.size(); 146 } 147 } 148 149 public Object getMutex() { 150 return mutex; 151 } 152 153 public boolean isRunning() { 154 return running; 155 } 156 157 public List removeAll() { 158 synchronized(mutex) { 159 ArrayList rc = new ArrayList (list); 160 list.clear(); 161 return rc; 162 } 163 } 164 165 public String toString() { 166 synchronized(mutex) { 167 return list.toString(); 168 } 169 } 170 } 171 | Popular Tags |