1 38 39 40 package org.jahia.services.cache; 41 42 import java.util.ArrayList ; 43 44 45 58 class JMSMessageQueue { 59 60 61 final private static org.apache.log4j.Logger logger = 62 org.apache.log4j.Logger.getLogger (JMSMessageQueue.class); 63 64 65 private ArrayList queue; 66 67 68 69 public JMSMessageQueue () { 70 queue = new ArrayList (); 72 } 73 74 87 public synchronized void add (final JMSCacheMessage message) { 88 if (message == null) 90 return; 91 92 94 queue.add(message); 96 if (logger.isDebugEnabled()) { 97 logger.debug("Added message " + message + " to queue. Queue size is now " + queue.size()); 98 } 99 } 100 101 private void optimizeQueue (JMSCacheMessage message) { 102 if (message.isFlush()) { 106 logger.debug("Clearing queue because of flush message"); 107 queue.clear(); 108 109 } else { 115 for (int i=0; i<queue.size(); i++) { 117 JMSCacheMessage msg = (JMSCacheMessage)queue.get(i); 118 119 if (message.isPut()) { 121 if (msg.isPut() && (message.getEntryKey().equals (msg.getEntryKey()))) { 123 logger.debug("Removing duplicate PUT message for entry key " + message.getEntryKey()); 124 queue.remove(i); 125 } 126 127 } else { 129 130 if (msg.isRemove() && (message.getEntryKey().equals(msg.getEntryKey()))) { 132 logger.debug("Removing duplicate REMOVE message for entry key " + message.getEntryKey()); 133 queue.remove(i); 134 135 } else if (msg.isPut() && (message.getEntryKey().equals(msg.getEntryKey()))) { 138 logger.debug("Removing previous PUT message because of REMOVE message on entry key " + message.getEntryKey()); 139 queue.remove(i); 140 } 141 } 142 } 143 } 144 } 145 146 152 public synchronized JMSCacheMessage[] extractMessages(int sendCountThreshold) { 153 if (queue.size() == 0) 154 return new JMSCacheMessage[0]; 155 156 JMSCacheMessage[] messages = null; 157 if (queue.size() < sendCountThreshold) { 158 messages = (JMSCacheMessage[]) queue.toArray(new JMSCacheMessage[queue.size()]); 160 161 queue.clear(); 163 } else { 164 messages = new JMSCacheMessage[sendCountThreshold]; 165 for (int i=0; i < sendCountThreshold; i++) { 166 messages[i] = (JMSCacheMessage) queue.remove(0); 167 } 168 } 169 return messages; 170 } 171 172 173 177 final public int size() { 178 return queue.size(); 179 } 180 181 } 182 | Popular Tags |