1 29 30 package com.caucho.jms.jdbc; 31 32 import com.caucho.config.ConfigException; 33 import com.caucho.jdbc.JdbcMetaData; 34 import com.caucho.jdbc.OracleMetaData; 35 import com.caucho.jms.JMSExceptionWrapper; 36 import com.caucho.jms.message.BytesMessageImpl; 37 import com.caucho.jms.message.MapMessageImpl; 38 import com.caucho.jms.message.MessageImpl; 39 import com.caucho.jms.message.ObjectMessageImpl; 40 import com.caucho.jms.message.StreamMessageImpl; 41 import com.caucho.jms.message.TextMessageImpl; 42 import com.caucho.jms.selector.Selector; 43 import com.caucho.util.CharBuffer; 44 import com.caucho.util.L10N; 45 import com.caucho.vfs.ByteToChar; 46 import com.caucho.vfs.ContextLoaderObjectInputStream; 47 import com.caucho.vfs.TempStream; 48 import com.caucho.vfs.WriteStream; 49 50 import javax.jms.*; 51 import javax.sql.DataSource ; 52 import java.io.EOFException ; 53 import java.io.IOException ; 54 import java.io.InputStream ; 55 import java.io.ObjectInputStream ; 56 import java.io.ObjectOutputStream ; 57 import java.sql.Connection ; 58 import java.sql.PreparedStatement ; 59 import java.sql.ResultSet ; 60 import java.sql.SQLException ; 61 import java.sql.Statement ; 62 import java.sql.Types ; 63 import java.util.Enumeration ; 64 import java.util.logging.Level ; 65 import java.util.logging.Logger ; 66 67 70 public class JdbcMessage 71 { 72 static final Logger log = Logger.getLogger(JdbcMessage.class.getName()); 73 static final L10N L = new L10N(JdbcMessage.class); 74 75 private static final int MESSAGE = 0; 76 private static final int TEXT = 1; 77 private static final int BYTES = 2; 78 private static final int STREAM = 3; 79 private static final int OBJECT = 4; 80 private static final int MAP = 5; 81 82 private JdbcManager _jdbcManager; 83 private DataSource _dataSource; 84 85 private String _messageTable; 86 private String _messageSequence; 87 88 private boolean _isOracle; 89 90 public JdbcMessage(JdbcManager jdbcManager) 91 { 92 _jdbcManager = jdbcManager; 93 } 94 95 98 public void init() 99 throws ConfigException, SQLException 100 { 101 _messageTable = _jdbcManager.getMessageTable(); 102 _dataSource = _jdbcManager.getDataSource(); 103 104 JdbcMetaData metaData = _jdbcManager.getMetaData(); 105 106 _isOracle = metaData instanceof OracleMetaData; 107 108 String identity = ""; 109 110 if (metaData.supportsIdentity()) 111 identity = " auto_increment"; 112 else 113 _messageSequence = _messageTable + "_cseq"; 114 115 Connection conn = _dataSource.getConnection(); 116 try { 117 Statement stmt = conn.createStatement(); 118 String sql = "SELECT 1 FROM " + _messageTable + " WHERE 1=0"; 119 120 try { 121 ResultSet rs = stmt.executeQuery(sql); 122 rs.next(); 123 rs.close(); 124 stmt.close(); 125 126 return; 127 } catch (SQLException e) { 128 log.finest(e.toString()); 129 } 130 131 String blob = _jdbcManager.getBlob(); 132 String longType = _jdbcManager.getLongType(); 133 134 log.info(L.l("creating JMS message table {0}", _messageTable)); 135 136 sql = ("CREATE TABLE " + _messageTable + " (" + 137 " m_id " + longType + " PRIMARY KEY" + identity + "," + 138 " queue INTEGER NOT NULL," + 139 " conn VARCHAR(255)," + 140 " consumer " + longType + "," + 141 " delivered INTEGER NOT NULL," + 142 " msg_type INTEGER NOT NULL," + 143 " expire " + longType + " NOT NULL," + 144 " header " + blob + "," + 145 " body " + blob + 146 ")"); 147 148 if (_isOracle) { 149 String extent = ""; 150 151 if (_jdbcManager.getTablespace() != null) { 152 extent = " tablespace " + _jdbcManager.getTablespace(); 153 } 154 155 sql += (" LOB(header) STORE AS (cache retention" + extent + ")"); 159 sql += (" LOB(body) STORE AS (cache retention" + extent + ")"); 160 } 161 162 stmt.executeUpdate(sql); 163 164 if (_messageSequence != null) { 165 stmt.executeUpdate(metaData.createSequenceSQL(_messageSequence, 1)); 166 } 167 } finally { 168 conn.close(); 169 } 170 } 171 172 175 public long send(Message message, int queue, long expireTime) 176 throws SQLException , IOException , JMSException 177 { 178 if (log.isLoggable(Level.FINE)) 179 log.fine("jms jdbc queue:" + queue + " send message"); 180 181 TempStream header = new TempStream(); 182 header.openWrite(); 183 184 WriteStream ws = new WriteStream(header); 185 writeMessageHeader(ws, message); 186 ws.close(); 187 188 TempStream body = null; 189 190 int type = MESSAGE; 191 192 if (message instanceof TextMessage) { 193 TextMessage text = (TextMessage) message; 194 195 type = TEXT; 196 197 if (text.getText() != null) { 198 body = new TempStream(); 199 body.openWrite(); 200 201 ws = new WriteStream(body); 202 ws.setEncoding("UTF-8"); 203 ws.print(text.getText()); 204 ws.close(); 205 } 206 } 207 else if (message instanceof BytesMessage) { 208 BytesMessage bytes = (BytesMessage) message; 209 210 type = BYTES; 211 212 body = writeBytes(bytes); 213 } 214 else if (message instanceof StreamMessage) { 215 StreamMessage stream = (StreamMessage) message; 216 217 type = STREAM; 218 219 body = writeStream(stream); 220 } 221 else if (message instanceof ObjectMessage) { 222 ObjectMessage obj = (ObjectMessage) message; 223 224 type = OBJECT; 225 226 body = writeObject(obj); 227 } 228 else if (message instanceof MapMessage) { 229 MapMessage obj = (MapMessage) message; 230 231 type = MAP; 232 233 body = writeMap(obj); 234 } 235 236 Connection conn = _dataSource.getConnection(); 237 try { 238 String sql; 239 240 if (_messageSequence != null) { 241 sql = _jdbcManager.getMetaData().selectSequenceSQL(_messageSequence); 242 243 PreparedStatement pstmt = conn.prepareStatement(sql);; 244 245 long mId = -1; 246 247 ResultSet rs = pstmt.executeQuery(); 248 if (rs.next()) 249 mId = rs.getLong(1); 250 else 251 throw new RuntimeException ("can't create message"); 252 253 sql = ("INSERT INTO " + _messageTable + 254 "(m_id, queue, msg_type, expire, delivered, header, body) " + 255 "VALUES (?,?,?,?,0,?,?)"); 256 257 pstmt = conn.prepareStatement(sql); 258 259 int i = 1; 260 pstmt.setLong(i++, mId); 261 pstmt.setInt(i++, queue); 262 pstmt.setInt(i++, type); 263 pstmt.setLong(i++, expireTime); 264 265 if (header.getLength() > 0) 266 pstmt.setBinaryStream(i++, header.openRead(), header.getLength()); 267 else 268 pstmt.setNull(i++, Types.BINARY); 269 270 if (body != null) 271 pstmt.setBinaryStream(i++, body.openRead(), body.getLength()); 272 else 273 pstmt.setString(i++, ""); 274 275 pstmt.executeUpdate(); 276 } 277 else { 278 sql = ("INSERT INTO " + _messageTable + 279 "(queue, msg_type, expire, delivered, header, body) " + 280 "VALUES (?,?,?,0,?,?)"); 281 PreparedStatement pstmt; 282 283 pstmt = conn.prepareStatement(sql); 284 285 int i = 1; 286 pstmt.setInt(i++, queue); 287 pstmt.setInt(i++, type); 288 pstmt.setLong(i++, expireTime); 289 pstmt.setBinaryStream(i++, header.openRead(), header.getLength()); 290 291 if (body != null) 292 pstmt.setBinaryStream(i++, body.openRead(), body.getLength()); 293 else 294 pstmt.setString(i++, ""); 295 296 pstmt.executeUpdate(); 297 } 298 299 return 0; 300 } finally { 301 conn.close(); 302 } 303 } 304 305 308 MessageImpl receive(int queue, int session) 309 throws SQLException , IOException , JMSException 310 { 311 long minId = -1; 312 313 Connection conn = _dataSource.getConnection(); 314 try { 315 String sql = ("SELECT m_id, msg_type, delivered, body, header" + 316 " FROM " + _messageTable + 317 " WHERE ?<id AND queue=? AND consumer IS NULL" + 318 " ORDER BY id"); 319 320 PreparedStatement selectStmt = conn.prepareStatement(sql); 321 322 323 sql = ("UPDATE " + _messageTable + " SET consumer=?, delivered=1 " + 324 "WHERE m_id=? AND consumer IS NULL"); 325 326 PreparedStatement updateStmt = conn.prepareStatement(sql); 327 328 long id = -1; 329 while (true) { 330 id = -1; 331 332 selectStmt.setLong(1, minId); 333 selectStmt.setInt(2, queue); 334 335 MessageImpl msg = null; 336 337 ResultSet rs = selectStmt.executeQuery(); 338 while (rs.next()) { 339 id = rs.getLong(1); 340 341 minId = id; 342 343 msg = readMessage(rs); 344 } 345 346 rs.close(); 347 348 if (msg == null) 349 return null; 350 351 updateStmt.setInt(1, session); 352 updateStmt.setLong(2, id); 353 354 int updateCount = updateStmt.executeUpdate(); 355 356 if (updateCount == 1) 357 return msg; 358 else if (log.isLoggable(Level.FINE)) { 359 log.fine("JdbcMessageQueue[" + queue + "] can't update received message " + id + " for session " + session +"."); 360 } 361 } 362 } finally { 363 conn.close(); 364 } 365 } 366 367 370 void acknowledge(int session) 371 throws SQLException 372 { 373 Connection conn = _dataSource.getConnection(); 374 375 try { 376 String sql = ("DELETE FROM " + _messageTable + " " + 377 "WHERE consumer=?"); 378 379 PreparedStatement pstmt; 380 pstmt = conn.prepareStatement(sql); 381 382 pstmt.setInt(1, session); 383 384 pstmt.executeUpdate(); 385 386 pstmt.close(); 387 } finally { 388 conn.close(); 389 } 390 } 391 392 395 MessageImpl readMessage(ResultSet rs) 396 throws SQLException , IOException , JMSException 397 { 398 int msgType = rs.getInt(2); 399 boolean redelivered = rs.getInt(3) == 1; 400 401 MessageImpl msg; 402 403 switch (msgType) { 404 case TEXT: 405 { 406 InputStream is = rs.getBinaryStream(4); 407 408 try { 409 msg = readTextMessage(is); 410 } finally { 411 if (is != null) 412 is.close(); 413 } 414 break; 415 } 416 417 case BYTES: 418 { 419 InputStream is = rs.getBinaryStream(4); 420 421 try { 422 msg = readBytesMessage(is); 423 } finally { 424 if (is != null) 425 is.close(); 426 } 427 break; 428 } 429 430 case STREAM: 431 { 432 InputStream is = rs.getBinaryStream(4); 433 434 try { 435 msg = readStreamMessage(is); 436 } finally { 437 if (is != null) 438 is.close(); 439 } 440 break; 441 } 442 443 case OBJECT: 444 { 445 InputStream is = rs.getBinaryStream(4); 446 447 try { 448 msg = readObjectMessage(is); 449 } finally { 450 if (is != null) 451 is.close(); 452 } 453 break; 454 } 455 456 case MAP: 457 { 458 InputStream is = rs.getBinaryStream(4); 459 460 try { 461 msg = readMapMessage(is); 462 } finally { 463 if (is != null) 464 is.close(); 465 } 466 break; 467 } 468 469 case MESSAGE: 470 default: 471 { 472 msg = new MessageImpl(); 473 break; 474 } 475 } 476 477 InputStream is = rs.getBinaryStream(5); 478 479 if (is != null) { 480 try { 481 readMessageHeader(is, msg); 482 } finally { 483 is.close(); 484 } 485 } 486 487 msg.setJMSRedelivered(redelivered); 488 489 return msg; 490 } 491 492 495 private void writeMessageHeader(WriteStream ws, Message msg) 496 throws IOException , JMSException 497 { 498 Enumeration names = msg.getPropertyNames(); 499 CharBuffer cb = new CharBuffer(); 500 501 while (names.hasMoreElements()) { 502 String name = (String ) names.nextElement(); 503 writeValue(ws, cb, name); 504 505 String value = msg.getStringProperty(name); 506 writeValue(ws, cb, value); 507 } 508 } 509 510 513 private void writeValue(WriteStream ws, CharBuffer cb, Object value) 514 throws IOException 515 { 516 if (value == null) 517 ws.write('N'); 518 else { 519 cb.clear(); 520 cb.append(value); 521 int length = cb.length(); 522 char []buf = cb.getBuffer(); 523 524 ws.write('S'); 525 ws.write(length >> 24); 526 ws.write(length >> 16); 527 ws.write(length >> 8); 528 ws.write(length); 529 530 for (int i = 0; i < length; i++) { 531 int ch = buf[i]; 532 533 ws.write(ch >> 8); 534 ws.write(ch); 535 } 536 } 537 } 538 539 542 private TempStream writeBytes(BytesMessage bytes) 543 throws IOException , JMSException 544 { 545 TempStream body = new TempStream(); 546 body.openWrite(); 547 548 WriteStream ws = new WriteStream(body); 549 550 int data; 551 553 while ((data = bytes.readUnsignedByte()) >= 0) { 554 ws.write(data); 555 } 556 557 ws.close(); 558 559 return body; 560 } 561 562 565 private TempStream writeStream(StreamMessage stream) 566 throws IOException , JMSException 567 { 568 TempStream body = new TempStream(); 569 body.openWrite(); 570 571 WriteStream ws = new WriteStream(body); 572 ObjectOutputStream out = new ObjectOutputStream (ws); 573 574 try { 575 while (true) { 576 Object data = stream.readObject(); 577 578 out.writeObject(data); 579 } 580 } catch (MessageEOFException e) { 581 } 582 583 out.close(); 584 ws.close(); 585 586 return body; 587 } 588 589 592 private TempStream writeObject(ObjectMessage obj) 593 throws IOException , JMSException 594 { 595 TempStream body = new TempStream(); 596 body.openWrite(); 597 598 WriteStream ws = new WriteStream(body); 599 ObjectOutputStream out = new ObjectOutputStream (ws); 600 601 out.writeObject(obj.getObject()); 602 603 out.close(); 604 ws.close(); 605 606 return body; 607 } 608 609 612 private TempStream writeMap(MapMessage map) 613 throws IOException , JMSException 614 { 615 TempStream body = new TempStream(); 616 body.openWrite(); 617 618 WriteStream ws = new WriteStream(body); 619 ObjectOutputStream out = new ObjectOutputStream (ws); 620 621 try { 622 Enumeration e = map.getMapNames(); 623 while (e.hasMoreElements()) { 624 String name = (String ) e.nextElement(); 625 out.writeUTF(name); 626 627 Object data = map.getObject(name); 628 out.writeObject(data); 629 } 630 } catch (MessageEOFException e) { 631 } 632 633 out.close(); 634 ws.close(); 635 636 return body; 637 } 638 639 642 private void readMessageHeader(InputStream is, Message msg) 643 throws IOException , JMSException 644 { 645 CharBuffer cb = new CharBuffer(); 646 647 int type; 648 649 while ((type = is.read()) > 0) { 650 String name = (String ) readValue(is, type, cb); 651 Object value = readValue(is, is.read(), cb); 652 653 msg.setObjectProperty(name, value); 654 } 655 } 656 657 660 private TextMessageImpl readTextMessage(InputStream is) 661 throws IOException , JMSException 662 { 663 TextMessageImpl text = new TextMessageImpl(); 664 665 if (is == null) 666 return text; 667 668 ByteToChar byteToChar = ByteToChar.create(); 669 670 int ch; 671 672 byteToChar.setEncoding("UTF-8"); 673 while ((ch = is.read()) >= 0) { 674 byteToChar.addByte(ch); 675 } 676 677 text.setText(byteToChar.getConvertedString()); 678 679 return text; 680 } 681 682 685 private BytesMessageImpl readBytesMessage(InputStream is) 686 throws IOException , JMSException 687 { 688 BytesMessageImpl bytes = new BytesMessageImpl(); 689 690 if (is == null) { 691 bytes.reset(); 692 693 return bytes; 694 } 695 696 int data; 697 698 while ((data = is.read()) >= 0) { 699 bytes.writeByte((byte) data); 700 } 701 702 bytes.reset(); 703 704 return bytes; 705 } 706 707 710 private StreamMessageImpl readStreamMessage(InputStream is) 711 throws IOException , JMSException 712 { 713 StreamMessageImpl stream = new StreamMessageImpl(); 714 715 if (is == null) 716 return stream; 717 718 ObjectInputStream in = new ContextLoaderObjectInputStream(is); 719 720 try { 721 while (true) { 722 Object obj = in.readObject(); 723 724 stream.writeObject(obj); 725 } 726 } catch (EOFException e) { 727 } catch (Exception e) { 728 throw new JMSExceptionWrapper(e); 729 } 730 731 in.close(); 732 733 stream.reset(); 734 735 return stream; 736 } 737 738 741 private MapMessageImpl readMapMessage(InputStream is) 742 throws IOException , JMSException 743 { 744 MapMessageImpl map = new MapMessageImpl(); 745 746 if (is == null) 747 return map; 748 749 ObjectInputStream in = new ContextLoaderObjectInputStream(is); 750 751 try { 752 while (true) { 753 String name = in.readUTF(); 754 Object obj = in.readObject(); 755 756 map.setObject(name, obj); 757 } 758 } catch (EOFException e) { 759 } catch (Exception e) { 760 throw new JMSExceptionWrapper(e); 761 } 762 763 in.close(); 764 765 return map; 766 } 767 768 771 private ObjectMessageImpl readObjectMessage(InputStream is) 772 throws IOException , JMSException 773 { 774 ObjectMessageImpl msg = new ObjectMessageImpl(); 775 776 if (is == null) 777 return msg; 778 779 ObjectInputStream in = new ContextLoaderObjectInputStream(is); 780 781 try { 782 Object obj = in.readObject(); 783 msg.setObject((java.io.Serializable ) obj); 784 } catch (IOException e) { 785 throw e; 786 } catch (Exception e) { 787 throw new JMSExceptionWrapper(e); 788 } 789 790 in.close(); 791 792 return msg; 793 } 794 795 798 private Object readValue(InputStream is, int type, CharBuffer cb) 799 throws IOException 800 { 801 switch (type) { 802 case 'N': 803 return null; 804 case 'S': 805 { 806 cb.clear(); 807 int length = readInt(is); 808 809 for (int i = 0; i < length; i++) { 810 char ch = (char) ((is.read() << 8) + is.read()); 811 812 cb.append(ch); 813 } 814 815 return cb.toString(); 816 } 817 default: 818 throw new IOException (L.l("unknown header type")); 819 } 820 } 821 822 825 private int readInt(InputStream is) 826 throws IOException 827 { 828 return ((is.read() << 24) + 829 (is.read() << 16) + 830 (is.read() << 8) + 831 (is.read())); 832 } 833 834 837 private boolean hasMessage(Selector selector) 838 throws JMSException 839 { 840 return false; 841 } 842 843 846 public String toString() 847 { 848 return "JdbcMessage[" + _messageTable + "]"; 849 } 850 } 851 852 | Popular Tags |