1 29 30 package com.caucho.jms.jdbc; 31 32 import com.caucho.config.ConfigException; 33 import com.caucho.jdbc.JdbcMetaData; 34 import com.caucho.jms.AbstractDestination; 35 import com.caucho.util.Alarm; 36 import com.caucho.util.L10N; 37 import com.caucho.util.Log; 38 39 import javax.sql.DataSource ; 40 import java.sql.Connection ; 41 import java.sql.PreparedStatement ; 42 import java.sql.ResultSet ; 43 import java.sql.SQLException ; 44 import java.util.logging.Level ; 45 import java.util.logging.Logger ; 46 47 50 public class JdbcDestination extends AbstractDestination { 51 static final Logger log = Log.open(JdbcDestination.class); 52 static final L10N L = new L10N(JdbcDestination.class); 53 54 protected JdbcManager _jdbcManager = new JdbcManager(); 55 56 private DataSource _dataSource; 57 58 private String _name; 59 60 private long _lastPurgeTime; 61 62 public JdbcDestination() 63 { 64 } 65 66 69 public void setName(String name) 70 { 71 _name = name; 72 } 73 74 77 public String getName() 78 { 79 return _name; 80 } 81 82 85 public boolean isTopic() 86 { 87 return false; 88 } 89 90 93 public void setJdbcManager(JdbcManager jdbcManager) 94 { 95 _jdbcManager = jdbcManager; 96 } 97 98 101 public JdbcManager getJdbcManager() 102 { 103 return _jdbcManager; 104 } 105 106 109 public void setDataSource(DataSource dataSource) 110 { 111 _jdbcManager.setDataSource(dataSource); 112 } 113 114 117 public void setTablespace(String tablespace) 118 { 119 _jdbcManager.setTablespace(tablespace); 120 } 121 122 125 public void init() 126 throws ConfigException, SQLException 127 { 128 _jdbcManager.init(); 129 130 _dataSource = _jdbcManager.getDataSource(); 131 } 132 133 136 protected int createDestination(String name, boolean isTopic) 137 throws SQLException 138 { 139 Connection conn = _jdbcManager.getDataSource().getConnection(); 140 String destinationTable = _jdbcManager.getDestinationTable(); 141 String destinationSequence = _jdbcManager.getDestinationSequence(); 142 143 try { 144 String sql = ("SELECT id FROM " + destinationTable + 145 " WHERE name=? AND is_topic=?"); 146 147 PreparedStatement pstmt = conn.prepareStatement(sql); 148 pstmt.setString(1, name); 149 pstmt.setInt(2, isTopic ? 1 : 0); 150 151 ResultSet rs = pstmt.executeQuery(); 152 if (rs.next()) { 153 return rs.getInt(1); 154 } 155 rs.close(); 156 157 if (destinationSequence != null) { 158 JdbcMetaData metaData = _jdbcManager.getMetaData(); 159 sql = metaData.selectSequenceSQL(destinationSequence); 160 int id = 0; 161 162 pstmt = conn.prepareStatement(sql); 163 164 rs = pstmt.executeQuery(); 165 if (rs.next()) 166 id = rs.getInt(1); 167 else 168 throw new RuntimeException ("can't create sequence"); 169 170 sql = "INSERT INTO " + destinationTable + " (id,name,is_topic) VALUES(?,?,?)"; 171 172 pstmt = conn.prepareStatement(sql); 173 174 pstmt.setInt(1, id); 175 pstmt.setString(2, name); 176 pstmt.setInt(3, isTopic ? 1 : 0); 177 178 pstmt.executeUpdate(); 179 180 if (isTopic) 181 log.fine("JMSTopic[" + name + "," + id + "] created"); 182 else 183 log.fine("JMSQueue[" + name + "," + id + "] created"); 184 185 return id; 186 } 187 else { 188 sql = "INSERT INTO " + destinationTable + " (name,is_topic) VALUES(?,?)"; 189 pstmt = conn.prepareStatement(sql, 190 PreparedStatement.RETURN_GENERATED_KEYS); 191 pstmt.setString(1, name); 192 pstmt.setInt(2, isTopic ? 1 : 0); 193 194 pstmt.executeUpdate(); 195 196 rs = pstmt.getGeneratedKeys(); 197 198 if (rs.next()) { 199 int id = rs.getInt(1); 200 201 if (isTopic) 202 log.fine("JMSTopic[" + name + "," + id + "] created"); 203 else 204 log.fine("JMSQueue[" + name + "," + id + "] created"); 205 206 return id; 207 } 208 else 209 throw new SQLException (L.l("can't generate destination for {0}", 210 name)); 211 } 212 } finally { 213 conn.close(); 214 } 215 } 216 217 220 protected void purgeExpiredMessages() 221 { 222 long purgeInterval = _jdbcManager.getPurgeInterval(); 223 long now = Alarm.getCurrentTime(); 224 225 if (now < _lastPurgeTime + purgeInterval) 226 return; 227 228 _lastPurgeTime = now; 229 230 try { 231 DataSource dataSource = _jdbcManager.getDataSource(); 232 String messageTable = _jdbcManager.getMessageTable(); 233 JdbcMessage jdbcMessage = _jdbcManager.getJdbcMessage(); 234 235 Connection conn = dataSource.getConnection(); 236 try { 237 String sql = ("DELETE FROM " + messageTable + 238 " WHERE expire < ? AND consumer IS NULL"); 239 240 PreparedStatement pstmt = conn.prepareStatement(sql); 241 pstmt.setLong(1, Alarm.getCurrentTime()); 242 243 int count = pstmt.executeUpdate(); 244 245 if (count > 0) 246 log.fine("JMSQueue[" + getName() + "] purged " + count + " expired mesages"); 247 248 pstmt.close(); 249 } finally { 250 conn.close(); 251 } 252 } catch (Exception e) { 253 log.log(Level.FINER, e.toString(), e); 254 } 255 } 256 } 257 258 | Popular Tags |