package com.caucho.jms.jdbc;

import com.caucho.jms.JMSExceptionWrapper;
import com.caucho.jms.message.MessageImpl;
import com.caucho.jms.session.MessageConsumerImpl;
import com.caucho.jms.session.SessionImpl;
import com.caucho.log.Log;
import com.caucho.util.Alarm;
import com.caucho.util.AlarmListener;
import com.caucho.util.L10N;

import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.sql.DataSource;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Topic subscriber whose state is a row in the JDBC consumer table.
 *
 * <p>The row tracks two cursors into the message table: {@code read_id},
 * the id of the last message handed out by {@link #receiveImpl()}, and
 * {@code ack_id}, the id of the last acknowledged message.  Anonymous
 * consumers carry an {@code expire} timestamp that is refreshed from an
 * {@link Alarm}; named consumers are inserted with a far-future expire
 * value and no alarm.
 */
public class JdbcTopicConsumer extends MessageConsumerImpl
  implements AlarmListener, TopicSubscriber {
  static final Logger log = Log.open(JdbcTopicConsumer.class);
  static final L10N L = new L10N(JdbcTopicConsumer.class);

  // Lifetime of an anonymous consumer row unless its expire is refreshed.
  private final static long TOPIC_TIMEOUT = 3600 * 1000L;

  private JdbcManager _jdbcManager;

  private JdbcTopic _topic;

  // Subscription name; null for an anonymous consumer.
  private String _subscriber;
  // Value of the s_id column identifying this consumer's row.
  private long _consumerId;

  private long _lastPurgeTime;
  private boolean _isClosed;
  // Only created by the anonymous constructor; null for named consumers.
  private Alarm _alarm;

  /**
   * Creates an anonymous consumer: inserts a new consumer row and starts
   * the alarm that periodically extends the row's expire time.
   *
   * @throws JMSException if the consumer row cannot be created
   */
  public JdbcTopicConsumer(SessionImpl session, String messageSelector,
                           JdbcManager jdbcManager, JdbcTopic topic,
                           boolean noLocal)
    throws JMSException
  {
    super(session, messageSelector, topic, noLocal);

    _jdbcManager = jdbcManager;
    _topic = topic;

    createTopic();

    _alarm = new Alarm(this, TOPIC_TIMEOUT / 4);
  }

  /**
   * Creates a named consumer, reusing the existing consumer row for
   * (topic, client id, name) when there is one.
   *
   * @param name the subscription name
   * @throws JMSException if the consumer row cannot be found or created
   */
  public JdbcTopicConsumer(SessionImpl session, String messageSelector,
                           JdbcManager jdbcManager, JdbcTopic topic,
                           boolean noLocal, String name)
    throws JMSException
  {
    super(session, messageSelector, topic, noLocal);

    _jdbcManager = jdbcManager;
    _topic = topic;

    _subscriber = name;

    createTopic(name);
  }

  /**
   * Returns the topic this consumer reads from.
   */
  public Topic getTopic()
  {
    return _topic;
  }

  /**
   * Inserts the consumer row for an anonymous consumer.  Both cursors
   * start at the current maximum message id of the topic, and the row
   * expires TOPIC_TIMEOUT after creation unless refreshed.
   */
  private void createTopic()
    throws JMSException
  {
    try {
      DataSource dataSource = _jdbcManager.getDataSource();
      String consumerTable = _jdbcManager.getConsumerTable();
      String consumerSequence = _jdbcManager.getConsumerSequence();

      Connection conn = dataSource.getConnection();
      try {
        long max = selectMaxMessageId(conn);

        if (consumerSequence != null) {
          long id = selectNextConsumerId(conn, consumerSequence);

          String sql = ("INSERT INTO " + consumerTable +
                        " (s_id, queue, expire, read_id, ack_id) VALUES (?,?,?,?,?)");

          PreparedStatement pstmt = conn.prepareStatement(sql);
          try {
            pstmt.setLong(1, id);
            pstmt.setInt(2, _topic.getId());
            pstmt.setLong(3, Alarm.getCurrentTime() + TOPIC_TIMEOUT);
            pstmt.setLong(4, max);
            pstmt.setLong(5, max);

            pstmt.executeUpdate();
          } finally {
            pstmt.close();
          }

          _consumerId = id;
        }
        else {
          String sql = ("INSERT INTO " + consumerTable +
                        " (queue, expire, read_id, ack_id) VALUES (?,?,?,?)");

          PreparedStatement pstmt;
          pstmt = conn.prepareStatement(sql,
                                        PreparedStatement.RETURN_GENERATED_KEYS);
          try {
            pstmt.setInt(1, _topic.getId());
            pstmt.setLong(2, Alarm.getCurrentTime() + TOPIC_TIMEOUT);
            pstmt.setLong(3, max);
            pstmt.setLong(4, max);

            pstmt.executeUpdate();

            _consumerId = readGeneratedConsumerId(pstmt);
          } finally {
            pstmt.close();
          }
        }
      } finally {
        conn.close();
      }
    } catch (SQLException e) {
      throw new JMSExceptionWrapper(e);
    }
  }

  /**
   * Deletes this consumer's row from the consumer table.
   */
  private void deleteTopic()
    throws JMSException
  {
    try {
      DataSource dataSource = _jdbcManager.getDataSource();
      String consumerTable = _jdbcManager.getConsumerTable();

      Connection conn = dataSource.getConnection();
      try {
        String sql = ("DELETE FROM " + consumerTable +
                      " WHERE s_id=?");

        executeForConsumer(conn, sql);
      } finally {
        conn.close();
      }
    } catch (SQLException e) {
      throw new JMSExceptionWrapper(e);
    }
  }

  /**
   * Looks up the consumer row for (topic, client id, name) and adopts its
   * id; if there is none, inserts a new row whose cursors start at the
   * topic's current maximum message id.
   *
   * @param name the subscription name
   */
  private void createTopic(String name)
    throws JMSException
  {
    try {
      DataSource dataSource = _jdbcManager.getDataSource();
      String consumerTable = _jdbcManager.getConsumerTable();
      String consumerSequence = _jdbcManager.getConsumerSequence();

      String clientId = _session.getClientID();

      Connection conn = dataSource.getConnection();
      try {
        String sql = ("SELECT s_id" +
                      " FROM " + consumerTable +
                      " WHERE queue=? AND client=? AND name=?");

        PreparedStatement pstmt = conn.prepareStatement(sql);
        try {
          pstmt.setInt(1, _topic.getId());
          pstmt.setString(2, clientId);
          pstmt.setString(3, name);

          ResultSet rs = pstmt.executeQuery();
          try {
            if (rs.next()) {
              // The subscription already exists: reuse its row.
              _consumerId = rs.getLong(1);
              return;
            }
          } finally {
            rs.close();
          }
        } finally {
          pstmt.close();
        }

        long max = selectMaxMessageId(conn);

        if (consumerSequence != null) {
          long id = selectNextConsumerId(conn, consumerSequence);

          sql = ("INSERT INTO " + consumerTable +
                 " (s_id, queue, client, name, expire, read_id, ack_id) VALUES (?,?,?,?,?,?,?)");

          pstmt = conn.prepareStatement(sql);
          try {
            pstmt.setLong(1, id);
            pstmt.setInt(2, _topic.getId());
            pstmt.setString(3, clientId);
            pstmt.setString(4, name);
            pstmt.setLong(5, Long.MAX_VALUE / 2);
            pstmt.setLong(6, max);
            pstmt.setLong(7, max);

            pstmt.executeUpdate();
          } finally {
            pstmt.close();
          }

          // Record the new row's id, as the anonymous variant does;
          // otherwise later statements would address s_id 0.
          _consumerId = id;
        }
        else {
          sql = ("INSERT INTO " + consumerTable +
                 " (queue, client, name, expire, read_id, ack_id) VALUES (?,?,?,?,?,?)");

          pstmt = conn.prepareStatement(sql,
                                        PreparedStatement.RETURN_GENERATED_KEYS);
          try {
            pstmt.setInt(1, _topic.getId());
            pstmt.setString(2, clientId);
            pstmt.setString(3, name);
            pstmt.setLong(4, Long.MAX_VALUE / 2);
            pstmt.setLong(5, max);
            pstmt.setLong(6, max);

            pstmt.executeUpdate();

            _consumerId = readGeneratedConsumerId(pstmt);
          } finally {
            pstmt.close();
          }
        }
      } finally {
        conn.close();
      }
    } catch (SQLException e) {
      throw new JMSExceptionWrapper(e);
    }
  }

  /**
   * Returns MAX(m_id) of the topic's rows in the message table, or -1
   * when the query produces no row.
   */
  private long selectMaxMessageId(Connection conn)
    throws SQLException
  {
    String sql = ("SELECT MAX(m_id)" +
                  " FROM " + _jdbcManager.getMessageTable() +
                  " WHERE queue=?");

    PreparedStatement pstmt = conn.prepareStatement(sql);
    try {
      pstmt.setInt(1, _topic.getId());

      ResultSet rs = pstmt.executeQuery();
      try {
        long max = -1;

        if (rs.next())
          max = rs.getLong(1);

        return max;
      } finally {
        rs.close();
      }
    } finally {
      pstmt.close();
    }
  }

  /**
   * Fetches the next value of the consumer id sequence.
   *
   * @throws IllegalStateException if the sequence query returns no row
   */
  private long selectNextConsumerId(Connection conn, String consumerSequence)
    throws SQLException
  {
    String sql = _jdbcManager.getMetaData().selectSequenceSQL(consumerSequence);

    PreparedStatement pstmt = conn.prepareStatement(sql);
    try {
      ResultSet rs = pstmt.executeQuery();
      try {
        if (rs.next())
          return rs.getLong(1);
        else
          throw new IllegalStateException("no sequence value for consumer.");
      } finally {
        rs.close();
      }
    } finally {
      pstmt.close();
    }
  }

  /**
   * Reads the key generated by an already-executed consumer INSERT.
   *
   * @throws JMSException if the statement reports no generated key
   */
  private long readGeneratedConsumerId(PreparedStatement pstmt)
    throws SQLException, JMSException
  {
    ResultSet rsKeys = pstmt.getGeneratedKeys();
    try {
      if (rsKeys.next())
        return rsKeys.getLong(1);
      else
        throw new JMSException(L.l("consumer insert didn't create a key"));
    } finally {
      rsKeys.close();
    }
  }

  /**
   * Runs an update statement whose single parameter is this consumer's id
   * and closes the statement afterwards.
   */
  private void executeForConsumer(Connection conn, String sql)
    throws SQLException
  {
    PreparedStatement pstmt = conn.prepareStatement(sql);
    try {
      pstmt.setLong(1, _consumerId);

      pstmt.executeUpdate();
    } finally {
      pstmt.close();
    }
  }

  /**
   * Reads the next message for this consumer: the first unexpired message
   * with an id above the consumer's read_id that the selector (if any)
   * accepts.  When one is found, read_id is advanced to its id.
   *
   * @return the message, or null if no matching message is available
   */
  protected MessageImpl receiveImpl()
    throws JMSException
  {
    purgeExpiredConsumers();
    _topic.purgeExpiredMessages();

    try {
      DataSource dataSource = _jdbcManager.getDataSource();
      String messageTable = _jdbcManager.getMessageTable();
      String consumerTable = _jdbcManager.getConsumerTable();
      JdbcMessage jdbcMessage = _jdbcManager.getJdbcMessage();

      Connection conn = dataSource.getConnection();
      try {
        String sql = ("SELECT m_id, msg_type, delivered, body, header" +
                      " FROM " + messageTable + " m," +
                      "      " + consumerTable + " s" +
                      " WHERE s_id=? AND m.queue=s.queue AND s.read_id<m_id" +
                      "   AND ?<m.expire" +
                      " ORDER BY m_id");

        long id = -1;
        MessageImpl msg = null;

        PreparedStatement selectStmt = conn.prepareStatement(sql);
        try {
          try {
            selectStmt.setFetchSize(1);
          } catch (Throwable e) {
            // The fetch size is only a hint; a failure here is not fatal.
            log.log(Level.FINER, e.toString(), e);
          }

          selectStmt.setLong(1, _consumerId);
          selectStmt.setLong(2, Alarm.getCurrentTime());

          ResultSet rs = selectStmt.executeQuery();
          try {
            while (rs.next()) {
              id = rs.getLong(1);

              msg = jdbcMessage.readMessage(rs);

              if (_selector == null || _selector.isMatch(msg))
                break;
              else
                msg = null;
            }
          } finally {
            rs.close();
          }
        } finally {
          selectStmt.close();
        }

        if (msg == null)
          return null;

        sql = ("UPDATE " + consumerTable +
               " SET read_id=?" +
               " WHERE s_id=?");

        PreparedStatement updateStmt = conn.prepareStatement(sql);
        try {
          updateStmt.setLong(1, id);
          updateStmt.setLong(2, _consumerId);

          updateStmt.executeUpdate();
        } finally {
          updateStmt.close();
        }

        return msg;
      } finally {
        conn.close();
      }
    } catch (IOException e) {
      throw new JMSExceptionWrapper(e);
    } catch (SQLException e) {
      throw new JMSExceptionWrapper(e);
    }
  }

  /**
   * Acknowledges everything received so far by setting ack_id to read_id,
   * then removes messages no consumer of the topic still needs.
   */
  public void acknowledge()
    throws JMSException
  {
    try {
      DataSource dataSource = _jdbcManager.getDataSource();
      String consumerTable = _jdbcManager.getConsumerTable();

      Connection conn = dataSource.getConnection();

      try {
        String sql = ("UPDATE " + consumerTable +
                      " SET ack_id=read_id " +
                      " WHERE s_id=?");

        executeForConsumer(conn, sql);

        deleteOldMessages();
      } finally {
        conn.close();
      }
    } catch (SQLException e) {
      throw new JMSExceptionWrapper(e);
    }
  }

  /**
   * Deletes the topic's messages for which no consumer row of the topic
   * has an ack_id below the message id.
   */
  private void deleteOldMessages()
    throws JMSException
  {
    try {
      DataSource dataSource = _jdbcManager.getDataSource();
      String messageTable = _jdbcManager.getMessageTable();
      String consumerTable = _jdbcManager.getConsumerTable();

      Connection conn = dataSource.getConnection();

      try {
        String sql;

        sql = ("DELETE FROM " + messageTable +
               " WHERE queue=? AND NOT EXISTS(" +
               "   SELECT * FROM " + consumerTable +
               "   WHERE queue=? AND ack_id < m_id)");

        PreparedStatement pstmt = conn.prepareStatement(sql);
        try {
          pstmt.setInt(1, _topic.getId());
          pstmt.setInt(2, _topic.getId());

          pstmt.executeUpdate();
        } finally {
          pstmt.close();
        }
      } finally {
        conn.close();
      }
    } catch (SQLException e) {
      throw new JMSExceptionWrapper(e);
    }
  }

  /**
   * Rolls back unacknowledged reads by resetting read_id to ack_id, so
   * those messages are returned again by later receives.
   */
  public void rollback()
    throws JMSException
  {
    try {
      DataSource dataSource = _jdbcManager.getDataSource();
      String consumerTable = _jdbcManager.getConsumerTable();

      Connection conn = dataSource.getConnection();

      try {
        String sql = ("UPDATE " + consumerTable +
                      " SET read_id=ack_id " +
                      " WHERE s_id=?");

        executeForConsumer(conn, sql);
      } finally {
        conn.close();
      }
    } catch (SQLException e) {
      throw new JMSExceptionWrapper(e);
    }
  }

  /**
   * Deletes every consumer row whose expire time has passed.  Runs at most
   * once per TOPIC_TIMEOUT for this instance; failures are only logged.
   */
  private void purgeExpiredConsumers()
  {
    long now = Alarm.getCurrentTime();

    if (now < _lastPurgeTime + TOPIC_TIMEOUT)
      return;

    _lastPurgeTime = now;

    try {
      DataSource dataSource = _jdbcManager.getDataSource();
      String consumerTable = _jdbcManager.getConsumerTable();

      Connection conn = dataSource.getConnection();
      try {
        String sql = ("DELETE FROM " + consumerTable +
                      " WHERE expire<?");

        PreparedStatement pstmt = conn.prepareStatement(sql);
        try {
          pstmt.setLong(1, Alarm.getCurrentTime());

          pstmt.executeUpdate();
        } finally {
          pstmt.close();
        }
      } finally {
        conn.close();
      }
    } catch (Exception e) {
      log.log(Level.FINER, e.toString(), e);
    }
  }

  /**
   * Alarm callback: pushes this consumer's expire time TOPIC_TIMEOUT into
   * the future and re-queues the alarm, even if the update failed.  Does
   * nothing once the consumer is closed.
   */
  public void handleAlarm(Alarm alarm)
  {
    if (_isClosed)
      return;

    try {
      Connection conn = _jdbcManager.getDataSource().getConnection();
      try {
        String consumerTable = _jdbcManager.getConsumerTable();

        String sql = ("UPDATE " + consumerTable +
                      " SET expire=?" +
                      " WHERE s_id=?");

        PreparedStatement pstmt = conn.prepareStatement(sql);
        try {
          pstmt.setLong(1, Alarm.getCurrentTime() + TOPIC_TIMEOUT);
          pstmt.setLong(2, _consumerId);

          pstmt.executeUpdate();
        } finally {
          pstmt.close();
        }
      } finally {
        conn.close();
      }
    } catch (Throwable e) {
      log.log(Level.WARNING, e.toString(), e);
    } finally {
      _alarm.queue(TOPIC_TIMEOUT / 4);
    }
  }

  /**
   * Closes the consumer: cancels the alarm, removes the consumer row (or
   * unsubscribes a named consumer), then closes the parent.  Cleanup
   * failures are logged and do not prevent the parent close.
   */
  public void close()
    throws JMSException
  {
    if (_isClosed)
      return;
    _isClosed = true;

    if (_alarm != null)
      _alarm.dequeue();

    try {
      if (_subscriber == null)
        deleteTopic();
      else {
        // NOTE(review): this unsubscribes a named subscription on every
        // close; confirm that is intended rather than keeping the
        // subscription for a later reconnect.
        _session.unsubscribe(_subscriber);
      }
    } catch (Throwable e) {
      log.log(Level.WARNING, e.toString(), e);
    }

    super.close();
  }

  /**
   * Returns a printable version of the consumer: topic and consumer id.
   */
  public String toString()
  {
    return "JdbcTopicConsumer[" + _topic + "," + _consumerId + "]";
  }
}