1 29 30 package com.caucho.jms.jdbc; 31 32 import com.caucho.jms.JMSExceptionWrapper; 33 import com.caucho.jms.message.MessageImpl; 34 import com.caucho.jms.session.MessageConsumerImpl; 35 import com.caucho.jms.session.SessionImpl; 36 import com.caucho.log.Log; 37 import com.caucho.util.Alarm; 38 import com.caucho.util.AlarmListener; 39 import com.caucho.util.L10N; 40 41 import javax.jms.JMSException ; 42 import javax.jms.Queue ; 43 import javax.jms.QueueReceiver ; 44 import javax.sql.DataSource ; 45 import java.io.IOException ; 46 import java.sql.Connection ; 47 import java.sql.PreparedStatement ; 48 import java.sql.ResultSet ; 49 import java.sql.SQLException ; 50 import java.util.logging.Level ; 51 import java.util.logging.Logger ; 52 53 56 public class JdbcQueueConsumer extends MessageConsumerImpl 57 implements QueueReceiver , AlarmListener { 58 static final Logger log = Log.open(JdbcQueueConsumer.class); 59 static final L10N L = new L10N(JdbcQueueConsumer.class); 60 61 private final static long QUEUE_TIMEOUT = 3600 * 1000L; 62 63 private JdbcManager _jdbcManager; 64 65 private JdbcQueue _queue; 66 67 private long _consumerId; 68 69 private boolean _autoAck; 70 71 private boolean _isClosed; 72 private Alarm _alarm; 73 74 private long _lastPurgeTime; 75 76 public JdbcQueueConsumer(SessionImpl session, String messageSelector, 77 JdbcManager jdbcManager, JdbcQueue queue) 78 throws JMSException 79 { 80 super(session, messageSelector, queue, false); 81 82 _jdbcManager = jdbcManager; 83 _queue = queue; 84 85 if (session.getAcknowledgeMode() == session.AUTO_ACKNOWLEDGE || 86 session.getAcknowledgeMode() == session.DUPS_OK_ACKNOWLEDGE) 87 _autoAck = true; 88 89 createQueue(); 90 91 _alarm = new Alarm(this, QUEUE_TIMEOUT / 4); 92 93 if (log.isLoggable(Level.FINE)) 94 log.fine("JdbcQueueConsumer[" + queue + "," + _consumerId + "] created"); 95 } 96 97 100 public Queue getQueue() 101 { 102 return _queue; 103 } 104 105 108 private void createQueue() 109 throws JMSException 110 { 111 try { 112 DataSource dataSource = _jdbcManager.getDataSource(); 113 String consumerTable = _jdbcManager.getConsumerTable(); 114 String consumerSequence = _jdbcManager.getConsumerSequence(); 115 String messageTable = _jdbcManager.getMessageTable(); 116 117 Connection conn = dataSource.getConnection(); 118 try { 119 if (consumerSequence != null) { 120 String sql = _jdbcManager.getMetaData().selectSequenceSQL(consumerSequence); 121 PreparedStatement pstmt = conn.prepareStatement(sql); 122 123 long id = 0; 124 125 ResultSet rs = pstmt.executeQuery(); 126 if (rs.next()) 127 id = rs.getLong(1); 128 else 129 throw new RuntimeException ("Expected result in customer create"); 130 131 sql = ("INSERT INTO " + consumerTable + 132 " (s_id, queue, expire) VALUES (?, ?,?)"); 133 134 pstmt = conn.prepareStatement(sql); 135 136 pstmt.setLong(1, id); 137 pstmt.setInt(2, _queue.getId()); 138 pstmt.setLong(3, Alarm.getCurrentTime() + QUEUE_TIMEOUT); 139 140 pstmt.executeUpdate(); 141 } 142 else { 143 String sql = ("INSERT INTO " + consumerTable + 144 " (queue, expire) VALUES (?,?)"); 145 146 PreparedStatement pstmt; 147 pstmt = conn.prepareStatement(sql, 148 PreparedStatement.RETURN_GENERATED_KEYS); 149 150 pstmt.setInt(1, _queue.getId()); 151 pstmt.setLong(2, Alarm.getCurrentTime() + QUEUE_TIMEOUT); 152 153 pstmt.executeUpdate(); 154 155 ResultSet rsKeys = pstmt.getGeneratedKeys(); 156 157 if (rsKeys.next()) { 158 _consumerId = rsKeys.getLong(1); 159 } 160 else 161 throw new JMSException (L.l("consumer insert didn't create a key")); 162 163 rsKeys.close(); 164 pstmt.close(); 165 } 166 } finally { 167 conn.close(); 168 } 169 } catch (SQLException e) { 170 throw new JMSExceptionWrapper(e); 171 } 172 } 173 174 177 private void deleteQueue() 178 throws JMSException 179 { 180 try { 181 DataSource dataSource = _jdbcManager.getDataSource(); 182 String consumerTable = _jdbcManager.getConsumerTable(); 183 184 Connection conn = dataSource.getConnection(); 185 try { 186 String sql = ("DELETE FROM " + consumerTable + 187 " WHERE s_id=?"); 188 189 PreparedStatement pstmt; 190 pstmt = conn.prepareStatement(sql); 191 pstmt.setLong(1, _consumerId); 192 193 pstmt.executeUpdate(); 194 pstmt.close(); 195 } finally { 196 conn.close(); 197 } 198 } catch (SQLException e) { 199 throw new JMSExceptionWrapper(e); 200 } 201 } 202 203 206 protected MessageImpl receiveImpl() 207 throws JMSException 208 { 209 try { 210 purgeExpiredConsumers(); 211 _queue.purgeExpiredMessages(); 212 213 long minId = -1; 214 215 DataSource dataSource = _jdbcManager.getDataSource(); 216 String messageTable = _jdbcManager.getMessageTable(); 217 JdbcMessage jdbcMessage = _jdbcManager.getJdbcMessage(); 218 219 Connection conn = dataSource.getConnection(); 220 try { 221 String sql = ("SELECT m_id, msg_type, delivered, body, header" + 222 " FROM " + messageTable + 223 " WHERE ?<m_id AND queue=?" + 224 " AND consumer IS NULL AND ?<=expire" + 225 " ORDER BY m_id"); 226 227 PreparedStatement selectStmt = conn.prepareStatement(sql); 228 229 try { 230 selectStmt.setFetchSize(1); 231 } catch (Throwable e) { 232 log.log(Level.FINER, e.toString(), e); 233 } 234 235 if (_autoAck) { 236 sql = ("DELETE FROM " + messageTable + 237 " WHERE m_id=? AND consumer IS NULL"); 238 } 239 else 240 sql = ("UPDATE " + messageTable + 241 " SET consumer=?, delivered=1" + 242 " WHERE m_id=? AND consumer IS NULL"); 243 244 PreparedStatement updateStmt = conn.prepareStatement(sql); 245 246 long id = -1; 247 while (true) { 248 id = -1; 249 250 selectStmt.setLong(1, minId); 251 selectStmt.setInt(2, _queue.getId()); 252 selectStmt.setLong(3, Alarm.getCurrentTime()); 253 254 MessageImpl msg = null; 255 256 ResultSet rs = selectStmt.executeQuery(); 257 while (rs.next()) { 258 id = rs.getLong(1); 259 260 minId = id; 261 262 msg = jdbcMessage.readMessage(rs); 263 264 if (_selector == null || _selector.isMatch(msg)) 265 break; 266 else 267 msg = null; 268 } 269 270 rs.close(); 271 272 if (msg == null) 273 return null; 274 275 if (_autoAck) { 276 updateStmt.setLong(1, id); 277 } 278 else { 279 updateStmt.setLong(1, _consumerId); 280 updateStmt.setLong(2, id); 281 } 282 283 int updateCount = updateStmt.executeUpdate(); 284 285 if (updateCount == 1) 286 return msg; 287 } 288 } finally { 289 conn.close(); 290 } 291 } catch (IOException e) { 292 throw new JMSExceptionWrapper(e); 293 } catch (SQLException e) { 294 throw new JMSExceptionWrapper(e); 295 } 296 } 297 298 301 public void acknowledge() 302 throws JMSException 303 { 304 if (_autoAck) 305 return; 306 307 try { 308 DataSource dataSource = _jdbcManager.getDataSource(); 309 String messageTable = _jdbcManager.getMessageTable(); 310 311 Connection conn = dataSource.getConnection(); 312 313 try { 314 String sql = ("DELETE FROM " + messageTable + " " + 315 "WHERE consumer=?"); 316 317 PreparedStatement pstmt; 318 pstmt = conn.prepareStatement(sql); 319 320 pstmt.setLong(1, _consumerId); 321 322 pstmt.executeUpdate(); 323 324 pstmt.close(); 325 } finally { 326 conn.close(); 327 } 328 } catch (SQLException e) { 329 throw new JMSExceptionWrapper(e); 330 } 331 } 332 333 336 public void rollback() 337 throws JMSException 338 { 339 if (_autoAck) 340 return; 341 342 try { 343 DataSource dataSource = _jdbcManager.getDataSource(); 344 String messageTable = _jdbcManager.getMessageTable(); 345 346 Connection conn = dataSource.getConnection(); 347 348 try { 349 String sql = ("UPDATE " + messageTable + 350 " SET consumer=NULL " + 351 " WHERE consumer=?"); 352 353 PreparedStatement pstmt; 354 pstmt = conn.prepareStatement(sql); 355 356 pstmt.setLong(1, _consumerId); 357 358 pstmt.executeUpdate(); 359 360 pstmt.close(); 361 } finally { 362 conn.close(); 363 } 364 } catch (SQLException e) { 365 throw new JMSExceptionWrapper(e); 366 } 367 } 368 369 372 private void purgeExpiredConsumers() 373 { 374 long now = Alarm.getCurrentTime(); 375 376 if (now < _lastPurgeTime + QUEUE_TIMEOUT) 377 return; 378 379 _lastPurgeTime = now; 380 381 try { 382 DataSource dataSource = _jdbcManager.getDataSource(); 383 String messageTable = _jdbcManager.getMessageTable(); 384 String consumerTable = _jdbcManager.getConsumerTable(); 385 JdbcMessage jdbcMessage = _jdbcManager.getJdbcMessage(); 386 387 Connection conn = dataSource.getConnection(); 388 try { 389 String sql = ("UPDATE " + messageTable + 390 " SET consumer=NULL" + 391 " WHERE consumer IS NOT NULL" + 392 " AND EXISTS(SELECT * FROM " + consumerTable + 393 " WHERE s_id=consumer AND expire<?)"); 394 395 PreparedStatement pstmt = conn.prepareStatement(sql); 396 pstmt.setLong(1, Alarm.getCurrentTime()); 397 398 int count = pstmt.executeUpdate(); 399 pstmt.close(); 400 401 if (count > 0) 402 log.fine("JMSQueue[" + _queue.getName() + "] recovered " + count + " messages"); 403 404 sql = ("DELETE FROM " + consumerTable + 405 " WHERE expire<?"); 406 407 pstmt = conn.prepareStatement(sql); 408 pstmt.setLong(1, Alarm.getCurrentTime()); 409 410 pstmt.executeUpdate(); 411 } finally { 412 conn.close(); 413 } 414 } catch (Exception e) { 415 log.log(Level.FINER, e.toString(), e); 416 } 417 } 418 419 422 public void handleAlarm(Alarm alarm) 423 { 424 if (_isClosed) 425 return; 426 427 try { 428 Connection conn = _jdbcManager.getDataSource().getConnection(); 429 try { 430 String consumerTable = _jdbcManager.getConsumerTable(); 431 432 String sql = ("UPDATE " + consumerTable + 433 " SET expire=?" + 434 " WHERE s_id=?"); 435 436 PreparedStatement pstmt = conn.prepareStatement(sql); 437 pstmt.setLong(1, Alarm.getCurrentTime() + QUEUE_TIMEOUT); 438 pstmt.setLong(2, _consumerId); 439 440 pstmt.executeUpdate(); 441 } finally { 442 conn.close(); 443 } 444 } catch (Throwable e) { 445 log.log(Level.WARNING, e.toString(), e); 446 } finally { 447 _alarm.queue(QUEUE_TIMEOUT / 4); 448 } 449 } 450 451 454 public void close() 455 throws JMSException 456 { 457 if (_isClosed) 458 return; 459 _isClosed = true; 460 461 _alarm.dequeue(); 462 463 try { 464 deleteQueue(); 465 } catch (Throwable e) { 466 log.log(Level.WARNING, e.toString(), e); 467 } 468 469 super.close(); 470 } 471 472 475 public String toString() 476 { 477 return "JdbcQueueConsumer[" + _queue + "," + _consumerId + "]"; 478 } 479 } 480 481 | Popular Tags |