1 24 25 package org.objectweb.dream.queue; 26 27 import org.objectweb.dream.AbstractComponent; 28 import org.objectweb.dream.message.Message; 29 import org.objectweb.dream.message.manager.MessageManager; 30 import org.objectweb.dream.util.Error; 31 import org.objectweb.fractal.api.NoSuchInterfaceException; 32 import org.objectweb.fractal.api.control.IllegalBindingException; 33 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 34 35 76 public abstract class AbstractBufferImpl extends AbstractComponent 77 implements 78 Buffer, 79 BufferAddFirstLast, 80 BufferRemoveFirstLast, 81 QueueAttributeController 82 { 83 84 85 protected MessageManager messageManagerItf; 86 87 88 protected int maxCapacity = 0; 89 90 95 protected int availableMessagesIndicator = 0; 96 97 102 protected int storedMessagesIndicator = 0; 103 104 105 protected final Object lock = new Object (); 106 107 111 114 public int availableMessagesIndicator() 115 { 116 return availableMessagesIndicator; 117 } 118 119 122 public int storedMessagesIndicator() 123 { 124 return storedMessagesIndicator; 125 } 126 127 130 public int availableSpaceIndicator() 131 { 132 if (maxCapacity <= 0) 133 { 134 return Integer.MAX_VALUE; 135 } 136 else 137 { 138 return maxCapacity - storedMessagesIndicator; 139 } 140 } 141 142 147 public void add(Message message) throws InterruptedException 148 { 149 synchronized (lock) 150 { 151 while (!canAdd(message)) 152 { 153 lock.wait(); 154 } 155 doAdd(message); 156 } 157 } 158 159 164 public Message remove() throws InterruptedException 165 { 166 synchronized (lock) 167 { 168 while (!hasAvailableMessage()) 169 { 170 lock.wait(); 171 } 172 return doRemove(); 173 } 174 } 175 176 181 public Message get() throws InterruptedException 182 { 183 synchronized (lock) 184 { 185 while (!hasAvailableMessage()) 186 { 187 lock.wait(); 188 } 189 return doGet(); 190 } 191 } 192 193 196 public void addFirst(Message message) throws InterruptedException 197 { 198 synchronized (lock) 199 { 200 while (!canAdd(message)) 201 { 202 lock.wait(); 203 } 204 doAddFirst(message); 205 } 206 } 207 208 211 public void addLast(Message message) throws InterruptedException 212 { 213 synchronized (lock) 214 { 215 while (!canAdd(message)) 216 { 217 lock.wait(); 218 } 219 doAddLast(message); 220 } 221 } 222 223 226 public Message getFirst() throws InterruptedException 227 { 228 synchronized (lock) 229 { 230 while (!hasAvailableMessage()) 231 { 232 lock.wait(); 233 } 234 return doGetFirst(); 235 } 236 } 237 238 241 public Message getLast() throws InterruptedException 242 { 243 synchronized (lock) 244 { 245 while (!hasAvailableMessage()) 246 { 247 lock.wait(); 248 } 249 return doGetLast(); 250 } 251 } 252 253 256 public Message removeFirst() throws InterruptedException 257 { 258 synchronized (lock) 259 { 260 while (!hasAvailableMessage()) 261 { 262 lock.wait(); 263 } 264 return doRemoveFirst(); 265 } 266 } 267 268 271 public Message removeLast() throws InterruptedException 272 { 273 synchronized (lock) 274 { 275 while (!hasAvailableMessage()) 276 { 277 lock.wait(); 278 } 279 return doRemoveLast(); 280 } 281 } 282 283 287 292 protected void incrementAvailableMessagesIndicator(int delta) 293 { 294 synchronized (lock) 295 { 296 availableMessagesIndicator += delta; 297 lock.notifyAll(); 298 } 299 } 300 301 306 protected void incrementStoredMessagesIndicator(int delta) 307 { 308 synchronized (lock) 309 { 310 storedMessagesIndicator += delta; 311 lock.notifyAll(); 312 } 313 } 314 315 319 325 protected abstract boolean canAdd(Message message); 326 327 334 protected abstract void doAdd(Message message); 335 336 341 protected abstract boolean hasAvailableMessage(); 342 343 350 protected abstract Message doRemove(); 351 352 359 protected abstract Message doGet(); 360 361 365 372 protected void doAddFirst(Message message) 373 { 374 Error.bug(logger, new UnsupportedOperationException ( 375 "This method is not implemented")); 376 } 377 378 385 protected void doAddLast(Message message) 386 { 387 Error.bug(logger, new UnsupportedOperationException ( 388 "This method is not implemented")); 389 } 390 391 398 protected Message doGetFirst() 399 { 400 Error.bug(logger, new UnsupportedOperationException ( 401 "This method is not implemented")); 402 return null; 403 } 404 405 412 protected Message doGetLast() 413 { 414 Error.bug(logger, new UnsupportedOperationException ( 415 "This method is not implemented")); 416 return null; 417 } 418 419 426 protected Message doRemoveFirst() 427 { 428 Error.bug(logger, new UnsupportedOperationException ( 429 "This method is not implemented")); 430 return null; 431 } 432 433 440 protected Message doRemoveLast() 441 { 442 Error.bug(logger, new UnsupportedOperationException ( 443 "This method is not implemented")); 444 return null; 445 } 446 447 451 454 public int getMaxCapacity() 455 { 456 synchronized (lock) 457 { 458 return maxCapacity; 459 } 460 } 461 462 465 public void setMaxCapacity(int maxCapacity) 466 { 467 synchronized (lock) 468 { 469 int previousCapacity = this.maxCapacity; 470 this.maxCapacity = maxCapacity; 471 if (maxCapacity > previousCapacity) 472 { 473 lock.notifyAll(); 475 } 476 } 477 } 478 479 482 public int getCurrentSize() 483 { 484 return storedMessagesIndicator; 485 } 486 487 491 495 public synchronized void bindFc(String clientItfName, Object serverItf) 496 throws NoSuchInterfaceException, IllegalBindingException, 497 IllegalLifeCycleException 498 { 499 super.bindFc(clientItfName, serverItf); 500 if (clientItfName.equals(MessageManager.ITF_NAME)) 501 { 502 messageManagerItf = (MessageManager) serverItf; 503 } 504 } 505 506 509 public String [] listFc() 510 { 511 return new String []{MessageManager.ITF_NAME}; 512 } 513 } | Popular Tags |