1 55 package org.lateralnz.common.util; 56 57 import java.io.Serializable ; 58 import java.util.ArrayList ; 59 60 import org.lateralnz.common.wrapper.IntHolder; 61 62 68 public class Queue implements Serializable , Constants { 69 private InnerArrayList queue = new InnerArrayList(); 70 private IntHolder messageIdx = new IntHolder(0); 71 private int cleanupSize = 100; 72 private boolean blocking = false; 73 74 public Queue() { 75 this(100, false); 76 } 77 78 public Queue(boolean blocking) { 79 this(100, blocking); 80 } 81 82 public Queue(int cleanupSize) { 83 this(cleanupSize, false); 84 } 85 86 public Queue(int cleanupSize, boolean blocking) { 87 this.cleanupSize = cleanupSize; 88 this.blocking = blocking; 89 } 90 91 public int getSize() { 92 return queue.size() - messageIdx.value; 93 } 94 95 public synchronized void add(Object obj) { 96 queue.add(obj); 97 98 if (messageIdx.value > cleanupSize) { 99 int removed = queue.remove(0, cleanupSize); 100 messageIdx.value -= removed; 101 } 102 103 if (blocking) { 104 try { 105 notify(); 106 } 107 catch (IllegalMonitorStateException itme) { 108 System.err.println("illegal monitor state"); 109 } 110 } 111 } 112 113 private boolean available() { 114 if (messageIdx.value >= 0 && messageIdx.value < queue.size()) { 115 return true; 116 } 117 else { 118 return false; 119 } 120 } 121 122 public synchronized Object get() throws InterruptedException { 123 if (blocking) { 124 while (!available()) { 125 wait(); 126 } 127 } 128 129 Object rtn = null; 130 int idx = messageIdx.value; 131 132 if (available()) { 133 messageIdx.value++; 134 rtn = queue.get(idx); 135 } 136 137 return rtn; 138 } 139 140 143 private class InnerArrayList extends ArrayList { 144 147 public int remove(int start, int end) { 148 if (start > size()) { 149 return 0; 150 } 151 if (end > size()) { 152 end = size(); 153 } 154 155 this.removeRange(start, end); 156 return end - start; 157 } 158 } 159 160 public static final void main(String [] args) throws Exception { 162 final Queue queue = new Queue(true); 163 final ArrayList test = new ArrayList (); 164 165 final int max = 10000; 166 167 Thread t = new Thread () { 168 public void run() { 169 java.util.Random rand = new java.util.Random (); 170 for (int i = 0; i < max; i++) { 171 queue.add("test" + i); 172 try { sleep(rand.nextInt(5)); } catch (Exception e) { } 173 } 174 } 175 }; 176 177 for (int i = 0; i < 5; i++) { 178 final int idx = i; 179 Thread l = new Thread () { 180 public void run() { 181 java.util.Random rand = new java.util.Random (); 182 while (true) { 183 try { 184 System.out.println("#" + idx + " listening"); 185 Object obj = queue.get(); 186 if (obj == null) { 187 System.out.println(">>ERROR<<"); 188 System.exit(1); 189 } 190 System.out.println("#" + idx + " got " + obj); 191 test.add(obj); 192 if (test.size() >= max) { 193 System.exit(0); 194 } 195 sleep(rand.nextInt(10)); 196 } 197 catch (Exception e) { } 198 } 199 } 200 }; 201 l.start(); 202 } 203 204 t.start(); 205 } 206 } | Popular Tags |