KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > caucho > jms > log > LogQueue


1 /*
2  * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
3  *
4  * This file is part of Resin(R) Open Source
5  *
6  * Each copy or derived work must preserve the copyright notice and this
7  * notice unmodified.
8  *
9  * Resin Open Source is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * Resin Open Source is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
17  * of NON-INFRINGEMENT. See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with Resin Open Source; if not, write to the
22  * Free SoftwareFoundation, Inc.
23  * 59 Temple Place, Suite 330
24  * Boston, MA 02111-1307 USA
25  *
26  * @author Scott Ferguson
27  */

28
29 package com.caucho.jms.log;
30
31 import com.caucho.config.ConfigException;
32 import com.caucho.jms.AbstractDestination;
33 import com.caucho.jms.JMSExceptionWrapper;
34 import com.caucho.jms.selector.Selector;
35 import com.caucho.log.Log;
36 import com.caucho.util.L10N;
37 import com.caucho.vfs.Path;
38 import com.caucho.vfs.TempBuffer;
39 import com.caucho.vfs.TempStream;
40 import com.caucho.vfs.WriteStream;
41
42 import javax.jms.JMSException JavaDoc;
43 import javax.jms.Message JavaDoc;
44 import javax.jms.Queue JavaDoc;
45 import java.io.IOException JavaDoc;
46 import java.io.ObjectOutputStream JavaDoc;
47 import java.io.OutputStream JavaDoc;
48 import java.util.ArrayList JavaDoc;
49 import java.util.Enumeration JavaDoc;
50 import java.util.logging.Level JavaDoc;
51 import java.util.logging.Logger JavaDoc;
52
53 /**
54  * A log queue.
55  */

56 public class LogQueue extends AbstractDestination
57   implements Queue JavaDoc {
58   static final Logger JavaDoc log = Log.open(LogQueue.class);
59   static final L10N L = new L10N(LogQueue.class);
60
61   private static final byte []RESIN = new byte[] { 'R', 'E', 'S', 'I', 'N' };
62
63   ArrayList JavaDoc<Message JavaDoc> _queue = new ArrayList JavaDoc<Message JavaDoc>();
64
65   private String JavaDoc _queueName;
66   private Selector _selector;
67
68   private Path _basePath;
69   private long _logFileSize = 10 * 1024 * 1024;
70
71   private Path _pathA;
72   private long _lengthA;
73   private TempOutputStream _tempA = new TempOutputStream();
74   private WriteStream _writeA;
75   private TempStream _outA = new TempStream();
76
77   public LogQueue()
78   {
79   }
80
81   /**
82    * Sets the queue's base log file.
83    */

84   public void setPath(Path path)
85   {
86     _basePath = path;
87   }
88
89   /**
90    * Returns the queue's name.
91    */

92   public String JavaDoc getQueueName()
93   {
94     return _queueName;
95   }
96
97   /**
98    * Sets the queue's name.
99    */

100   public void setQueueName(String JavaDoc name)
101   {
102     _queueName = name;
103   }
104
105   /**
106    * Sets the queue's selector.
107    */

108   public void setSelector(Selector selector)
109   {
110     _selector = selector;
111   }
112
113   /**
114    * Gets the queue's selector.
115    */

116   public Selector getSelector()
117   {
118     return _selector;
119   }
120
121   /**
122    * Initialize the queue.
123    */

124   public void init()
125     throws ConfigException, IOException JavaDoc
126   {
127     if (_basePath == null)
128       throw new ConfigException(L.l("LogQueue requires a <path> element."));
129
130     if (_basePath.isDirectory())
131       throw new ConfigException(L.l("<path> must be a file prefix, not a directory."));
132
133     _basePath.getParent().mkdirs();
134
135     String JavaDoc tail = _basePath.getTail();
136
137     _pathA = _basePath.getParent().lookup(tail + "_a");
138     _lengthA = _pathA.getLength();
139     _writeA = _pathA.openAppend();
140   }
141
142   /**
143    * Sends the message to the queue.
144    */

145   public void send(Message JavaDoc message)
146     throws JMSException JavaDoc
147   {
148     if (_selector != null && ! _selector.isMatch(message))
149       return;
150
151     long sequenceId = nextConsumerSequenceId();
152
153     if (log.isLoggable(Level.FINE))
154       log.fine("jms log queue:" + _queueName + " send message " + sequenceId);
155
156     try {
157       synchronized (_tempA) {
158     _tempA.clearWrite();
159     _tempA.write('S');
160     int offset = _tempA.getLength();
161     writeInt(_tempA, 0);
162     writeLong(_tempA, message.getJMSExpiration());
163
164     ObjectOutputStream JavaDoc oos = new ObjectOutputStream JavaDoc(_tempA);
165
166     oos.writeObject(message);
167
168     oos.close();
169
170     int length = _tempA.getLength() - offset;
171     writeInt(_tempA, length);
172     _tempA.write(RESIN, 0, RESIN.length);
173
174     TempBuffer ptr = _tempA.getHead();
175     if (ptr != null) {
176       byte []buffer = ptr.getBuffer();
177       buffer[1] = (byte) (length >> 24);
178       buffer[2] = (byte) (length >> 16);
179       buffer[3] = (byte) (length >> 8);
180       buffer[4] = (byte) (length);
181       
182       for (; ptr != null; ptr = ptr.getNext()) {
183         _writeA.write(ptr.getBuffer(), 0, ptr.getLength());
184       }
185       // XXX: eventually needs more complicated flush.
186
_writeA.flush();
187       
188       _tempA.clearWrite();
189       
190     }
191       }
192     } catch (Exception JavaDoc e) {
193       throw new JMSExceptionWrapper(e);
194     }
195
196     messageAvailable();
197   }
198
199   /**
200    * Removes the first message matching the selector.
201    */

202   public Message JavaDoc receive(Selector selector)
203     throws JMSException JavaDoc
204   {
205     synchronized (_queue) {
206       int i;
207       int size = _queue.size();
208
209       for (i = 0; i < size; i++) {
210     Message JavaDoc message = _queue.get(i);
211
212     if (selector == null || selector.isMatch(message)) {
213       _queue.remove(i);
214       return message;
215     }
216       }
217     }
218
219     return null;
220   }
221
222   /**
223    * Returns an enumeration of the matching messages.
224    */

225   public Enumeration JavaDoc getEnumeration(Selector selector)
226   {
227     return new BrowserEnumeration(this, selector);
228   }
229
230   /**
231    * Removes the first message matching the selector.
232    */

233   private boolean hasMessage(Selector selector)
234     throws JMSException JavaDoc
235   {
236     synchronized (_queue) {
237       int i;
238       int size = _queue.size();
239
240       for (i = 0; i < size; i++) {
241     Message JavaDoc message = _queue.get(i);
242
243     if (selector == null || selector.isMatch(message))
244       return true;
245       }
246     }
247
248     return false;
249   }
250
251   /**
252    * Writes an integer.
253    */

254   private void writeInt(OutputStream JavaDoc os, int value)
255     throws IOException JavaDoc
256   {
257     os.write(value >> 24);
258     os.write(value >> 16);
259     os.write(value >> 8);
260     os.write(value);
261   }
262
263   /**
264    * Writes a long
265    */

266   private void writeLong(OutputStream JavaDoc os, long value)
267     throws IOException JavaDoc
268   {
269     os.write((int) (value >> 56));
270     os.write((int) (value >> 48));
271     os.write((int) (value >> 40));
272     os.write((int) (value >> 32));
273     
274     os.write((int) (value >> 24));
275     os.write((int) (value >> 16));
276     os.write((int) (value >> 8));
277     os.write((int) value);
278   }
279
280   /**
281    * Returns a printable view of the queue.
282    */

283   public String JavaDoc toString()
284   {
285     return "MemoryQueue[" + _queueName + "]";
286   }
287
288   static class BrowserEnumeration implements Enumeration JavaDoc {
289     private LogQueue _queue;
290     private Selector _selector;
291
292     BrowserEnumeration(LogQueue queue, Selector selector)
293     {
294       _queue = queue;
295       _selector = selector;
296     }
297
298     public boolean hasMoreElements()
299     {
300       try {
301     return _queue.hasMessage(_selector);
302       } catch (Exception JavaDoc e) {
303     throw new RuntimeException JavaDoc(e);
304       }
305     }
306
307     public Object JavaDoc nextElement()
308     {
309       try {
310     return _queue.receive(_selector);
311       } catch (Exception JavaDoc e) {
312     throw new RuntimeException JavaDoc(e);
313       }
314     }
315   }
316
317   static class TempOutputStream extends OutputStream JavaDoc {
318     private TempStream _tempStream = new TempStream();
319     private byte []_oneBuf = new byte[1];
320     private int _length;
321
322     void clearWrite()
323     {
324       _tempStream.clearWrite();
325     }
326
327     TempBuffer getHead()
328     {
329       return _tempStream.getHead();
330     }
331
332     /**
333      * Writes a byte to the temp stream.
334      */

335     public void write(int ch)
336       throws IOException JavaDoc
337     {
338       _length++;
339       _oneBuf[0] = (byte) ch;
340       _tempStream.write(_oneBuf, 0, 1, false);
341     }
342
343     /**
344      * Writes a buffer to the temp stream.
345      */

346     public void write(byte []buffer, int offset, int length)
347       throws IOException JavaDoc
348     {
349       _length += length;
350       _tempStream.write(buffer, offset, length, false);
351     }
352
353     /**
354      * Returns the current length.
355      */

356     int getLength()
357     {
358       return _length;
359     }
360   }
361 }
362
363
Popular Tags