1 package net.walend.somnifugi.juc; 2 3 import java.util.Comparator ; 4 import java.util.Enumeration ; 5 import java.util.Collections ; 6 import java.util.List ; 7 import java.util.ArrayList ; 8 9 import java.util.concurrent.TimeUnit ; 10 import java.util.concurrent.BlockingQueue ; 11 import java.util.concurrent.PriorityBlockingQueue ; 12 13 import javax.jms.Message ; 14 import javax.jms.JMSException ; 15 16 import net.walend.somnifugi.SomniRuntimeException; 17 import net.walend.somnifugi.SomniMessage; 18 import net.walend.somnifugi.SomniMessageSelector; 19 import net.walend.somnifugi.SomniMessageSelectorException; 20 21 import net.walend.somnifugi.channel.Channel; 22 import net.walend.somnifugi.channel.Puttable; 23 import net.walend.somnifugi.channel.Takable; 24 25 30 31 public class PriorityChannel 32 implements Channel<Message >,Puttable<Message >,Takable<Message > 33 { 34 private PriorityBlockingQueue <Message > queue = new PriorityBlockingQueue <Message >(11,new MessageComparator()); 35 36 37 38 public PriorityChannel() 39 { 40 } 41 42 48 public boolean offer(Message elem,long timeout) 49 throws InterruptedException 50 { 51 return queue.offer(elem,timeout,TimeUnit.MILLISECONDS); 52 } 53 54 57 public void put(Message elem) 58 throws InterruptedException 59 { 60 queue.put(elem); 61 } 62 63 67 public Message poll() 68 { 69 return queue.poll(); 70 } 71 72 75 public Message poll(long timeout) 76 throws InterruptedException 77 { 78 return queue.poll(timeout,TimeUnit.MILLISECONDS); 79 } 80 81 84 public Message take() 85 throws InterruptedException 86 { 87 return queue.take(); 88 } 89 90 93 public void pushBack(Message elem) 94 throws InterruptedException 95 { 96 queue.put(elem); 97 } 98 99 public Message peek() 100 { 101 return queue.peek(); 102 } 103 104 public boolean hasRealPushback() 106 { 107 return true; 108 } 109 110 public boolean supportsPriorities() 111 { 112 return true; 113 } 114 115 public boolean supportsMessageSelectors() 116 { 117 return false; 118 } 119 120 public Enumeration <Message > snapShot() 121 { 122 List <Message > snap = new ArrayList <Message >(queue); 123 Collections.sort(snap,queue.comparator()); 124 125 return new EnumerationBridge<Message >(snap.iterator()); 126 } 127 128 public Enumeration snapShot(SomniMessageSelector messageSelector) 129 throws SomniMessageSelectorException 130 { 131 List <Message > snap = new ArrayList <Message >(queue.size()); 132 133 for(Message message : queue) 134 { 135 if(messageSelector.matches(message)) 136 { 137 snap.add(message); 138 } 139 } 140 141 return new EnumerationBridge<Message >(snap.iterator()); 142 } 143 144 public Puttable<Message > getPuttable() 145 { 146 return this; 147 } 148 149 public Takable<Message > getTakable() 150 { 151 return this; 152 } 153 154 public Takable<Message > getTakable(SomniMessageSelector messageSelector) 155 { 156 throw new UnsupportedOperationException (getClass().getName()+" does not support message selectors."); 157 } 158 159 public int guessSize() 160 { 161 return queue.size(); 162 } 163 } 164 165 185 | Popular Tags |