1 23 package org.objectweb.joram.mom.proxies; 24 25 import org.objectweb.util.monolog.api.BasicLevel; 26 import org.objectweb.joram.shared.JoramTracing; 27 28 import java.util.*; 29 30 public class AckedQueue implements java.io.Serializable { 31 32 private Vector list; 33 34 private int current; 35 36 public AckedQueue() { 37 list = new Vector(); 38 current = 0; 39 } 40 41 public void push(ProxyMessage msg) { 42 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 43 JoramTracing.dbgProxy.log( 44 BasicLevel.DEBUG, "AckedQueue.push(" + msg + ')'); 45 synchronized (list) { 46 list.addElement(msg); 47 list.notify(); 48 } 49 } 50 51 public ProxyMessage get() throws InterruptedException { 52 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 53 JoramTracing.dbgProxy.log( 54 BasicLevel.DEBUG, "AckedQueue.get()"); 55 synchronized (list) { 56 while ((list.size() - current) == 0) { 57 list.wait(); 58 } 59 ProxyMessage msg = 60 (ProxyMessage)list.elementAt(current); 61 current++; 62 return msg; 63 } 64 } 65 66 public void ack(long ackId) { 67 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 68 JoramTracing.dbgProxy.log( 69 BasicLevel.DEBUG, "AckedQueue.ack(" + ackId + ')'); 70 synchronized (list) { 71 while (list.size() > 0) { 72 ProxyMessage m = 73 (ProxyMessage)list.elementAt(0); 74 if (ackId < m.getId()) { 75 return; 76 } else { 77 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 79 JoramTracing.dbgProxy.log( 80 BasicLevel.DEBUG, "AckedQueue acked " + m.getId()); 81 list.removeElementAt(0); 82 if (current > 0) { 83 current--; 84 } 85 } 86 } 87 } 88 } 89 90 public void reset() { 91 if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) 92 JoramTracing.dbgProxy.log( 93 BasicLevel.DEBUG, "AckedQueue.reset()"); 94 current = 0; 95 } 96 } 97 | Popular Tags |