1 package org.jacorb.notification.queue; 2 3 23 24 import org.jacorb.notification.interfaces.Message; 25 26 30 31 abstract public class AbstractBoundedEventQueue implements MessageQueue 32 { 33 private final Object lock_; 34 35 private final int capacity_; 36 37 private final EventQueueOverflowStrategy overflowStrategy_; 38 39 protected AbstractBoundedEventQueue(int capacity, EventQueueOverflowStrategy overflowStrategy, 40 Object lock) 41 { 42 lock_ = lock; 43 capacity_ = capacity; 44 overflowStrategy_ = overflowStrategy; 45 } 46 47 abstract protected Message getEarliestTimeout(); 48 49 abstract protected Message getLeastPriority(); 50 51 abstract protected Message getNextElement(); 52 53 abstract protected Message getOldestElement(); 54 55 abstract protected Message getYoungestElement(); 56 57 abstract protected Message[] getElements(int max); 58 59 abstract protected void addElement(Message event); 60 61 abstract protected Message[] getAllElements(); 62 63 public Message[] getAllMessages(boolean wait) throws InterruptedException 64 { 65 synchronized (lock_) 66 { 67 if (wait) 68 { 69 return getAllBlocking(); 70 } 71 72 return getAllElements(); 73 } 74 } 75 76 79 private Message[] getAllBlocking() throws InterruptedException 80 { 81 while (isEmpty()) 82 { 83 lock_.wait(); 84 } 85 86 return getAllElements(); 87 } 88 89 public Message getMessage(boolean wait) throws InterruptedException 90 { 91 synchronized (lock_) 92 { 93 if (wait) 94 { 95 return getEventBlocking(); 96 } 97 98 if (isEmpty()) 99 { 100 return null; 101 } 102 103 return getNextElement(); 104 } 105 } 106 107 public Message[] getMessages(int max, boolean wait) throws InterruptedException 108 { 109 synchronized (lock_) 110 { 111 if (wait) 112 { 113 return getEventsBlocking(max); 114 } 115 116 return getElements(max); 117 } 118 } 119 120 123 private Message[] getEventsBlocking(int max) throws InterruptedException 124 { 125 while (isEmpty()) 126 { 127 lock_.wait(); 128 } 129 130 return getElements(max); 131 } 132 133 136 private Message getEventBlocking() throws InterruptedException 137 { 138 while (isEmpty()) 139 { 140 lock_.wait(); 141 } 142 143 return getOldestElement(); 144 } 145 146 public void put(Message event) 147 { 148 synchronized (lock_) 149 { 150 while (getSize() >= capacity_) 151 { 152 overflowStrategy_.removeElementFromQueue(this); 153 } 154 155 addElement(event); 156 157 lock_.notifyAll(); 158 } 159 } 160 } | Popular Tags |