// KickJava: Java API By Example, From Geeks To Geeks.
//
// Java > Open Source Codes > com > caucho > jms > jdbc > JdbcQueueConsumer


/*
 * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
 *
 * This file is part of Resin(R) Open Source
 *
 * Each copy or derived work must preserve the copyright notice and this
 * notice unmodified.
 *
 * Resin Open Source is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * Resin Open Source is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
 * of NON-INFRINGEMENT. See the GNU General Public License for more
 * details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Resin Open Source; if not, write to the
 *
 * Free Software Foundation, Inc.
 * 59 Temple Place, Suite 330
 * Boston, MA 02111-1307 USA
 *
 * @author Scott Ferguson
 */

package com.caucho.jms.jdbc;

import com.caucho.jms.JMSExceptionWrapper;
import com.caucho.jms.message.MessageImpl;
import com.caucho.jms.session.MessageConsumerImpl;
import com.caucho.jms.session.SessionImpl;
import com.caucho.log.Log;
import com.caucho.util.Alarm;
import com.caucho.util.AlarmListener;
import com.caucho.util.L10N;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.sql.DataSource;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Level;
import java.util.logging.Logger;

53 /**
54  * Represents a JDBC queue consumer.
55  */

56 public class JdbcQueueConsumer extends MessageConsumerImpl
57   implements QueueReceiver JavaDoc, AlarmListener {
58   static final Logger JavaDoc log = Log.open(JdbcQueueConsumer.class);
59   static final L10N L = new L10N(JdbcQueueConsumer.class);
60
61   private final static long QUEUE_TIMEOUT = 3600 * 1000L;
62   
63   private JdbcManager _jdbcManager;
64
65   private JdbcQueue _queue;
66
67   private long _consumerId;
68
69   private boolean _autoAck;
70   
71   private boolean _isClosed;
72   private Alarm _alarm;
73
74   private long _lastPurgeTime;
75
76   public JdbcQueueConsumer(SessionImpl session, String JavaDoc messageSelector,
77                JdbcManager jdbcManager, JdbcQueue queue)
78     throws JMSException JavaDoc
79   {
80     super(session, messageSelector, queue, false);
81     
82     _jdbcManager = jdbcManager;
83     _queue = queue;
84
85     if (session.getAcknowledgeMode() == session.AUTO_ACKNOWLEDGE ||
86     session.getAcknowledgeMode() == session.DUPS_OK_ACKNOWLEDGE)
87       _autoAck = true;
88
89     createQueue();
90
91     _alarm = new Alarm(this, QUEUE_TIMEOUT / 4);
92
93     if (log.isLoggable(Level.FINE))
94       log.fine("JdbcQueueConsumer[" + queue + "," + _consumerId + "] created");
95   }
96
97   /**
98    * Returns the queue.
99    */

100   public Queue JavaDoc getQueue()
101   {
102     return _queue;
103   }
104
105   /**
106    * Creates an ephemeral topic.
107    */

108   private void createQueue()
109     throws JMSException JavaDoc
110   {
111     try {
112       DataSource JavaDoc dataSource = _jdbcManager.getDataSource();
113       String JavaDoc consumerTable = _jdbcManager.getConsumerTable();
114       String JavaDoc consumerSequence = _jdbcManager.getConsumerSequence();
115       String JavaDoc messageTable = _jdbcManager.getMessageTable();
116     
117       Connection JavaDoc conn = dataSource.getConnection();
118       try {
119     if (consumerSequence != null) {
120       String JavaDoc sql = _jdbcManager.getMetaData().selectSequenceSQL(consumerSequence);
121       PreparedStatement JavaDoc pstmt = conn.prepareStatement(sql);
122
123       long id = 0;
124
125       ResultSet JavaDoc rs = pstmt.executeQuery();
126       if (rs.next())
127         id = rs.getLong(1);
128       else
129         throw new RuntimeException JavaDoc("Expected result in customer create");
130
131       sql = ("INSERT INTO " + consumerTable +
132          " (s_id, queue, expire) VALUES (?, ?,?)");
133
134       pstmt = conn.prepareStatement(sql);
135
136       pstmt.setLong(1, id);
137       pstmt.setInt(2, _queue.getId());
138       pstmt.setLong(3, Alarm.getCurrentTime() + QUEUE_TIMEOUT);
139
140       pstmt.executeUpdate();
141     }
142     else {
143       String JavaDoc sql = ("INSERT INTO " + consumerTable +
144             " (queue, expire) VALUES (?,?)");
145
146       PreparedStatement JavaDoc pstmt;
147       pstmt = conn.prepareStatement(sql,
148                     PreparedStatement.RETURN_GENERATED_KEYS);
149
150       pstmt.setInt(1, _queue.getId());
151       pstmt.setLong(2, Alarm.getCurrentTime() + QUEUE_TIMEOUT);
152
153       pstmt.executeUpdate();
154
155       ResultSet JavaDoc rsKeys = pstmt.getGeneratedKeys();
156
157       if (rsKeys.next()) {
158         _consumerId = rsKeys.getLong(1);
159       }
160       else
161         throw new JMSException JavaDoc(L.l("consumer insert didn't create a key"));
162
163       rsKeys.close();
164       pstmt.close();
165     }
166       } finally {
167     conn.close();
168       }
169     } catch (SQLException JavaDoc e) {
170       throw new JMSExceptionWrapper(e);
171     }
172   }
173
174   /**
175    * Deletes an ephemeral queue.
176    */

177   private void deleteQueue()
178     throws JMSException JavaDoc
179   {
180     try {
181       DataSource JavaDoc dataSource = _jdbcManager.getDataSource();
182       String JavaDoc consumerTable = _jdbcManager.getConsumerTable();
183     
184       Connection JavaDoc conn = dataSource.getConnection();
185       try {
186     String JavaDoc sql = ("DELETE FROM " + consumerTable +
187               " WHERE s_id=?");
188     
189     PreparedStatement JavaDoc pstmt;
190     pstmt = conn.prepareStatement(sql);
191     pstmt.setLong(1, _consumerId);
192
193     pstmt.executeUpdate();
194     pstmt.close();
195       } finally {
196     conn.close();
197       }
198     } catch (SQLException JavaDoc e) {
199       throw new JMSExceptionWrapper(e);
200     }
201   }
202
203   /**
204    * Receives a message from the queue.
205    */

206   protected MessageImpl receiveImpl()
207     throws JMSException JavaDoc
208   {
209     try {
210       purgeExpiredConsumers();
211       _queue.purgeExpiredMessages();
212       
213       long minId = -1;
214
215       DataSource JavaDoc dataSource = _jdbcManager.getDataSource();
216       String JavaDoc messageTable = _jdbcManager.getMessageTable();
217       JdbcMessage jdbcMessage = _jdbcManager.getJdbcMessage();
218     
219       Connection JavaDoc conn = dataSource.getConnection();
220       try {
221     String JavaDoc sql = ("SELECT m_id, msg_type, delivered, body, header" +
222               " FROM " + messageTable +
223               " WHERE ?<m_id AND queue=?" +
224               " AND consumer IS NULL AND ?<=expire" +
225               " ORDER BY m_id");
226
227     PreparedStatement JavaDoc selectStmt = conn.prepareStatement(sql);
228
229     try {
230       selectStmt.setFetchSize(1);
231     } catch (Throwable JavaDoc e) {
232       log.log(Level.FINER, e.toString(), e);
233     }
234
235     if (_autoAck) {
236       sql = ("DELETE FROM " + messageTable +
237          " WHERE m_id=? AND consumer IS NULL");
238     }
239     else
240       sql = ("UPDATE " + messageTable +
241          " SET consumer=?, delivered=1" +
242          " WHERE m_id=? AND consumer IS NULL");
243       
244     PreparedStatement JavaDoc updateStmt = conn.prepareStatement(sql);
245
246     long id = -1;
247     while (true) {
248       id = -1;
249
250       selectStmt.setLong(1, minId);
251       selectStmt.setInt(2, _queue.getId());
252       selectStmt.setLong(3, Alarm.getCurrentTime());
253
254       MessageImpl msg = null;
255
256       ResultSet JavaDoc rs = selectStmt.executeQuery();
257       while (rs.next()) {
258         id = rs.getLong(1);
259
260         minId = id;
261
262         msg = jdbcMessage.readMessage(rs);
263
264         if (_selector == null || _selector.isMatch(msg))
265           break;
266         else
267           msg = null;
268       }
269
270       rs.close();
271
272       if (msg == null)
273         return null;
274
275       if (_autoAck) {
276         updateStmt.setLong(1, id);
277       }
278       else {
279         updateStmt.setLong(1, _consumerId);
280         updateStmt.setLong(2, id);
281       }
282       
283       int updateCount = updateStmt.executeUpdate();
284     
285       if (updateCount == 1)
286         return msg;
287     }
288       } finally {
289     conn.close();
290       }
291     } catch (IOException JavaDoc e) {
292       throw new JMSExceptionWrapper(e);
293     } catch (SQLException JavaDoc e) {
294       throw new JMSExceptionWrapper(e);
295     }
296   }
297
298   /**
299    * Acknowledges all received messages from the session.
300    */

301   public void acknowledge()
302     throws JMSException JavaDoc
303   {
304     if (_autoAck)
305       return;
306     
307     try {
308       DataSource JavaDoc dataSource = _jdbcManager.getDataSource();
309       String JavaDoc messageTable = _jdbcManager.getMessageTable();
310     
311       Connection JavaDoc conn = dataSource.getConnection();
312
313       try {
314     String JavaDoc sql = ("DELETE FROM " + messageTable + " " +
315               "WHERE consumer=?");
316       
317     PreparedStatement JavaDoc pstmt;
318     pstmt = conn.prepareStatement(sql);
319
320     pstmt.setLong(1, _consumerId);
321
322     pstmt.executeUpdate();
323
324     pstmt.close();
325       } finally {
326     conn.close();
327       }
328     } catch (SQLException JavaDoc e) {
329       throw new JMSExceptionWrapper(e);
330     }
331   }
332
333   /**
334    * Rollback all received messages from the session.
335    */

336   public void rollback()
337     throws JMSException JavaDoc
338   {
339     if (_autoAck)
340       return;
341     
342     try {
343       DataSource JavaDoc dataSource = _jdbcManager.getDataSource();
344       String JavaDoc messageTable = _jdbcManager.getMessageTable();
345     
346       Connection JavaDoc conn = dataSource.getConnection();
347
348       try {
349     String JavaDoc sql = ("UPDATE " + messageTable +
350               " SET consumer=NULL " +
351               " WHERE consumer=?");
352       
353     PreparedStatement JavaDoc pstmt;
354     pstmt = conn.prepareStatement(sql);
355
356     pstmt.setLong(1, _consumerId);
357
358     pstmt.executeUpdate();
359
360     pstmt.close();
361       } finally {
362     conn.close();
363       }
364     } catch (SQLException JavaDoc e) {
365       throw new JMSExceptionWrapper(e);
366     }
367   }
368
369   /**
370    * Purges expired consumers.
371    */

372   private void purgeExpiredConsumers()
373   {
374     long now = Alarm.getCurrentTime();
375
376     if (now < _lastPurgeTime + QUEUE_TIMEOUT)
377       return;
378
379     _lastPurgeTime = now;
380     
381     try {
382       DataSource JavaDoc dataSource = _jdbcManager.getDataSource();
383       String JavaDoc messageTable = _jdbcManager.getMessageTable();
384       String JavaDoc consumerTable = _jdbcManager.getConsumerTable();
385       JdbcMessage jdbcMessage = _jdbcManager.getJdbcMessage();
386     
387       Connection JavaDoc conn = dataSource.getConnection();
388       try {
389     String JavaDoc sql = ("UPDATE " + messageTable +
390               " SET consumer=NULL" +
391               " WHERE consumer IS NOT NULL" +
392               " AND EXISTS(SELECT * FROM " + consumerTable +
393               " WHERE s_id=consumer AND expire<?)");
394
395     PreparedStatement JavaDoc pstmt = conn.prepareStatement(sql);
396     pstmt.setLong(1, Alarm.getCurrentTime());
397
398     int count = pstmt.executeUpdate();
399     pstmt.close();
400
401     if (count > 0)
402       log.fine("JMSQueue[" + _queue.getName() + "] recovered " + count + " messages");
403
404     sql = ("DELETE FROM " + consumerTable +
405            " WHERE expire<?");
406
407     pstmt = conn.prepareStatement(sql);
408     pstmt.setLong(1, Alarm.getCurrentTime());
409
410     pstmt.executeUpdate();
411       } finally {
412     conn.close();
413       }
414     } catch (Exception JavaDoc e) {
415       log.log(Level.FINER, e.toString(), e);
416     }
417   }
418
419   /**
420    * Handles the alarm by updating the expire count.
421    */

422   public void handleAlarm(Alarm alarm)
423   {
424     if (_isClosed)
425       return;
426
427     try {
428       Connection JavaDoc conn = _jdbcManager.getDataSource().getConnection();
429       try {
430     String JavaDoc consumerTable = _jdbcManager.getConsumerTable();
431     
432     String JavaDoc sql = ("UPDATE " + consumerTable +
433               " SET expire=?" +
434               " WHERE s_id=?");
435
436     PreparedStatement JavaDoc pstmt = conn.prepareStatement(sql);
437     pstmt.setLong(1, Alarm.getCurrentTime() + QUEUE_TIMEOUT);
438     pstmt.setLong(2, _consumerId);
439
440     pstmt.executeUpdate();
441       } finally {
442     conn.close();
443       }
444     } catch (Throwable JavaDoc e) {
445       log.log(Level.WARNING, e.toString(), e);
446     } finally {
447       _alarm.queue(QUEUE_TIMEOUT / 4);
448     }
449   }
450
451   /**
452    * Closes the consumer.
453    */

454   public void close()
455     throws JMSException JavaDoc
456   {
457     if (_isClosed)
458       return;
459     _isClosed = true;
460
461     _alarm.dequeue();
462     
463     try {
464       deleteQueue();
465     } catch (Throwable JavaDoc e) {
466       log.log(Level.WARNING, e.toString(), e);
467     }
468     
469     super.close();
470   }
471
472   /**
473    * Returns a printable view of the queue.
474    */

475   public String JavaDoc toString()
476   {
477     return "JdbcQueueConsumer[" + _queue + "," + _consumerId + "]";
478   }
479 }
// Popular Tags