1 16 17 package EDU.oswego.cs.dl.util.concurrent; 18 19 47 48 public class WaitFreeQueue implements Channel { 49 50 57 58 59 protected final static class Node { 60 protected final Object value; 61 protected volatile Node next; 62 63 64 protected Node(Object x) { value = x; } 65 66 67 protected synchronized boolean CASNext(Node oldNext, Node newNext) { 68 if (next == oldNext) { 69 next = newNext; 70 return true; 71 } 72 else 73 return false; 74 } 75 } 76 77 78 protected volatile Node head = new Node(null); 79 80 protected volatile Node tail = head; 81 82 83 protected final Object tailLock = new Object (); 84 85 86 protected synchronized boolean CASHead(Node oldHead, Node newHead) { 87 if (head == oldHead) { 88 head = newHead; 89 return true; 90 } 91 else 92 return false; 93 } 94 95 96 protected boolean CASTail(Node oldTail, Node newTail) { 97 synchronized(tailLock) { 98 if (tail == oldTail) { 99 tail = newTail; 100 return true; 101 } 102 else 103 return false; 104 } 105 } 106 107 public void put(Object x) throws InterruptedException { 108 if (x == null) throw new IllegalArgumentException (); 109 if (Thread.interrupted()) throw new InterruptedException (); 110 Node n = new Node(x); 111 112 for(;;) { 113 Node t = tail; 114 if (t.CASNext(null, n)) { 116 CASTail(t, n); 119 return; 120 } 121 122 CASTail(t, t.next); 124 } 125 } 126 127 public boolean offer(Object x, long msecs) throws InterruptedException { 128 put(x); 129 return true; 130 } 131 132 133 protected Object extract() throws InterruptedException { 134 for (;;) { 135 Node h = head; 136 Node first = h.next; 137 138 if (first == null) 139 return null; 140 141 Object result = first.value; 142 if (CASHead(h, first)) 143 return result; 144 } 145 } 146 147 public Object peek() { 148 Node first = head.next; 149 150 if (first == null) 151 return null; 152 153 synchronized(this) { 157 return first.value; 158 } 159 } 160 161 169 public Object take() throws InterruptedException { 170 if (Thread.interrupted()) throw new InterruptedException (); 171 for(;;) { 172 Object x = extract(); 173 if (x != null) 174 return x; 175 else 176 Thread.sleep(0); 177 } 178 } 179 180 185 public Object poll(long msecs) throws InterruptedException { 186 if (Thread.interrupted()) throw new InterruptedException (); 187 if (msecs <= 0) 188 return extract(); 189 190 long startTime = System.currentTimeMillis(); 191 for(;;) { 192 Object x = extract(); 193 if (x != null) 194 return x; 195 else if (System.currentTimeMillis() - startTime >= msecs) 196 return null; 197 else 198 Thread.sleep(0); 199 } 200 201 } 202 } 203 | Popular Tags |