1 28 29 package com.caucho.jms.jdbc; 30 31 import com.caucho.config.ConfigException; 32 import com.caucho.config.types.Period; 33 import com.caucho.jdbc.JdbcMetaData; 34 import com.caucho.util.L10N; 35 import com.caucho.util.Log; 36 37 import javax.sql.DataSource ; 38 import java.sql.Connection ; 39 import java.sql.ResultSet ; 40 import java.sql.SQLException ; 41 import java.sql.Statement ; 42 import java.util.logging.Logger ; 43 44 47 public class JdbcManager { 48 static final Logger log = Log.open(JdbcManager.class); 49 static final L10N L = new L10N(JdbcManager.class); 50 51 private DataSource _dataSource; 52 53 private String _messageTable = "resin_jms_message"; 54 55 private String _destinationTable = "resin_jms_destination"; 56 private String _destinationSequence; 57 58 private String _consumerTable = "resin_jms_consumer"; 59 private String _consumerSequence; 60 61 private String _blob; 62 private String _longType; 63 64 private String _tablespace; 66 private boolean _isTruncateBlob; 67 68 private long _purgeInterval = 60000L; 69 70 private JdbcMessage _jdbcMessage; 71 72 private volatile boolean _isInit; 73 74 public JdbcManager() 75 { 76 } 77 78 81 public void setDataSource(DataSource dataSource) 82 { 83 _dataSource = dataSource; 84 } 85 86 89 public DataSource getDataSource() 90 { 91 return _dataSource; 92 } 93 94 97 public String getMessageTable() 98 { 99 return _messageTable; 100 } 101 102 105 public void setMessageTable(String tableName) 106 { 107 _messageTable = tableName; 108 } 109 110 113 public String getDestinationTable() 114 { 115 return _destinationTable; 116 } 117 118 121 public void setDestinationTable(String tableName) 122 { 123 _destinationTable = tableName; 124 } 125 126 129 public String getDestinationSequence() 130 { 131 return _destinationSequence; 132 } 133 134 137 public String getConsumerTable() 138 { 139 return _consumerTable; 140 } 141 142 145 public void setConsumerTable(String tableName) 146 { 147 _consumerTable = tableName; 148 } 149 150 153 public String getConsumerSequence() 154 { 155 return _consumerSequence; 156 } 157 158 161 public JdbcMetaData getMetaData() 162 { 163 return JdbcMetaData.create(_dataSource); 164 } 165 166 169 public String getBlob() 170 { 171 if (_blob == null) 172 _blob = getMetaData().getBlobType(); 173 174 return _blob; 175 } 176 177 180 public void setTablespace(String tablespace) 181 { 182 _tablespace = tablespace; 183 } 184 185 188 public String getTablespace() 189 { 190 return _tablespace; 191 } 192 193 196 public String getLongType() 197 { 198 if (_longType == null) 199 _longType = getMetaData().getLongType(); 200 201 return _longType; 202 } 203 204 207 public void setPurgeInterval(Period period) 208 { 209 _purgeInterval = period.getPeriod(); 210 } 211 212 215 public long getPurgeInterval() 216 { 217 return _purgeInterval; 218 } 219 220 223 public JdbcMessage getJdbcMessage() 224 { 225 return _jdbcMessage; 226 } 227 228 231 public boolean isTruncateBlob() 232 { 233 return _isTruncateBlob; 234 } 235 236 239 public void init() 240 throws ConfigException, SQLException 241 { 242 if (_isInit) 243 return; 244 _isInit = true; 245 246 if (_dataSource == null) 247 throw new ConfigException(L.l("JdbcManager requires a <data-source> element.")); 248 249 _jdbcMessage = new JdbcMessage(this); 250 251 _jdbcMessage.init(); 252 253 initDestinationTable(); 254 initConsumerTable(); 255 256 _isTruncateBlob = getMetaData().isTruncateBlobBeforeDelete(); 257 } 258 259 262 protected void initDestinationTable() 263 throws SQLException 264 { 265 Connection conn = _dataSource.getConnection(); 266 267 if (! getMetaData().supportsIdentity()) 268 _destinationSequence = _destinationTable + "_cseq"; 269 270 try { 271 Statement stmt = conn.createStatement(); 272 String sql = "SELECT 1 FROM " + _destinationTable + " WHERE 1=0"; 273 274 try { 275 ResultSet rs = stmt.executeQuery(sql); 276 rs.next(); 277 rs.close(); 278 stmt.close(); 279 280 return; 281 } catch (SQLException e) { 282 log.finest(e.toString()); 283 } 284 285 log.info(L.l("creating JMS destination table {0}", _destinationTable)); 286 287 String identity = ""; 288 289 if (getMetaData().supportsIdentity()) 290 identity = " auto_increment"; 291 292 String longType = getLongType(); 293 294 sql = ("CREATE TABLE " + _destinationTable + " (" + 295 " id " + longType + " PRIMARY KEY " + identity + "," + 296 " name VARCHAR(255)," + 297 " is_topic INTEGER" + 298 ")"); 299 300 stmt.executeUpdate(sql); 301 302 if (! getMetaData().supportsIdentity()) { 303 _destinationSequence = _destinationTable + "_cseq"; 304 305 stmt.executeUpdate(getMetaData().createSequenceSQL(_destinationSequence, 1)); 306 } 307 } finally { 308 conn.close(); 309 } 310 } 311 312 315 protected void initConsumerTable() 316 throws SQLException 317 { 318 if (! getMetaData().supportsIdentity()) 319 _consumerSequence = _consumerTable + "_cseq"; 320 321 Connection conn = _dataSource.getConnection(); 322 try { 323 Statement stmt = conn.createStatement(); 324 String sql = "SELECT 1 FROM " + _consumerTable + " WHERE 1=0"; 325 326 try { 327 ResultSet rs = stmt.executeQuery(sql); 328 rs.next(); 329 rs.close(); 330 stmt.close(); 331 332 return; 333 } catch (SQLException e) { 334 log.finest(e.toString()); 335 } 336 337 log.info(L.l("creating JMS subscriber table {0}", _consumerTable)); 338 339 String longType = getLongType(); 340 String identity = ""; 341 342 if (getMetaData().supportsIdentity()) 343 identity = " auto_increment"; 344 345 sql = ("CREATE TABLE " + _consumerTable + " (" + 346 " s_id " + longType + " PRIMARY KEY " + identity + "," + 347 " queue " + longType + "," + 348 " client VARCHAR(255)," + 349 " name VARCHAR(255)," + 350 " expire " + longType + "," + 351 " read_id " + longType + "," + 352 " ack_id " + longType + 353 ")"); 354 355 stmt.executeUpdate(sql); 356 357 if (_consumerSequence != null) 358 stmt.executeUpdate(getMetaData().createSequenceSQL(_consumerSequence, 1)); 359 } finally { 360 conn.close(); 361 } 362 } 363 364 367 public int hashCode() 368 { 369 if (_dataSource == null) 370 return 0; 371 else 372 return _dataSource.hashCode(); 373 } 374 375 378 public boolean equals(Object o) 379 { 380 if (this == o) 381 return true; 382 else if (! (o instanceof JdbcManager)) 383 return false; 384 385 JdbcManager manager = (JdbcManager) o; 386 387 return _dataSource != null && _dataSource.equals(manager._dataSource); 388 } 389 } 390 391 | Popular Tags |