1 22 package fr.dyade.aaa.agent; 23 24 import java.io.*; 25 26 import org.objectweb.util.monolog.api.BasicLevel; 27 import org.objectweb.util.monolog.api.Logger; 28 29 import fr.dyade.aaa.util.EmptyQueueException; 30 31 42 final class MessageVector implements MessageQueue { 43 private Logger logmon = null; 44 private String logmsg = null; 45 private long cpt1, cpt2; 46 47 55 private Object data[]; 56 57 private int first; 58 63 private int count; 64 65 private int validated; 66 67 private boolean persistent; 68 69 MessageVector(String name, boolean persistent) { 70 logmon = Debug.getLogger(getClass().getName() + '.' + name); 71 logmsg = name + ".MessageVector: "; 72 73 this.persistent = persistent; 74 data = new Object [50]; 75 first = 0; 76 count = 0; 77 validated = 0; 78 } 79 80 86 public synchronized void insert(Message item) { 87 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 88 logmon.log(BasicLevel.DEBUG, logmsg + "insert(" + item + ")"); 89 90 int i = 0; 91 for (; i<validated; i++) { 92 Message msg = getMessageAt(i); 93 if (item.getStamp() < msg.getStamp()) break; 94 } 95 insertMessageAt(item, i); 96 validated += 1; 97 } 98 99 106 public synchronized void push(Message item) { 107 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 108 logmon.log(BasicLevel.DEBUG, logmsg + "push(" + item + ")"); 109 addMessage(item); 110 } 111 112 119 public synchronized Message pop() throws EmptyQueueException { 120 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 121 logmon.log(BasicLevel.DEBUG, logmsg + "pop()"); 122 123 if (validated == 0) 124 throw new EmptyQueueException(); 125 126 Message item = getMessageAt(0); 127 removeMessageAt(0); 128 validated -= 1; 129 130 return item; 131 } 132 133 137 public synchronized void validate() { 138 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 139 logmon.log(BasicLevel.DEBUG, logmsg + "validate()"); 140 validated = size(); 141 notify(); 142 } 143 144 154 public synchronized Message get() throws InterruptedException { 155 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) { 156 logmon.log(BasicLevel.DEBUG, logmsg + "get()"); 157 158 cpt1 += 1; cpt2 += validated; 159 if ((cpt1 & 0xFFFFL) == 0L) { 160 logmon.log(BasicLevel.DEBUG, logmsg + (cpt2/cpt1) + '/' + validated); 161 } 162 } 163 164 while (validated == 0) { 165 wait(); 166 } 167 Message item = getMessageAt(0); 168 169 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 170 logmon.log(BasicLevel.DEBUG, logmsg + "get() -> " + item); 171 172 return item; 173 } 174 175 187 public synchronized Message get(long timeout) throws InterruptedException { 188 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) { 189 logmon.log(BasicLevel.DEBUG, logmsg + "get(" + timeout + ")"); 190 191 cpt1 += 1; cpt2 += validated; 192 if ((cpt1 & 0xFFFFL) == 0L) { 193 logmon.log(BasicLevel.DEBUG, logmsg + (cpt2/cpt1) + '/' + validated); 194 } 195 } 196 197 Message item = null; 198 if ((validated == 0) && (timeout > 0)) wait(timeout); 199 if (validated > 0) item = getMessageAt(0); 200 201 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 202 logmon.log(BasicLevel.DEBUG, logmsg + "get() -> " + item); 203 204 return item; 205 } 206 207 216 public synchronized Message getMessageTo(short to) { 217 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) { 218 logmon.log(BasicLevel.DEBUG, logmsg + "getFrom(" + to + ")"); 219 220 cpt1 += 1; cpt2 += validated; 221 if ((cpt1 & 0xFFFFL) == 0L) { 222 logmon.log(BasicLevel.DEBUG, logmsg + (cpt2/cpt1) + '/' + validated); 223 } 224 } 225 226 Message item = null; 227 for (int i=0; i<validated; i++) { 228 Message msg = getMessageAt(i); 229 if (msg.getDest() == to) { 230 item = msg; 231 break; 232 } 233 } 234 235 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 236 logmon.log(BasicLevel.DEBUG, logmsg + "get() -> " + item); 237 238 return item; 239 } 240 241 246 synchronized void removeMessage(Message msg) { 247 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 248 logmon.log(BasicLevel.DEBUG, 249 logmsg + "removeMessage #" + msg.getStamp()); 250 251 for (int i = 0; i<validated; i++) { 252 if (getMessageAt(i) == msg) { 253 254 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 255 logmon.log(BasicLevel.DEBUG, 256 logmsg + "removeMessage #" + msg.getStamp() + " -> " + i); 257 258 removeMessageAt(i); 259 validated -= 1; 260 return; 261 } 262 } 263 264 logmon.log(BasicLevel.ERROR, 265 logmsg + "removeMessage #" + msg.getStamp() + " not found"); 266 267 return; 268 } 269 270 275 synchronized int remove(int stamp) { 276 if (validated == 0) return 0; 277 278 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 279 logmon.log(BasicLevel.DEBUG, logmsg + "remove #" + stamp); 280 281 int i = 0; 282 for (; i<validated; i++) { 283 Message msg = getMessageAt(i); 284 if (stamp < msg.getStamp()) break; 285 } 286 287 for (int j=0; j<i; j++) { 288 removeMessageAt(0); 289 } 290 validated -= i; 291 292 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 293 logmon.log(BasicLevel.DEBUG, logmsg + "remove #" + stamp + " ->" +i); 294 295 return i; 296 } 297 298 306 void insertMessageAt(Message item, int index) { 307 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 308 logmon.log(BasicLevel.DEBUG, 309 logmsg + "insertMessageAt(" + item + ", " + index + ")"); 310 311 if (count == data.length) { 312 Object newData[] = new Object [data.length *2]; 313 if ((first + count) < data.length) { 314 System.arraycopy(data, first, newData, 0, count); 315 } else { 316 int j = data.length - first; 317 System.arraycopy(data, first, newData, 0, j); 318 System.arraycopy(data, 0, newData, j, count - j); 319 } 320 first = 0; 321 data = newData; 322 } 323 if (index != count) 324 System.arraycopy(data, index, data, index + 1, count - index); 325 if (persistent) 326 data[(first + index)%data.length] = new MessageSoftRef(item); 327 else 328 data[(first + index)%data.length] = item; 329 count += 1; 330 331 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 332 logmon.log(BasicLevel.DEBUG, 333 logmsg + "insertMessageAt() -> " + this); 334 } 335 336 341 void addMessage(Message item) { 342 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 343 logmon.log(BasicLevel.DEBUG, 344 logmsg + "addMessage(" + item + ")"); 345 346 insertMessageAt(item, count); 347 } 348 349 355 Message getMessageAt(int index) { 356 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 357 logmon.log(BasicLevel.DEBUG, logmsg + "getMessageAt(" + index + ")"); 358 359 int idx = (first + index)%data.length; 360 if (persistent) { 361 Message msg = ((MessageSoftRef) data[idx]).getMessage(); 362 if (msg == null) { 363 msg = ((MessageSoftRef) data[idx]).loadMessage(); 364 data[idx] = new MessageSoftRef(msg); 365 } 366 return msg; 367 } else { 368 return (Message) data[idx]; 369 } 370 } 371 372 377 void removeMessageAt(int index) { 378 if ((first + index) < data.length) { 379 System.arraycopy(data, first, 381 data, first +1, index); 382 data[first] = null; 384 first = (first +1)%data.length; 386 count -= 1; 387 } else { 388 System.arraycopy(data, (first + index)%data.length +1, 390 data, (first + index)%data.length, count - index -1); 391 data[(first + count -1)%data.length] = null; 393 count -= 1; 395 } 396 if (count == 0) first = 0; 397 398 if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) 399 logmon.log(BasicLevel.DEBUG, 400 logmsg + "removeMessageAt(" + index + ") -> " + this); 401 } 402 403 408 public int size() { 409 return count; 410 } 411 412 419 public String toString() { 420 StringBuffer strbuf = new StringBuffer (); 421 422 strbuf.append('(').append(super.toString()); 423 strbuf.append(",first=").append(first); 424 strbuf.append(",count=").append(count); 425 strbuf.append(",validated=").append(validated).append(",("); 426 for (int i=0; i<data.length; i++) { 427 strbuf.append(data[i]).append(','); 428 } 429 strbuf.append("))"); 430 431 return strbuf.toString(); 432 } 433 434 final class MessageSoftRef extends java.lang.ref.SoftReference { 435 439 String name = null; 440 444 Message ref = null; 445 446 MessageSoftRef(Message msg) { 447 super(msg); 448 if (msg.isPersistent()) 449 name = msg.toStringId(); 450 else 451 ref = msg; 452 } 453 454 460 public Message getMessage() { 461 if (ref != null) return ref; 462 return (Message) get(); 463 } 464 465 472 public Message loadMessage() throws TransactionError { 473 if (ref != null) return ref; 474 475 Message msg; 476 try { 477 msg = Message.load(name); 478 479 if (logmon.isLoggable(BasicLevel.DEBUG)) 480 logmon.log(BasicLevel.DEBUG, logmsg + "reload from disk " + msg); 481 } catch (Exception exc) { 482 logmon.log(BasicLevel.ERROR, 483 logmsg + "Can't load message " + name, exc); 484 throw new TransactionError(exc); 485 } 486 return msg; 487 } 488 489 495 public String toString() { 496 StringBuffer strbuf = new StringBuffer (); 497 498 strbuf.append('(').append(super.toString()); 499 strbuf.append(",name=").append(name); 500 strbuf.append(",ref=").append(ref); 501 strbuf.append("))"); 502 503 return strbuf.toString(); 504 } 505 } 506 507 final class TransactionError extends Error { 508 TransactionError(Throwable cause) { 509 super(cause); 510 } 511 } 512 } 513 | Popular Tags |