KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > caucho > jms > jdbc > JdbcTopicConsumer


1 /*
2  * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
3  *
4  * This file is part of Resin(R) Open Source
5  *
6  * Each copy or derived work must preserve the copyright notice and this
7  * notice unmodified.
8  *
9  * Resin Open Source is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * Resin Open Source is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
17  * of NON-INFRINGEMENT. See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with Resin Open Source; if not, write to the
22  * Free SoftwareFoundation, Inc.
23  * 59 Temple Place, Suite 330
24  * Boston, MA 02111-1307 USA
25  *
26  * @author Scott Ferguson
27  */

28
29 package com.caucho.jms.jdbc;
30
31 import com.caucho.jms.JMSExceptionWrapper;
32 import com.caucho.jms.message.MessageImpl;
33 import com.caucho.jms.session.MessageConsumerImpl;
34 import com.caucho.jms.session.SessionImpl;
35 import com.caucho.log.Log;
36 import com.caucho.util.Alarm;
37 import com.caucho.util.AlarmListener;
38 import com.caucho.util.L10N;
39
40 import javax.jms.JMSException JavaDoc;
41 import javax.jms.Topic JavaDoc;
42 import javax.jms.TopicSubscriber JavaDoc;
43 import javax.sql.DataSource JavaDoc;
44 import java.io.IOException JavaDoc;
45 import java.sql.Connection JavaDoc;
46 import java.sql.PreparedStatement JavaDoc;
47 import java.sql.ResultSet JavaDoc;
48 import java.sql.SQLException JavaDoc;
49 import java.util.logging.Level JavaDoc;
50 import java.util.logging.Logger JavaDoc;
51
52 /**
53  * Represents a JDBC topic consumer.
54  */

55 public class JdbcTopicConsumer extends MessageConsumerImpl
56   implements AlarmListener, TopicSubscriber JavaDoc {
57   static final Logger JavaDoc log = Log.open(JdbcTopicConsumer.class);
58   static final L10N L = new L10N(JdbcTopicConsumer.class);
59
60   private final static long TOPIC_TIMEOUT = 3600 * 1000L;
61   
62   private JdbcManager _jdbcManager;
63
64   private JdbcTopic _topic;
65
66   private String JavaDoc _subscriber;
67   private long _consumerId;
68
69   private long _lastPurgeTime;
70   private boolean _isClosed;
71   private Alarm _alarm;
72
73   public JdbcTopicConsumer(SessionImpl session, String JavaDoc messageSelector,
74                JdbcManager jdbcManager, JdbcTopic topic,
75                boolean noLocal)
76     throws JMSException JavaDoc
77   {
78     super(session, messageSelector, topic, noLocal);
79     
80     _jdbcManager = jdbcManager;
81     _topic = topic;
82
83     createTopic();
84
85     _alarm = new Alarm(this, TOPIC_TIMEOUT / 4);
86   }
87
88   public JdbcTopicConsumer(SessionImpl session, String JavaDoc messageSelector,
89                JdbcManager jdbcManager, JdbcTopic topic,
90                boolean noLocal, String JavaDoc name)
91     throws JMSException JavaDoc
92   {
93     super(session, messageSelector, topic, noLocal);
94     
95     _jdbcManager = jdbcManager;
96     _topic = topic;
97
98     _subscriber = name;
99
100     createTopic(name);
101   }
102
103   /**
104    * Returns the topic.
105    */

106   public Topic JavaDoc getTopic()
107   {
108     return _topic;
109   }
110
111   /**
112    * Creates an ephemeral topic.
113    */

114   private void createTopic()
115     throws JMSException JavaDoc
116   {
117     try {
118       DataSource JavaDoc dataSource = _jdbcManager.getDataSource();
119       String JavaDoc consumerTable = _jdbcManager.getConsumerTable();
120       String JavaDoc consumerSequence = _jdbcManager.getConsumerSequence();
121       String JavaDoc messageTable = _jdbcManager.getMessageTable();
122     
123       Connection JavaDoc conn = dataSource.getConnection();
124       try {
125     String JavaDoc sql = ("SELECT MAX(m_id)" +
126               " FROM " + messageTable +
127               " WHERE queue=?");
128     
129     long max = -1;
130     
131     PreparedStatement JavaDoc pstmt;
132     pstmt = conn.prepareStatement(sql);
133     pstmt.setInt(1, _topic.getId());
134     
135     ResultSet JavaDoc rs = pstmt.executeQuery();
136     if (rs.next())
137       max = rs.getLong(1);
138     rs.close();
139     
140     if (consumerSequence != null) {
141       sql = _jdbcManager.getMetaData().selectSequenceSQL(consumerSequence);
142
143       pstmt = conn.prepareStatement(sql);
144
145       long id = 0;
146
147       rs = pstmt.executeQuery();
148       if (rs.next())
149         id = rs.getLong(1);
150       else
151         throw new IllegalStateException JavaDoc("no sequence value for consumer.");
152
153       rs.close();
154       pstmt.close();
155       
156       sql = ("INSERT INTO " + consumerTable +
157          " (s_id, queue, expire, read_id, ack_id) VALUES (?,?,?,?,?)");
158
159       pstmt = conn.prepareStatement(sql);
160
161       pstmt.setLong(1, id);
162       pstmt.setInt(2, _topic.getId());
163       pstmt.setLong(3, Alarm.getCurrentTime() + TOPIC_TIMEOUT);
164       pstmt.setLong(4, max);
165       pstmt.setLong(5, max);
166
167       pstmt.executeUpdate();
168       
169       _consumerId = id;
170     }
171     else {
172       sql = ("INSERT INTO " + consumerTable +
173          " (queue, expire, read_id, ack_id) VALUES (?,?,?,?)");
174
175       pstmt = conn.prepareStatement(sql,
176                     PreparedStatement.RETURN_GENERATED_KEYS);
177
178       pstmt.setInt(1, _topic.getId());
179       pstmt.setLong(2, Alarm.getCurrentTime() + TOPIC_TIMEOUT);
180       pstmt.setLong(3, max);
181       pstmt.setLong(4, max);
182
183       pstmt.executeUpdate();
184
185       ResultSet JavaDoc rsKeys = pstmt.getGeneratedKeys();
186
187       if (rsKeys.next()) {
188         _consumerId = rsKeys.getLong(1);
189       }
190       else
191         throw new JMSException JavaDoc(L.l("consumer insert didn't create a key"));
192
193       rsKeys.close();
194     }
195     
196     pstmt.close();
197       } finally {
198     conn.close();
199       }
200     } catch (SQLException JavaDoc e) {
201       throw new JMSExceptionWrapper(e);
202     }
203   }
204
205   /**
206    * Deletes an ephemeral topic.
207    */

208   private void deleteTopic()
209     throws JMSException JavaDoc
210   {
211     try {
212       DataSource JavaDoc dataSource = _jdbcManager.getDataSource();
213       String JavaDoc consumerTable = _jdbcManager.getConsumerTable();
214     
215       Connection JavaDoc conn = dataSource.getConnection();
216       try {
217     String JavaDoc sql = ("DELETE FROM " + consumerTable +
218               " WHERE s_id=?");
219     
220     PreparedStatement JavaDoc pstmt;
221     pstmt = conn.prepareStatement(sql);
222     pstmt.setLong(1, _consumerId);
223
224     pstmt.executeUpdate();
225     pstmt.close();
226       } finally {
227     conn.close();
228       }
229     } catch (SQLException JavaDoc e) {
230       throw new JMSExceptionWrapper(e);
231     }
232   }
233
234   /**
235    * Creates a durable topic subscriber.
236    */

237   private void createTopic(String JavaDoc name)
238     throws JMSException JavaDoc
239   {
240     try {
241       DataSource JavaDoc dataSource = _jdbcManager.getDataSource();
242       String JavaDoc consumerTable = _jdbcManager.getConsumerTable();
243       String JavaDoc consumerSequence = _jdbcManager.getConsumerSequence();
244       String JavaDoc messageTable = _jdbcManager.getMessageTable();
245
246       String JavaDoc clientId = _session.getClientID();
247     
248       Connection JavaDoc conn = dataSource.getConnection();
249       try {
250     String JavaDoc sql = ("SELECT s_id" +
251               " FROM " + consumerTable +
252               " WHERE queue=? AND client=? AND name=?");
253
254     PreparedStatement JavaDoc pstmt;
255     pstmt = conn.prepareStatement(sql);
256     pstmt.setInt(1, _topic.getId());
257     pstmt.setString(2, clientId);
258     pstmt.setString(3, name);
259
260     ResultSet JavaDoc rs = pstmt.executeQuery();
261     if (rs.next()) {
262       _consumerId = rs.getLong(1);
263       rs.close();
264       return;
265     }
266     
267     sql = ("SELECT MAX(m_id)" +
268            " FROM " + messageTable +
269            " WHERE queue=?");
270     
271     long max = -1;
272     
273     pstmt = conn.prepareStatement(sql);
274     pstmt.setInt(1, _topic.getId());
275     
276     rs = pstmt.executeQuery();
277     if (rs.next())
278       max = rs.getLong(1);
279     rs.close();
280     pstmt.close();
281
282     if (consumerSequence != null) {
283       sql = _jdbcManager.getMetaData().selectSequenceSQL(consumerSequence);
284
285       pstmt = conn.prepareStatement(sql);
286
287       long id = 0;
288
289       rs = pstmt.executeQuery();
290       if (rs.next())
291         id = rs.getLong(1);
292       else
293         throw new IllegalStateException JavaDoc("no sequence value for consumer.");
294
295       rs.close();
296       pstmt.close();
297       
298       sql = ("INSERT INTO " + consumerTable +
299          " (s_id, queue, client, name, expire, read_id, ack_id) VALUES (?,?,?,?,?,?,?)");
300
301       pstmt = conn.prepareStatement(sql);
302
303       pstmt.setLong(1, id);
304       pstmt.setInt(2, _topic.getId());
305       pstmt.setString(3, clientId);
306       pstmt.setString(4, name);
307       pstmt.setLong(5, Long.MAX_VALUE / 2);
308       pstmt.setLong(6, max);
309       pstmt.setLong(7, max);
310
311       pstmt.executeUpdate();
312     }
313     else {
314       sql = ("INSERT INTO " + consumerTable +
315          " (queue, client, name, expire, read_id, ack_id) VALUES (?,?,?,?,?,?)");
316
317       pstmt = conn.prepareStatement(sql,
318                     PreparedStatement.RETURN_GENERATED_KEYS);
319
320       pstmt.setInt(1, _topic.getId());
321       pstmt.setString(2, clientId);
322       pstmt.setString(3, name);
323       pstmt.setLong(4, Long.MAX_VALUE / 2);
324       pstmt.setLong(5, max);
325       pstmt.setLong(6, max);
326
327       pstmt.executeUpdate();
328
329       ResultSet JavaDoc rsKeys = pstmt.getGeneratedKeys();
330
331       if (rsKeys.next()) {
332         _consumerId = rsKeys.getLong(1);
333       }
334       else
335         throw new JMSException JavaDoc(L.l("consumer insert didn't create a key"));
336
337       rsKeys.close();
338       pstmt.close();
339     }
340       } finally {
341     conn.close();
342       }
343     } catch (SQLException JavaDoc e) {
344       throw new JMSExceptionWrapper(e);
345     }
346   }
347
348   /**
349    * Receives a message from the topic.
350    */

351   protected MessageImpl receiveImpl()
352     throws JMSException JavaDoc
353   {
354     purgeExpiredConsumers();
355     _topic.purgeExpiredMessages();
356     
357     try {
358       DataSource JavaDoc dataSource = _jdbcManager.getDataSource();
359       String JavaDoc messageTable = _jdbcManager.getMessageTable();
360       String JavaDoc consumerTable = _jdbcManager.getConsumerTable();
361       JdbcMessage jdbcMessage = _jdbcManager.getJdbcMessage();
362     
363       Connection JavaDoc conn = dataSource.getConnection();
364       try {
365     String JavaDoc sql = ("SELECT m_id, msg_type, delivered, body, header" +
366               " FROM " + messageTable + " m," +
367               " " + consumerTable + " s" +
368               " WHERE s_id=? AND m.queue=s.queue AND s.read_id<m_id" +
369               " AND ?<m.expire" +
370               " ORDER BY m_id");
371
372     PreparedStatement JavaDoc selectStmt = conn.prepareStatement(sql);
373
374     try {
375       selectStmt.setFetchSize(1);
376     } catch (Throwable JavaDoc e) {
377       log.log(Level.FINER, e.toString(), e);
378     }
379       
380     long id = -1;
381     
382     selectStmt.setLong(1, _consumerId);
383     selectStmt.setLong(2, Alarm.getCurrentTime());
384
385     MessageImpl msg = null;
386
387     ResultSet JavaDoc rs = selectStmt.executeQuery();
388     while (rs.next()) {
389       id = rs.getLong(1);
390
391       msg = jdbcMessage.readMessage(rs);
392
393       if (_selector == null || _selector.isMatch(msg))
394         break;
395       else
396         msg = null;
397     }
398
399     rs.close();
400     selectStmt.close();
401
402     if (msg == null)
403       return null;
404
405     sql = ("UPDATE " + consumerTable +
406            " SET read_id=?" +
407            " WHERE s_id=?");
408       
409     PreparedStatement JavaDoc updateStmt = conn.prepareStatement(sql);
410
411     updateStmt.setLong(1, id);
412     updateStmt.setLong(2, _consumerId);
413       
414     updateStmt.executeUpdate();
415     updateStmt.close();
416
417     return msg;
418       } finally {
419     conn.close();
420       }
421     } catch (IOException JavaDoc e) {
422       throw new JMSExceptionWrapper(e);
423     } catch (SQLException JavaDoc e) {
424       throw new JMSExceptionWrapper(e);
425     }
426   }
427
428   /**
429    * Acknowledges all received messages from the session.
430    */

431   public void acknowledge()
432     throws JMSException JavaDoc
433   {
434     try {
435       DataSource JavaDoc dataSource = _jdbcManager.getDataSource();
436       String JavaDoc consumerTable = _jdbcManager.getConsumerTable();
437     
438       Connection JavaDoc conn = dataSource.getConnection();
439
440       try {
441     String JavaDoc sql = ("UPDATE " + consumerTable +
442               " SET ack_id=read_id " +
443               " WHERE s_id=?");
444       
445     PreparedStatement JavaDoc pstmt;
446     pstmt = conn.prepareStatement(sql);
447
448     pstmt.setLong(1, _consumerId);
449
450     pstmt.executeUpdate();
451
452     pstmt.close();
453
454     deleteOldMessages();
455       } finally {
456     conn.close();
457       }
458     } catch (SQLException JavaDoc e) {
459       throw new JMSExceptionWrapper(e);
460     }
461   }
462
463   /**
464    * Delete read messages.
465    */

466   private void deleteOldMessages()
467     throws JMSException JavaDoc
468   {
469     try {
470       DataSource JavaDoc dataSource = _jdbcManager.getDataSource();
471       String JavaDoc messageTable = _jdbcManager.getMessageTable();
472       String JavaDoc consumerTable = _jdbcManager.getConsumerTable();
473     
474       Connection JavaDoc conn = dataSource.getConnection();
475
476       try {
477     String JavaDoc sql;
478     
479     sql = ("DELETE FROM " + messageTable +
480            " WHERE queue=? AND NOT EXISTS(" +
481                " SELECT * FROM " + consumerTable +
482            " WHERE queue=? AND ack_id < m_id)");
483       
484     PreparedStatement JavaDoc pstmt;
485     pstmt = conn.prepareStatement(sql);
486     pstmt.setInt(1, _topic.getId());
487     pstmt.setInt(2, _topic.getId());
488
489     pstmt.executeUpdate();
490       } finally {
491     conn.close();
492       }
493     } catch (SQLException JavaDoc e) {
494       throw new JMSExceptionWrapper(e);
495     }
496   }
497
498   /**
499    * Truncates all blobs before a deletion.
500    */

501   
502
503   /**
504    * Rollback all received messages from the session.
505    */

506   public void rollback()
507     throws JMSException JavaDoc
508   {
509     try {
510       DataSource JavaDoc dataSource = _jdbcManager.getDataSource();
511       String JavaDoc consumerTable = _jdbcManager.getConsumerTable();
512     
513       Connection JavaDoc conn = dataSource.getConnection();
514
515       try {
516     String JavaDoc sql = ("UPDATE " + consumerTable +
517               " SET read_id=ack_id " +
518               " WHERE s_id=?");
519       
520     PreparedStatement JavaDoc pstmt;
521     pstmt = conn.prepareStatement(sql);
522
523     pstmt.setLong(1, _consumerId);
524
525     pstmt.executeUpdate();
526
527     pstmt.close();
528       } finally {
529     conn.close();
530       }
531     } catch (SQLException JavaDoc e) {
532       throw new JMSExceptionWrapper(e);
533     }
534   }
535
536   /**
537    * Purges expired consumers.
538    */

539   private void purgeExpiredConsumers()
540   {
541     long now = Alarm.getCurrentTime();
542
543     if (now < _lastPurgeTime + TOPIC_TIMEOUT)
544       return;
545
546     _lastPurgeTime = now;
547     
548     try {
549       DataSource JavaDoc dataSource = _jdbcManager.getDataSource();
550       String JavaDoc messageTable = _jdbcManager.getMessageTable();
551       String JavaDoc consumerTable = _jdbcManager.getConsumerTable();
552       JdbcMessage jdbcMessage = _jdbcManager.getJdbcMessage();
553     
554       Connection JavaDoc conn = dataSource.getConnection();
555       try {
556     String JavaDoc sql = ("DELETE FROM " + consumerTable +
557               " WHERE expire<?");
558
559     PreparedStatement JavaDoc pstmt = conn.prepareStatement(sql);
560     pstmt.setLong(1, Alarm.getCurrentTime());
561
562     pstmt.executeUpdate();
563       } finally {
564     conn.close();
565       }
566     } catch (Exception JavaDoc e) {
567       log.log(Level.FINER, e.toString(), e);
568     }
569   }
570
571   /**
572    * Handles the alarm by updating the expire count.
573    */

574   public void handleAlarm(Alarm alarm)
575   {
576     if (_isClosed)
577       return;
578
579     try {
580       Connection JavaDoc conn = _jdbcManager.getDataSource().getConnection();
581       try {
582     String JavaDoc consumerTable = _jdbcManager.getConsumerTable();
583     
584     String JavaDoc sql = ("UPDATE " + consumerTable +
585               " SET expire=?" +
586               " WHERE s_id=?");
587
588     PreparedStatement JavaDoc pstmt = conn.prepareStatement(sql);
589     pstmt.setLong(1, Alarm.getCurrentTime() + TOPIC_TIMEOUT);
590     pstmt.setLong(2, _consumerId);
591
592     pstmt.executeUpdate();
593       } finally {
594     conn.close();
595       }
596     } catch (Throwable JavaDoc e) {
597       log.log(Level.WARNING, e.toString(), e);
598     } finally {
599       _alarm.queue(TOPIC_TIMEOUT / 4);
600     }
601   }
602
603   /**
604    * Closes the consumer.
605    */

606   public void close()
607     throws JMSException JavaDoc
608   {
609     if (_isClosed)
610       return;
611     _isClosed = true;
612
613     if (_alarm != null)
614       _alarm.dequeue();
615     
616     try {
617       if (_subscriber == null)
618     deleteTopic();
619       else {
620     // XXX: ejb/6a22
621
_session.unsubscribe(_subscriber);
622       }
623     } catch (Throwable JavaDoc e) {
624       log.log(Level.WARNING, e.toString(), e);
625     }
626     
627     super.close();
628   }
629
630   /**
631    * Returns a printable view of the topic.
632    */

633   public String JavaDoc toString()
634   {
635     return "JdbcTopicConsumer[" + _topic + "," + _consumerId + "]";
636   }
637 }
638
Popular Tags