1 package com.ubermq.jms.server.journal.impl; 2 3 import com.ubermq.jms.common.datagram.*; 4 import com.ubermq.kernel.*; 5 import java.io.*; 6 import java.nio.*; 7 import java.util.*; 8 9 import com.ubermq.jms.server.journal.IJournal; 10 import com.ubermq.kernel.overflow.DropIncoming; 11 import java.nio.channels.FileChannel ; 12 13 24 public final class SimpleJournal 25 implements IJournal 26 { 27 private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(SimpleJournal.class); 28 29 private RandomAccessFile logFile; 30 private FileChannel journalChannel; 31 private MappedByteBuffer logfileBuffer; 32 private IntBuffer offsetBuffer; 33 private ByteBuffer journalBuffer; 34 private Map messageOffsets; 35 36 private IDatagramFactory factory; 37 38 private static final int POSITION_INDEX = 0; 39 private static final int LIMIT_INDEX = 1; 40 private static final int OFFSET_DATA_SIZE = 8; 41 42 private static final int OFFSET_INITIAL_MAP_SIZE = 50; 43 44 55 public SimpleJournal(File f, 56 IDatagramFactory factory, 57 long maximumSize) 58 throws IOException 59 { 60 this.factory = factory; 61 init(f, maximumSize); 62 } 63 64 private void init(File f, long size) 65 throws IOException 66 { 67 this.logFile = new RandomAccessFile(f, "rw"); 68 69 this.journalChannel = logFile.getChannel(); 72 73 this.logfileBuffer = journalChannel.map(FileChannel.MapMode.READ_WRITE, 0, size); 76 this.offsetBuffer = ((ByteBuffer)logfileBuffer.slice().limit(OFFSET_DATA_SIZE)).asIntBuffer(); 77 logfileBuffer.position(OFFSET_DATA_SIZE); 78 this.journalBuffer = logfileBuffer.slice(); 79 80 restoreWritePosition(); 82 83 messageOffsets = new HashMap(OFFSET_INITIAL_MAP_SIZE); 85 } 86 87 91 public void destroy() 92 { 93 checkpoint(0); 94 limit(0); 95 } 96 97 public void close() 98 { 99 try 100 { 101 journalChannel.close(); 102 logFile.close(); 103 } 104 catch (IOException e) { 105 log.error("", e); 106 } 107 } 108 109 116 private synchronized int handleOverflow(IDatagram d, int position, IOverflowHandler h) 117 { 118 log.debug("handleOverflow enetered."); 119 120 restoreCheckpoint(); 122 journalBuffer.compact(); 123 124 int action = IOverflowHandler.ACTION_RETRY; 126 int moved = journalBuffer.position() - position; 127 if (moved >= 0 && position > 0) 128 action = h.overflow(d); 129 130 Iterator iter = messageOffsets.keySet().iterator(); 133 while (iter.hasNext()) 134 { 135 Object mid = iter.next(); 136 137 int oldOffset = ((Integer )messageOffsets.get(mid)).intValue(); 140 int newOffset = oldOffset + moved; 141 142 if (newOffset >= 0) 145 messageOffsets.put(mid, new Integer (newOffset)); 146 else iter.remove(); 147 } 148 149 compact(moved); 150 return action; 151 } 152 153 private synchronized void restoreCheckpoint() 154 { 155 int position = offsetBuffer.get(POSITION_INDEX); 156 int limit = offsetBuffer.get(LIMIT_INDEX); 157 158 journalBuffer.position(position); 159 journalBuffer.limit(limit); 160 161 log.debug("restored checkpoint to " + journalBuffer.position() + " to " + journalBuffer.limit()); 162 } 163 164 private synchronized void restoreWritePosition() 165 { 166 int limit = offsetBuffer.get(LIMIT_INDEX); 167 168 journalBuffer.limit(journalBuffer.capacity()); 169 journalBuffer.position(limit); 170 171 log.debug("restored write position to " + journalBuffer.position()); 172 } 173 174 private synchronized void checkpoint(int checkpt) 175 { 176 int writing = Math.min(offsetBuffer.get(LIMIT_INDEX), 177 Math.max(offsetBuffer.get(POSITION_INDEX), checkpt)); 178 log.debug("checkpointing " + writing + " requested = " + checkpt); 179 offsetBuffer.put(POSITION_INDEX, writing); 180 logfileBuffer.force(); 181 } 182 183 private synchronized void limit(int limit) 184 { 185 log.debug("limit is " + limit); 186 offsetBuffer.put(LIMIT_INDEX, limit); 187 logfileBuffer.force(); 188 } 189 190 private synchronized void compact(int delta) 191 { 192 log.debug("compacting by " + delta); 193 offsetBuffer.put(POSITION_INDEX, offsetBuffer.get(POSITION_INDEX) + delta); 194 offsetBuffer.put(LIMIT_INDEX, offsetBuffer.get(LIMIT_INDEX) + delta); 195 logfileBuffer.force(); 196 } 197 198 201 public void ack(MessageId id) 202 { 203 log.debug("waiting for ack on: " + messageOffsets ); 204 205 Integer offset = (Integer )messageOffsets.get(id); 209 if (offset != null) { 210 checkpoint(offset.intValue()); 211 212 messageOffsets.remove(id); 214 } 215 } 216 217 221 public boolean isOpen() 222 { 223 return true; 224 } 225 226 234 public void checkpoint() 235 { 236 } 237 238 245 public void output(IDatagram d, IOverflowHandler h) 246 { 247 if (d instanceof IMessageDatagram) 248 { 249 final int position = journalBuffer.position(); 250 try { 251 ByteBuffer write = journalBuffer.slice(); 252 factory.outgoing(write, d); 253 journalBuffer.position(journalBuffer.position() + write.position()); 254 limit(journalBuffer.position()); 255 256 if (d instanceof IMessageDatagram) 258 messageOffsets.put(((IMessageDatagram)d).getMessageId(), new Integer (journalBuffer.position())); 259 } catch(BufferOverflowException boe) { 260 if (handleOverflow(d, position, h) == IOverflowHandler.ACTION_RETRY) 262 output(d, h); 263 } 264 } 265 } 266 267 public void recover(final IMessageProcessor p) 268 { 269 int position = offsetBuffer.get(POSITION_INDEX); 271 int limit = offsetBuffer.get(LIMIT_INDEX); 272 final ByteBuffer recoveryBuffer = journalBuffer.duplicate(); 273 recoveryBuffer.position(position).limit(limit); 274 275 TrivialConnectionInfo recoveryManager = new TrivialConnectionInfo( 278 new IMessageProcessor() { 279 public void remove(IConnectionInfo conn) {p.remove(conn);} 280 public void accept(IConnectionInfo conn) {p.accept(conn);} 281 public void process(IConnectionInfo conn, IDatagram read) 282 { 283 if (read instanceof IMessageDatagram) { 290 IMessageDatagram md = (IMessageDatagram)read; 291 292 messageOffsets.put(md.getMessageId(), 293 new Integer (recoveryBuffer.position())); 294 295 } 296 297 p.process(conn, read); 299 } 300 }, 301 factory, 302 recoveryBuffer); 303 304 recoveryManager.processData(); 306 307 restoreWritePosition(); 309 } 310 311 314 private final static class TrivialConnectionInfo 315 extends AbstractConnectionInfo 316 { 317 private ByteBuffer theBuffer; 318 319 private TrivialConnectionInfo(IMessageProcessor mp, 320 IDatagramFactory f, 321 ByteBuffer b) 322 { 323 super(mp, f, b, b); 324 this.theBuffer = b; 325 } 326 327 public void start() 328 { 329 } 330 331 public void stop() 332 { 333 } 334 335 public void close() 336 { 337 } 338 339 public final int doWrite(ByteBuffer writeBuffer) throws IOException 340 { 341 return 0; 345 } 346 347 protected void preProcessData() 348 { 349 } 356 357 protected void postProcessData() 358 { 359 } 360 } 361 362 363 } 364 | Popular Tags |