package net.walend.somnifugi.juc;

import java.util.Comparator;
import java.util.Enumeration;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

import javax.jms.Message;
import javax.jms.JMSException;

import net.walend.somnifugi.SomniRuntimeException;
import net.walend.somnifugi.SomniMessage;
import net.walend.somnifugi.SomniMessageSelector;
import net.walend.somnifugi.SomniMessageSelectorException;

import net.walend.somnifugi.channel.Channel;
import net.walend.somnifugi.channel.Puttable;
import net.walend.somnifugi.channel.Takable;

/**
 * A {@link Channel} of JMS {@link Message}s that reports support for both
 * message priorities and message selectors.
 *
 * <p>All messages are held in a single
 * {@code MessageSelectingPriorityBlockingQueue} ordered by a
 * {@code MessageComparator}. The channel acts as its own {@link Puttable};
 * consumers obtain a {@link Takable} view of the same queue, optionally
 * restricted by a {@link SomniMessageSelector}.
 */
public class MessageSelectingPriorityChannel
    implements Channel<Message>, Puttable<Message>
{
    // NOTE(review): 11 is presumably an initial capacity, mirroring the default of
    // java.util.concurrent.PriorityBlockingQueue -- confirm against the queue's constructor.
    private final MessageSelectingPriorityBlockingQueue queue =
        new MessageSelectingPriorityBlockingQueue(11, new MessageComparator());

    /** Shared takable built with {@code SomniMessageSelector.ALLMESSAGESELECTOR}. */
    private final Takable<Message> takable =
        new MessageSelectingTakable(SomniMessageSelector.ALLMESSAGESELECTOR, queue);

    /** Creates a channel backed by a new, empty queue. */
    public MessageSelectingPriorityChannel()
    {
    }

    /**
     * Offers a message to the underlying queue, passing the timeout through
     * in milliseconds.
     *
     * @param elem the message to enqueue
     * @param timeout the timeout handed to the queue, in milliseconds
     * @return whatever the underlying queue's timed {@code offer} returns
     * @throws InterruptedException if the underlying queue's {@code offer} throws it
     */
    public boolean offer(Message elem, long timeout)
        throws InterruptedException
    {
        return queue.offer(elem, timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Puts a message into the underlying queue.
     *
     * @param elem the message to enqueue
     * @throws InterruptedException if the underlying queue's {@code put} throws it
     */
    public void put(Message elem)
        throws InterruptedException
    {
        queue.put(elem);
    }

    /**
     * @return always {@code true}
     */
    public boolean hasRealPushback()
    {
        return true;
    }

    /**
     * @return always {@code true}
     */
    public boolean supportsPriorities()
    {
        return true;
    }

    /**
     * @return always {@code true}
     */
    public boolean supportsMessageSelectors()
    {
        return true;
    }

    /**
     * @return this channel, which is its own {@link Puttable}
     */
    public Puttable<Message> getPuttable()
    {
        return this;
    }

    /**
     * @return the single shared takable created with
     *         {@code SomniMessageSelector.ALLMESSAGESELECTOR}
     */
    public Takable<Message> getTakable()
    {
        return takable;
    }

    /**
     * Creates a new takable over this channel's queue that uses the supplied selector.
     *
     * @param messageSelector the selector the new takable passes to the queue
     * @return a new takable on every call
     */
    public Takable<Message> getTakable(SomniMessageSelector messageSelector)
    {
        return new MessageSelectingTakable(messageSelector, queue);
    }

    /**
     * A {@link Takable} view of the channel's queue that hands its selector to
     * the queue's {@code poll} and {@code take} operations.
     */
    private static final class MessageSelectingTakable
        implements Takable<Message>
    {
        private final SomniMessageSelector messageSelector;
        private final MessageSelectingPriorityBlockingQueue queue;

        MessageSelectingTakable(SomniMessageSelector messageSelector,
                                MessageSelectingPriorityBlockingQueue queue)
        {
            this.messageSelector = messageSelector;
            this.queue = queue;
        }

        /**
         * Polls the queue using this takable's selector.
         *
         * @return the result of the queue's {@code poll(selector)}
         */
        public Message poll()
        {
            return queue.poll(messageSelector);
        }

        /**
         * Polls the queue using this takable's selector, passing the timeout
         * through in milliseconds.
         *
         * @param timeout the timeout handed to the queue, in milliseconds
         * @return the result of the queue's timed {@code poll}
         * @throws InterruptedException if the queue's {@code poll} throws it
         */
        public Message poll(long timeout)
            throws InterruptedException
        {
            return queue.poll(timeout, TimeUnit.MILLISECONDS, messageSelector);
        }

        /**
         * Takes from the queue using this takable's selector.
         *
         * @return the result of the queue's {@code take(selector)}
         * @throws InterruptedException if the queue's {@code take} throws it
         */
        public Message take()
            throws InterruptedException
        {
            return queue.take(messageSelector);
        }

        /**
         * Returns a message to the channel by putting it back into the queue.
         *
         * @param elem the message to re-enqueue
         * @throws InterruptedException if the queue's {@code put} throws it
         */
        public void pushBack(Message elem)
            throws InterruptedException
        {
            queue.put(elem);
        }

        /**
         * Peeks at the queue.
         *
         * @return the result of the queue's {@code peek()}
         */
        public Message peek()
        {
            // NOTE(review): unlike poll/take, this does not pass messageSelector, so the
            // result may be a message this takable's selector would not match -- confirm
            // whether that is intended.
            return queue.peek();
        }

        /**
         * @return an enumeration over the list returned by the queue's
         *         {@code snapMasterQueueContents()}
         */
        public Enumeration<Message> snapShot()
        {
            List<Message> snap = queue.snapMasterQueueContents();

            return new EnumerationBridge<Message>(snap.iterator());
        }

        /**
         * Enumerates the messages from the queue's
         * {@code snapMasterQueueContents()} that match the given selector.
         * The selector passed in is used, not the one this takable was
         * constructed with (the parameter shadows the field).
         *
         * @param messageSelector the selector each message is tested against
         * @return an enumeration of the matching messages, in the order the
         *         snapshot list supplies them
         * @throws SomniMessageSelectorException if {@code messageSelector.matches} throws it
         */
        public Enumeration<Message> snapShot(SomniMessageSelector messageSelector)
            throws SomniMessageSelectorException
        {
            List<Message> snap = queue.snapMasterQueueContents();
            List<Message> result = new ArrayList<Message>(snap.size());

            for (Message message : snap)
            {
                if (messageSelector.matches(message))
                {
                    result.add(message);
                }
            }
            return new EnumerationBridge<Message>(result.iterator());
        }

        /**
         * @return the queue's current {@code size()}
         */
        public int guessSize()
        {
            return queue.size();
        }
    }
}