1 46 50 package org.mr.core.persistent; 51 52 import java.io.IOException ; 53 54 import java.util.Iterator ; 55 import java.util.LinkedList ; 56 import java.util.List ; 57 58 import org.apache.commons.logging.Log; 59 import org.apache.commons.logging.LogFactory; 60 61 import org.mr.core.util.SynchronizedQueue; 62 import org.mr.core.util.byteable.Byteable; 63 64 65 66 70 public class PersistentQueue extends SynchronizedQueue{ 71 72 private Log log; 74 private int itemsCount = 0; 76 private boolean defaultPersistent ; 78 79 private PersistentManager persistentManager; 80 private LinkedList underLineElementsCopy= new LinkedList (); 82 83 89 public PersistentQueue(String name ,boolean defaultPersistent , boolean blocking){ 90 super(); 92 this.defaultPersistent =defaultPersistent; 93 log=LogFactory.getLog("PersistentQueue"); 94 persistentManager = PersistentManagerFactory.getPersistentManager(name); 95 96 try { 97 recover(name); 98 } catch (IOException e) { 99 if(log.isFatalEnabled()) 100 log.fatal("Can not init persistent stracture. ",e); 101 } 102 103 } 105 106 111 private void recover(String name) throws IOException { 112 persistentManager.recover(); 113 int[] keySet = persistentManager.getKeys(); 114 if(log.isInfoEnabled() && keySet.length >0 ){ 115 log.info("Recoverd "+name+". There are "+keySet.length+" elements there."); 116 } 117 for(int index =0 ; index < keySet.length; index++){ 118 119 Object entry =persistentManager.getPersistentObject(keySet[index]); 120 PersistentEvent event = new PersistentEvent(); 121 event.setEntry((Byteable)entry); 122 event.setEntryName(keySet[index]); 123 event.setPersistentState(true); 124 super.enqueue(event); 125 underLineElementsCopy.add(entry); 126 itemsCount++; 129 } 130 } 132 135 synchronized public boolean enqueue(Object o){ 136 return enqueue(o,defaultPersistent); 137 } 139 144 synchronized public boolean enqueue(Object o , boolean persistent){ 145 PersistentEvent event = new PersistentEvent(); 146 event.setEntry((Byteable)o); 147 int name = getValidFreePersistentName(); 148 event.setEntryName(name); 149 event.setPersistentState(persistent); 150 151 if(persistent){ 152 try{ 153 persistentManager.savePersistentObject(event.getEntryName(),event.getEntry()); 154 155 }catch(IOException e){ 156 if(log.isFatalEnabled()) 157 log.fatal("Can not save persistent stracture. ",e); 158 } 159 } 160 boolean b =super.enqueue(event); 161 underLineElementsCopy.add(o); 162 return b; 163 } 165 168 private int getValidFreePersistentName(){ 169 int result = itemsCount; 170 itemsCount++; 171 if(itemsCount >= PersistentManager.MAX_FILES_PER_STORAGE){ 172 itemsCount = 1; 173 } 174 175 return result; 176 } 177 178 181 synchronized public Object dequeue(){ 182 PersistentEvent event = (PersistentEvent)super.dequeue(); 183 if(event.isPersistentState()) 184 persistentManager.deletePersistentObject(event.getEntryName()); 185 underLineElementsCopy.remove(event.getEntry()); 186 return event.getEntry(); 187 } 188 189 194 public synchronized boolean removeElement(Object toBRemoved){ 195 List list = getUnderlineList(); 196 Iterator elements = list.iterator(); 197 while(elements.hasNext()){ 198 PersistentEvent event =(PersistentEvent)elements.next(); 199 if(event.getEntry() ==toBRemoved ){ 200 elements.remove(); 201 if(event.isPersistentState()) 202 persistentManager.deletePersistentObject(event.getEntryName()); 203 underLineElementsCopy.remove(toBRemoved); 204 205 return true; 206 } 207 } 208 return false; 209 210 } 211 214 synchronized public Object dequeueNoBlock(){ 215 PersistentEvent event = (PersistentEvent)super.dequeueNoBlock(); 216 if(event != null){ 217 persistentManager.deletePersistentObject(event.getEntryName()); 218 return event.getEntry(); 219 }else{ 220 return null; 221 } 222 } 223 224 synchronized public LinkedList copyUnderlineElementsList(){ 225 226 return new LinkedList (underLineElementsCopy); 227 } 228 229 230 231 } 232 | Popular Tags |