1 16 17 package org.logicalcobwebs.concurrent; 18 19 30 31 public class LinkedQueue implements Channel { 32 33 34 38 protected LinkedNode head_; 39 40 43 protected final Object putLock_ = new Object (); 44 45 48 protected LinkedNode last_; 49 50 57 protected int waitingForTake_ = 0; 58 59 public LinkedQueue() { 60 head_ = new LinkedNode(null); 61 last_ = head_; 62 } 63 64 65 protected void insert(Object x) { 66 synchronized (putLock_) { 67 LinkedNode p = new LinkedNode(x); 68 synchronized (last_) { 69 last_.next = p; 70 last_ = p; 71 } 72 if (waitingForTake_ > 0) 73 putLock_.notify(); 74 } 75 } 76 77 78 protected synchronized Object extract() { 79 synchronized (head_) { 80 Object x = null; 81 LinkedNode first = head_.next; 82 if (first != null) { 83 x = first.value; 84 first.value = null; 85 head_ = first; 86 } 87 return x; 88 } 89 } 90 91 92 public void put(Object x) throws InterruptedException { 93 if (x == null) throw new IllegalArgumentException (); 94 if (Thread.interrupted()) throw new InterruptedException (); 95 insert(x); 96 } 97 98 public boolean offer(Object x, long msecs) throws InterruptedException { 99 if (x == null) throw new IllegalArgumentException (); 100 if (Thread.interrupted()) throw new InterruptedException (); 101 insert(x); 102 return true; 103 } 104 105 public Object take() throws InterruptedException { 106 if (Thread.interrupted()) throw new InterruptedException (); 107 Object x = extract(); 109 if (x != null) 110 return x; 111 else { 112 synchronized (putLock_) { 113 try { 114 ++waitingForTake_; 115 for (; ;) { 116 x = extract(); 117 if (x != null) { 118 --waitingForTake_; 119 return x; 120 } else { 121 putLock_.wait(); 122 } 123 } 124 } catch (InterruptedException ex) { 125 --waitingForTake_; 126 putLock_.notify(); 127 throw ex; 128 } 129 } 130 } 131 } 132 133 public Object peek() { 134 synchronized (head_) { 135 LinkedNode first = head_.next; 136 if (first != null) 137 return first.value; 138 else 139 return null; 140 } 141 } 142 143 144 public boolean isEmpty() { 145 synchronized (head_) { 146 return head_.next == null; 147 } 148 } 149 150 public Object poll(long msecs) throws InterruptedException { 151 if (Thread.interrupted()) throw new InterruptedException (); 152 Object x = extract(); 153 if (x != null) 154 return x; 155 else { 156 synchronized (putLock_) { 157 try { 158 long waitTime = msecs; 159 long start = (msecs <= 0) ? 0 : System.currentTimeMillis(); 160 ++waitingForTake_; 161 for (; ;) { 162 x = extract(); 163 if (x != null || waitTime <= 0) { 164 --waitingForTake_; 165 return x; 166 } else { 167 putLock_.wait(waitTime); 168 waitTime = msecs - (System.currentTimeMillis() - start); 169 } 170 } 171 } catch (InterruptedException ex) { 172 --waitingForTake_; 173 putLock_.notify(); 174 throw ex; 175 } 176 } 177 } 178 } 179 } 180 181 182 | Popular Tags |