1 18 package org.apache.activemq.memory.buffer; 19 20 import java.util.ArrayList ; 21 import java.util.Iterator ; 22 import java.util.List ; 23 24 29 public class SizeBasedMessageBuffer implements MessageBuffer { 30 31 private int limit = 100 * 64 * 1024; 32 private List bubbleList = new ArrayList (); 33 private int size; 34 private Object lock = new Object (); 35 36 public SizeBasedMessageBuffer() { 37 } 38 39 public SizeBasedMessageBuffer(int limit) { 40 this.limit = limit; 41 } 42 43 public int getSize() { 44 synchronized (lock) { 45 return size; 46 } 47 } 48 49 52 public MessageQueue createMessageQueue() { 53 MessageQueue queue = new MessageQueue(this); 54 synchronized (lock) { 55 queue.setPosition(bubbleList.size()); 56 bubbleList.add(queue); 57 } 58 return queue; 59 } 60 61 67 public void onSizeChanged(MessageQueue queue, int delta, int queueSize) { 68 synchronized (lock) { 69 bubbleUp(queue, queueSize); 70 71 size += delta; 72 while (size > limit) { 73 MessageQueue biggest = (MessageQueue) bubbleList.get(0); 74 size -= biggest.evictMessage(); 75 76 bubbleDown(biggest, 0); 77 } 78 } 79 } 80 81 public void clear() { 82 synchronized (lock) { 83 for (Iterator iter = bubbleList.iterator(); iter.hasNext();) { 84 MessageQueue queue = (MessageQueue) iter.next(); 85 queue.clear(); 86 } 87 size = 0; 88 } 89 } 90 91 protected void bubbleUp(MessageQueue queue, int queueSize) { 92 int position = queue.getPosition(); 94 while (--position >= 0) { 95 MessageQueue pivot = (MessageQueue) bubbleList.get(position); 96 if (pivot.getSize() < queueSize) { 97 swap(position, pivot, position + 1, queue); 98 } 99 else { 100 break; 101 } 102 } 103 } 104 105 protected void bubbleDown(MessageQueue biggest, int position) { 106 int queueSize = biggest.getSize(); 107 for (int second = position + 1, end = bubbleList.size(); second < end; second++) { 108 MessageQueue pivot = (MessageQueue) bubbleList.get(second); 109 if (pivot.getSize() > queueSize) { 110 swap(position, biggest, second, pivot); 111 } 112 else { 113 break; 114 } 115 position = second; 116 } 117 } 118 119 protected void swap(int firstPosition, MessageQueue first, int secondPosition, MessageQueue second) { 120 bubbleList.set(firstPosition, second); 121 bubbleList.set(secondPosition, first); 122 first.setPosition(secondPosition); 123 second.setPosition(firstPosition); 124 } 125 } 126 | Popular Tags |