1 28 29 package com.caucho.jms.jdbc; 30 31 import com.caucho.jms.selector.Selector; 32 import com.caucho.jms.selector.SelectorParser; 33 import com.caucho.jms.session.SessionImpl; 34 import com.caucho.log.Log; 35 import com.caucho.util.Alarm; 36 import com.caucho.util.L10N; 37 38 import javax.jms.JMSException ; 39 import javax.jms.Message ; 40 import javax.jms.Queue ; 41 import javax.jms.QueueBrowser ; 42 import java.sql.Connection ; 43 import java.sql.PreparedStatement ; 44 import java.sql.ResultSet ; 45 import java.util.Enumeration ; 46 import java.util.logging.Level ; 47 import java.util.logging.Logger ; 48 49 52 public class JdbcQueueBrowser implements QueueBrowser { 53 static final Logger log = Log.open(JdbcQueueBrowser.class); 54 static final L10N L = new L10N(JdbcQueueBrowser.class); 55 56 private SessionImpl _session; 57 private JdbcManager _jdbcManager; 58 59 private JdbcQueue _queue; 60 61 private String _messageSelector; 62 private Selector _selector; 63 64 private boolean _isClosed; 65 66 public JdbcQueueBrowser(SessionImpl session, String messageSelector, 67 JdbcQueue queue) 68 throws JMSException 69 { 70 _session = session; 71 _jdbcManager = queue.getJdbcManager(); 72 _queue = queue; 73 74 _messageSelector = messageSelector; 75 if (_messageSelector != null) { 76 SelectorParser parser = new SelectorParser(); 77 _selector = parser.parse(messageSelector); 78 } 79 } 80 81 84 public Queue getQueue() 85 { 86 return _queue; 87 } 88 89 92 public String getMessageSelector() 93 { 94 return _messageSelector; 95 } 96 97 100 public Enumeration getEnumeration() 101 { 102 return new BrowserEnumeration(); 103 } 104 105 108 public void close() 109 throws JMSException 110 { 111 if (_isClosed) 112 return; 113 _isClosed = true; 114 } 115 116 119 public String toString() 120 { 121 return "JdbcQueueBrowser[" + _queue + "]"; 122 } 123 124 class BrowserEnumeration implements Enumeration { 125 private long _maxId = -1; 126 private Message _msg; 127 128 public boolean hasMoreElements() 129 { 130 if (_msg == null) 131 getNextMessage(); 132 133 return _msg != null; 134 } 135 136 public Object nextElement() 137 { 138 if (_msg == null) 139 getNextMessage(); 140 141 Message msg = _msg; 142 _msg = null; 143 144 return msg; 145 } 146 147 150 private Message getNextMessage() 151 { 152 try { 153 Connection conn = _jdbcManager.getDataSource().getConnection(); 154 String messageTable = _jdbcManager.getMessageTable(); 155 156 try { 157 String sql = ("SELECT m_id, msg_type, delivered, body, header" + 158 " FROM " + messageTable + 159 " WHERE ?<m_id AND queue=?" + 160 " AND consumer IS NULL AND ?<expire"); 161 162 PreparedStatement pstmt = conn.prepareStatement(sql); 163 pstmt.setLong(1, _maxId); 164 pstmt.setInt(2, _queue.getId()); 165 pstmt.setLong(3, Alarm.getCurrentTime()); 166 167 ResultSet rs = pstmt.executeQuery(); 168 while (rs.next()) { 169 _maxId = rs.getLong(1); 170 171 _msg = _jdbcManager.getJdbcMessage().readMessage(rs); 172 173 if (_selector == null || _selector.isMatch(_msg)) 174 break; 175 else 176 _msg = null; 177 } 178 179 rs.close(); 180 pstmt.close(); 181 } finally { 182 conn.close(); 183 } 184 } catch (Throwable e) { 185 log.log(Level.WARNING, e.toString(), e); 186 } 187 188 return _msg; 189 } 190 } 191 } 192 193 | Popular Tags |