1 22 package org.jboss.mq.pm.jdbc2; 23 24 import java.io.IOException ; 25 import java.sql.Connection ; 26 import java.sql.PreparedStatement ; 27 import java.sql.ResultSet ; 28 import java.sql.SQLException ; 29 30 import javax.jms.JMSException ; 31 32 import org.jboss.mq.SpyMessage; 33 import org.jboss.mq.pm.Tx; 34 35 43 public class OracleThinPersistenceManager extends PersistenceManager 44 { 45 46 protected String INSERT_EMPTY_BLOB = "INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,EMPTY_BLOB(),?,?)"; 47 48 protected String LOCK_EMPTY_BLOB = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID = ? AND DESTINATION = ? FOR UPDATE"; 49 50 55 public OracleThinPersistenceManager() throws JMSException 56 { 57 } 58 59 protected void add(Connection c, String queue, SpyMessage message, Tx txId, String mark) throws SQLException , IOException 60 { 61 PreparedStatement stmt = null; 62 ResultSet rs = null; 63 try 64 { 65 stmt = c.prepareStatement(INSERT_EMPTY_BLOB); 66 67 stmt.setLong(1, message.header.messageId); 68 stmt.setString(2, queue); 69 70 if (txId != null) 71 stmt.setLong(3, txId.longValue()); 72 else 73 stmt.setNull(3, java.sql.Types.BIGINT); 74 stmt.setString(4, mark); 75 76 int count = stmt.executeUpdate(); 77 safeClose(stmt, null); 78 if (count != 1) 79 throw new IOException ("Could not insert empty blob in the database: insert affected " + count + " rows. message=" + message); 80 81 stmt = c.prepareStatement(LOCK_EMPTY_BLOB); 82 stmt.setLong(1, message.header.messageId); 83 stmt.setString(2, queue); 84 85 rs = stmt.executeQuery(); 86 if (rs.next() == false) 87 throw new IOException ("Could not lock empty blob in the database. message=" + message); 88 safeClose(stmt, rs); 89 90 stmt = c.prepareStatement(UPDATE_MESSAGE); 91 setBlob(stmt, 1, message); 92 stmt.setLong(2, message.header.messageId); 93 stmt.setString(3, queue); 94 95 count = stmt.executeUpdate(); 96 safeClose(stmt, null); 97 if (count != 1) 98 throw new IOException ("Could not update real blob in the database: update affected " + count + " rows. message=" + message); 99 } 100 finally 101 { 102 safeClose(stmt, rs); 103 } 104 } 105 106 public void startService() throws Exception 107 { 108 INSERT_EMPTY_BLOB = sqlProperties.getProperty("INSERT_EMPTY_BLOB", INSERT_EMPTY_BLOB); 109 LOCK_EMPTY_BLOB = sqlProperties.getProperty("LOCK_EMPTY_BLOB", LOCK_EMPTY_BLOB); 110 super.startService(); 111 } 112 113 protected void safeClose(PreparedStatement stmt, ResultSet rs) 114 { 115 try 116 { 117 if (rs != null) 118 { 119 rs.close(); 120 } 121 } 122 catch (SQLException ignored) 123 { 124 log.trace("Ignored", ignored); 125 } 126 try 127 { 128 if (stmt != null) 129 { 130 stmt.close(); 131 } 132 } 133 catch (SQLException ignored) 134 { 135 log.trace("Ignored", ignored); 136 } 137 } 138 } 139 | Popular Tags |