1 28 29 package com.caucho.jms.log; 30 31 import com.caucho.config.ConfigException; 32 import com.caucho.jms.AbstractDestination; 33 import com.caucho.jms.JMSExceptionWrapper; 34 import com.caucho.jms.selector.Selector; 35 import com.caucho.log.Log; 36 import com.caucho.util.L10N; 37 import com.caucho.vfs.Path; 38 import com.caucho.vfs.TempBuffer; 39 import com.caucho.vfs.TempStream; 40 import com.caucho.vfs.WriteStream; 41 42 import javax.jms.JMSException ; 43 import javax.jms.Message ; 44 import javax.jms.Queue ; 45 import java.io.IOException ; 46 import java.io.ObjectOutputStream ; 47 import java.io.OutputStream ; 48 import java.util.ArrayList ; 49 import java.util.Enumeration ; 50 import java.util.logging.Level ; 51 import java.util.logging.Logger ; 52 53 56 public class LogQueue extends AbstractDestination 57 implements Queue { 58 static final Logger log = Log.open(LogQueue.class); 59 static final L10N L = new L10N(LogQueue.class); 60 61 private static final byte []RESIN = new byte[] { 'R', 'E', 'S', 'I', 'N' }; 62 63 ArrayList <Message > _queue = new ArrayList <Message >(); 64 65 private String _queueName; 66 private Selector _selector; 67 68 private Path _basePath; 69 private long _logFileSize = 10 * 1024 * 1024; 70 71 private Path _pathA; 72 private long _lengthA; 73 private TempOutputStream _tempA = new TempOutputStream(); 74 private WriteStream _writeA; 75 private TempStream _outA = new TempStream(); 76 77 public LogQueue() 78 { 79 } 80 81 84 public void setPath(Path path) 85 { 86 _basePath = path; 87 } 88 89 92 public String getQueueName() 93 { 94 return _queueName; 95 } 96 97 100 public void setQueueName(String name) 101 { 102 _queueName = name; 103 } 104 105 108 public void setSelector(Selector selector) 109 { 110 _selector = selector; 111 } 112 113 116 public Selector getSelector() 117 { 118 return _selector; 119 } 120 121 124 public void init() 125 throws ConfigException, IOException  126 { 127 if (_basePath == null) 128 throw new ConfigException(L.l("LogQueue requires a <path> element.")); 129 130 if (_basePath.isDirectory()) 131 throw new ConfigException(L.l("<path> must be a file prefix, not a directory.")); 132 133 _basePath.getParent().mkdirs(); 134 135 String tail = _basePath.getTail(); 136 137 _pathA = _basePath.getParent().lookup(tail + "_a"); 138 _lengthA = _pathA.getLength(); 139 _writeA = _pathA.openAppend(); 140 } 141 142 145 public void send(Message message) 146 throws JMSException  147 { 148 if (_selector != null && ! _selector.isMatch(message)) 149 return; 150 151 long sequenceId = nextConsumerSequenceId(); 152 153 if (log.isLoggable(Level.FINE)) 154 log.fine("jms log queue:" + _queueName + " send message " + sequenceId); 155 156 try { 157 synchronized (_tempA) { 158 _tempA.clearWrite(); 159 _tempA.write('S'); 160 int offset = _tempA.getLength(); 161 writeInt(_tempA, 0); 162 writeLong(_tempA, message.getJMSExpiration()); 163 164 ObjectOutputStream oos = new ObjectOutputStream (_tempA); 165 166 oos.writeObject(message); 167 168 oos.close(); 169 170 int length = _tempA.getLength() - offset; 171 writeInt(_tempA, length); 172 _tempA.write(RESIN, 0, RESIN.length); 173 174 TempBuffer ptr = _tempA.getHead(); 175 if (ptr != null) { 176 byte []buffer = ptr.getBuffer(); 177 buffer[1] = (byte) (length >> 24); 178 buffer[2] = (byte) (length >> 16); 179 buffer[3] = (byte) (length >> 8); 180 buffer[4] = (byte) (length); 181 182 for (; ptr != null; ptr = ptr.getNext()) { 183 _writeA.write(ptr.getBuffer(), 0, ptr.getLength()); 184 } 185 _writeA.flush(); 187 188 _tempA.clearWrite(); 189 190 } 191 } 192 } catch (Exception e) { 193 throw new JMSExceptionWrapper(e); 194 } 195 196 messageAvailable(); 197 } 198 199 202 public Message receive(Selector selector) 203 throws JMSException  204 { 205 synchronized (_queue) { 206 int i; 207 int size = _queue.size(); 208 209 for (i = 0; i < size; i++) { 210 Message message = _queue.get(i); 211 212 if (selector == null || selector.isMatch(message)) { 213 _queue.remove(i); 214 return message; 215 } 216 } 217 } 218 219 return null; 220 } 221 222 225 public Enumeration getEnumeration(Selector selector) 226 { 227 return new BrowserEnumeration(this, selector); 228 } 229 230 233 private boolean hasMessage(Selector selector) 234 throws JMSException  235 { 236 synchronized (_queue) { 237 int i; 238 int size = _queue.size(); 239 240 for (i = 0; i < size; i++) { 241 Message message = _queue.get(i); 242 243 if (selector == null || selector.isMatch(message)) 244 return true; 245 } 246 } 247 248 return false; 249 } 250 251 254 private void writeInt(OutputStream os, int value) 255 throws IOException  256 { 257 os.write(value >> 24); 258 os.write(value >> 16); 259 os.write(value >> 8); 260 os.write(value); 261 } 262 263 266 private void writeLong(OutputStream os, long value) 267 throws IOException  268 { 269 os.write((int) (value >> 56)); 270 os.write((int) (value >> 48)); 271 os.write((int) (value >> 40)); 272 os.write((int) (value >> 32)); 273 274 os.write((int) (value >> 24)); 275 os.write((int) (value >> 16)); 276 os.write((int) (value >> 8)); 277 os.write((int) value); 278 } 279 280 283 public String toString() 284 { 285 return "MemoryQueue[" + _queueName + "]"; 286 } 287 288 static class BrowserEnumeration implements Enumeration { 289 private LogQueue _queue; 290 private Selector _selector; 291 292 BrowserEnumeration(LogQueue queue, Selector selector) 293 { 294 _queue = queue; 295 _selector = selector; 296 } 297 298 public boolean hasMoreElements() 299 { 300 try { 301 return _queue.hasMessage(_selector); 302 } catch (Exception e) { 303 throw new RuntimeException (e); 304 } 305 } 306 307 public Object nextElement() 308 { 309 try { 310 return _queue.receive(_selector); 311 } catch (Exception e) { 312 throw new RuntimeException (e); 313 } 314 } 315 } 316 317 static class TempOutputStream extends OutputStream { 318 private TempStream _tempStream = new TempStream(); 319 private byte []_oneBuf = new byte[1]; 320 private int _length; 321 322 void clearWrite() 323 { 324 _tempStream.clearWrite(); 325 } 326 327 TempBuffer getHead() 328 { 329 return _tempStream.getHead(); 330 } 331 332 335 public void write(int ch) 336 throws IOException  337 { 338 _length++; 339 _oneBuf[0] = (byte) ch; 340 _tempStream.write(_oneBuf, 0, 1, false); 341 } 342 343 346 public void write(byte []buffer, int offset, int length) 347 throws IOException  348 { 349 _length += length; 350 _tempStream.write(buffer, offset, length, false); 351 } 352 353 356 int getLength() 357 { 358 return _length; 359 } 360 } 361 } 362 363 | Popular Tags |