| 1 package net.walend.somnifugi.juc; 2 3 import java.util.Comparator ; 4 import java.util.Map ; 5 import java.util.HashMap ; 6 import java.util.List ; 7 import java.util.ArrayList ; 8 import java.util.Collections ; 9 10 import java.util.concurrent.TimeUnit ; 11 import java.util.concurrent.PriorityBlockingQueue ; 12 13 import java.util.concurrent.locks.ReentrantLock ; 14 import java.util.concurrent.locks.Condition ; 15 16 import java.util.concurrent.atomic.AtomicMarkableReference ; 17 18 import javax.jms.Message ; 19 import javax.jms.JMSException ; 20 21 import net.walend.somnifugi.SomniMessageSelector; 22 import net.walend.somnifugi.SomniMessageSelectorException; 23 import net.walend.somnifugi.SomniRuntimeException; 24 25 30 31 class MessageSelectingPriorityBlockingQueue 32 { 33 34 private static final int INITIALCAPACITY = 5; 35 36 private final PriorityBlockingQueue <AtomicMarkableReference <Message >> masterQueue; 37 38 private final ReentrantLock lock = new ReentrantLock (true); 40 41 private final Comparator <AtomicMarkableReference <Message >> refComparator; 42 43 private final Map <SomniMessageSelector,PriorityBlockingQueue <AtomicMarkableReference <Message >>> messageSelectorsToQueues; 44 45 MessageSelectingPriorityBlockingQueue(int initialCapacity,Comparator <Message > comparator) 46 { 47 refComparator = new RefComparator(comparator); 48 49 messageSelectorsToQueues = new HashMap <SomniMessageSelector,PriorityBlockingQueue <AtomicMarkableReference <Message >>>(); 50 51 masterQueue = getQueueForMessageSelector(SomniMessageSelector.ALLMESSAGESELECTOR); 52 } 53 54 private PriorityBlockingQueue <AtomicMarkableReference <Message >> getQueueForMessageSelector(SomniMessageSelector messageSelector) 55 { 56 try 57 { 58 lock.lock(); 59 60 PriorityBlockingQueue <AtomicMarkableReference <Message >> queue = messageSelectorsToQueues.get(messageSelector); 61 if(queue == null) 62 { 63 queue = new PriorityBlockingQueue <AtomicMarkableReference <Message >>(INITIALCAPACITY,refComparator); 64 messageSelectorsToQueues.put(messageSelector,queue); 65 66 if(messageSelector != SomniMessageSelector.ALLMESSAGESELECTOR) 67 { 68 List <AtomicMarkableReference <Message >> list = new ArrayList <AtomicMarkableReference <Message >>(masterQueue); 70 71 for(AtomicMarkableReference <Message > ref : list) 72 { 73 if(messageSelector.matches(ref.getReference())) 74 { 75 queue.add(ref); 76 } 77 } 78 } 79 } 80 81 return queue; 82 } 83 catch(SomniMessageSelectorException smse) 84 { 85 throw new SomniRuntimeException("While using "+messageSelector,smse); 86 } 87 finally 88 { 89 lock.unlock(); 90 } 91 } 92 93 public void removeMessageSelector(SomniMessageSelector messageSelector) 94 { 95 try 96 { 97 lock.lock(); 98 messageSelectorsToQueues.remove(messageSelector); 99 } 100 finally 101 { 102 lock.unlock(); 103 } 104 } 105 106 public void put(Message message) 107 { 108 if (message == null) 109 { 110 throw new NullPointerException ("message can not be null."); 111 } 112 AtomicMarkableReference <Message > ref = new AtomicMarkableReference <Message >(message,false); 113 114 try 115 { 116 lock.lock(); 117 118 for(SomniMessageSelector messageSelector : messageSelectorsToQueues.keySet()) 120 { 121 if(messageSelector.matches(message)) 122 { 123 PriorityBlockingQueue <AtomicMarkableReference <Message >> queue = getQueueForMessageSelector(messageSelector); 124 queue.put(ref); 125 } 126 } 127 } 128 catch(SomniMessageSelectorException smse) 129 { 130 throw new SomniRuntimeException("While offering "+message,smse); 131 } 132 finally 133 { 134 lock.unlock(); 135 } 136 } 137 138 143 public boolean offer(Message message,long timeout,TimeUnit timeUnit) 144 { 145 if (message == null) 146 { 147 throw new NullPointerException ("message can not be null."); 148 } 149 AtomicMarkableReference <Message > ref = new AtomicMarkableReference <Message >(message,false); 150 151 try 152 { 153 lock.lock(); 154 boolean ok = masterQueue.offer(ref,timeout,timeUnit); 155 156 if(ok) 157 { 158 for(SomniMessageSelector messageSelector : messageSelectorsToQueues.keySet()) 160 { 161 if(messageSelector.matches(message)) 162 { 163 PriorityBlockingQueue <AtomicMarkableReference <Message >> queue = getQueueForMessageSelector(messageSelector); 164 if(queue!=masterQueue) 165 { 166 queue.put(ref); 167 } 168 } 169 } 170 } 171 return ok; 172 } 173 catch(SomniMessageSelectorException smse) 174 { 175 throw new SomniRuntimeException("While offering "+message,smse); 176 } 177 finally 178 { 179 lock.unlock(); 180 } 181 } 182 183 public Message take(SomniMessageSelector messageSelector) 184 throws InterruptedException 185 { 186 PriorityBlockingQueue <AtomicMarkableReference <Message >> queue = getQueueForMessageSelector(messageSelector); 187 188 190 AtomicMarkableReference <Message > ref = null; 191 192 do 193 { 194 ref = queue.take(); 195 } 196 while(!ref.compareAndSet(ref.getReference(),ref.getReference(),false,true)); 197 198 if(SomniMessageSelector.ALLMESSAGESELECTOR!=messageSelector) 199 { 200 masterQueue.remove(ref); 201 } 202 203 return ref.getReference(); 204 } 205 206 public Message poll(SomniMessageSelector messageSelector) 207 { 208 PriorityBlockingQueue <AtomicMarkableReference <Message >> queue = getQueueForMessageSelector(messageSelector); 209 210 AtomicMarkableReference <Message > ref = null; 211 212 do 214 { 215 ref = queue.poll(); 216 } 217 while((ref!=null)&&(!ref.compareAndSet(ref.getReference(),ref.getReference(),false,true))); 218 219 if(ref == null) 220 { 221 return null; 222 } 223 224 if(SomniMessageSelector.ALLMESSAGESELECTOR!=messageSelector) 225 { 226 masterQueue.remove(ref); 227 } 228 229 return ref.getReference(); 230 } 231 232 public Message poll(long timeout,TimeUnit unit,SomniMessageSelector messageSelector) throws InterruptedException 233 { 234 PriorityBlockingQueue <AtomicMarkableReference <Message >> queue = getQueueForMessageSelector(messageSelector); 235 236 long nanos = unit.toNanos(timeout); 237 long now = System.nanoTime(); 239 final long timeOutTime = now + nanos; 240 long timeLeft; 241 242 AtomicMarkableReference <Message > ref = null; 243 244 do 246 { 247 now = System.nanoTime(); 249 timeLeft = timeOutTime - now; 250 ref = queue.poll(timeLeft,TimeUnit.NANOSECONDS); 251 } 252 while((timeLeft>0)&&(ref!=null)&&(!ref.compareAndSet(ref.getReference(),ref.getReference(),false,true))); 253 254 if(ref == null) 255 { 256 return null; 257 } 258 259 if(timeLeft <= 0) 260 { 261 return null; 262 } 263 264 if(SomniMessageSelector.ALLMESSAGESELECTOR!=messageSelector) 265 { 266 masterQueue.remove(ref); 267 } 268 269 return ref.getReference(); 270 } 271 272 public Message peek() 273 { 274 AtomicMarkableReference <Message > ref = masterQueue.peek(); 275 if(ref == null) 276 { 277 return null; 278 } 279 return masterQueue.peek().getReference(); 280 } 281 282 public int size() 283 { 284 return masterQueue.size(); 285 } 286 287 List <Message > snapMasterQueueContents() 288 { 289 List <AtomicMarkableReference <Message >> refs = new ArrayList <AtomicMarkableReference <Message >>(masterQueue); 290 Collections.sort(refs,refComparator); 291 List <Message > messages = new ArrayList <Message >(refs.size()); 292 for(AtomicMarkableReference <Message > ref : refs) 293 { 294 messages.add(ref.getReference()); 295 } 296 return messages; 297 } 298 299 public String toString() 300 { 301 return masterQueue.toString(); 302 } 303 304 private static final class RefComparator 305 implements Comparator <AtomicMarkableReference <Message >> 306 { 307 private Comparator <Message > comparator; 308 309 private RefComparator(Comparator <Message > comparator) 310 { 311 this.comparator = comparator; 312 } 313 314 public int compare(AtomicMarkableReference <Message > ref1,AtomicMarkableReference <Message > ref2) 315 { 316 return comparator.compare(ref1.getReference(),ref2.getReference()); 317 } 318 } 319 } 320 321 341 | Popular Tags |