Your browser does not support JavaScript and this site utilizes JavaScript to build content and provide links to additional information. You should either enable JavaScript in your browser settings or use a browser that supports JavaScript in order to take full advantage of this site.
1 16 17 package org.logicalcobwebs.concurrent; 18 19 30 31 public class LinkedQueue implements Channel { 32 33 34 38 protected LinkedNode head_; 39 40 43 protected final Object putLock_ = new Object (); 44 45 48 protected LinkedNode last_; 49 50 57 protected int waitingForTake_ = 0; 58 59 public LinkedQueue() { 60 head_ = new LinkedNode(null); 61 last_ = head_; 62 } 63 64 65 protected void insert(Object x) { 66 synchronized (putLock_) { 67 LinkedNode p = new LinkedNode(x); 68 synchronized (last_) { 69 last_.next = p; 70 last_ = p; 71 } 72 if (waitingForTake_ > 0) 73 putLock_.notify(); 74 } 75 } 76 77 78 protected synchronized Object extract() { 79 synchronized (head_) { 80 Object x = null; 81 LinkedNode first = head_.next; 82 if (first != null) { 83 x = first.value; 84 first.value = null; 85 head_ = first; 86 } 87 return x; 88 } 89 } 90 91 92 public void put(Object x) throws InterruptedException { 93 if (x == null) throw new IllegalArgumentException (); 94 if (Thread.interrupted()) throw new InterruptedException (); 95 insert(x); 96 } 97 98 public boolean offer(Object x, long msecs) throws InterruptedException { 99 if (x == null) throw new IllegalArgumentException (); 100 if (Thread.interrupted()) throw new InterruptedException (); 101 insert(x); 102 return true; 103 } 104 105 public Object take() throws InterruptedException { 106 if (Thread.interrupted()) throw new InterruptedException (); 107 Object x = extract(); 109 if (x != null) 110 return x; 111 else { 112 synchronized (putLock_) { 113 try { 114 ++waitingForTake_; 115 for (; ;) { 116 x = extract(); 117 if (x != null) { 118 --waitingForTake_; 119 return x; 120 } else { 121 putLock_.wait(); 122 } 123 } 124 } catch (InterruptedException ex) { 125 --waitingForTake_; 126 putLock_.notify(); 127 throw ex; 128 } 129 } 130 } 131 } 132 133 public Object peek() { 134 synchronized (head_) { 135 LinkedNode first = head_.next; 136 if (first != null) 137 return first.value; 138 else 139 return null; 140 } 141 } 142 143 144 public boolean isEmpty() { 145 synchronized (head_) { 146 return head_.next == null; 147 } 148 } 149 150 public Object poll(long msecs) throws InterruptedException { 151 if (Thread.interrupted()) throw new InterruptedException (); 152 Object x = extract(); 153 if (x != null) 154 return x; 155 else { 156 synchronized (putLock_) { 157 try { 158 long waitTime = msecs; 159 long start = (msecs <= 0) ? 0 : System.currentTimeMillis(); 160 ++waitingForTake_; 161 for (; ;) { 162 x = extract(); 163 if (x != null || waitTime <= 0) { 164 --waitingForTake_; 165 return x; 166 } else { 167 putLock_.wait(waitTime); 168 waitTime = msecs - (System.currentTimeMillis() - start); 169 } 170 } 171 } catch (InterruptedException ex) { 172 --waitingForTake_; 173 putLock_.notify(); 174 throw ex; 175 } 176 } 177 } 178 } 179 } 180 181 182
| Popular Tags
|