1 17 18 package org.apache.geronimo.timer.jdbc; 19 20 import com.thoughtworks.xstream.XStream; 21 import org.apache.geronimo.timer.PersistenceException; 22 import org.apache.geronimo.timer.Playback; 23 import org.apache.geronimo.timer.WorkInfo; 24 import org.apache.geronimo.timer.WorkerPersistence; 25 26 import javax.sql.DataSource ; 27 import java.sql.Connection ; 28 import java.sql.PreparedStatement ; 29 import java.sql.ResultSet ; 30 import java.sql.SQLException ; 31 import java.sql.Types ; 32 import java.util.ArrayList ; 33 import java.util.Collection ; 34 import java.util.Date ; 35 36 41 public class JDBCWorkerPersistence implements WorkerPersistence { 42 43 private static final String createSequenceSQL = "create sequence timertasks_seq"; 44 private static final String createTableSQLWithSequence = "create table timertasks (id long primary key, serverid varchar(256) not null, timerkey varchar(256) not null, userid varchar(4096), userinfo varchar(4096), firsttime long not null, period long, atfixedrate boolean not null)"; 45 private static final String createTableSQLWithIdentity = "create table timertasks (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), serverid varchar(256) not null, timerkey varchar(256) not null, userid varchar(4096), userinfo varchar(4096), firsttime NUMERIC(18,0) not null, period NUMERIC(18, 0), atfixedrate CHAR(1))"; 46 private static final String sequenceSQL = "select timertasks_seq.nextval"; 47 private static final String identitySQL = "values IDENTITY_VAL_LOCAL()"; 48 private static final String insertSQLWithSequence = "insert into timertasks (id, serverid, timerkey, userid, userinfo, firsttime, period, atfixedrate) values (?, ?, ?, ?, ?, ?, ?, ?)"; 49 private static final String insertSQLWithIdentity = "insert into timertasks (serverid, timerkey, userid, userinfo, firsttime, period, atfixedrate) values (?, ?, ?, ?, ?, ?, ?)"; 50 private static final String deleteSQL = "delete from timertasks where id=?"; 51 private static final String selectSQL = "select id, userid, userinfo, firsttime, period, atfixedrate from timertasks where serverid = ? and timerkey=?"; 52 private static final String fixedRateUpdateSQL = "update timertasks set firsttime = firsttime + period where id = ?"; 53 private static final String intervalUpdateSQL = "update timertasks set firsttime = ? where id = ?"; 54 private static final String selectByKeySQL = "select id from timertasks where serverid = ? and timerkey = ? and (userid = ? or ? is null)"; 55 56 private final String serverUniqueId; 57 private final DataSource dataSource; 58 private boolean useSequence = false; 59 60 protected JDBCWorkerPersistence(String serverUniqueId, DataSource datasource, boolean useSequence) throws SQLException { 61 this.serverUniqueId = serverUniqueId; 62 this.dataSource = datasource; 63 this.useSequence = useSequence; 64 if (this.useSequence) { 65 execSQL(createSequenceSQL); 66 execSQL(createTableSQLWithSequence); 67 } else { 68 execSQL(createTableSQLWithIdentity); 69 } 70 } 71 72 73 public void save(WorkInfo workInfo) throws PersistenceException { 74 boolean threwException = false; 75 Connection c = getConnection(); 76 try { 77 if (useSequence) { 78 long id; 79 PreparedStatement seqStatement = c.prepareStatement(sequenceSQL); 80 try { 81 ResultSet seqRS = seqStatement.executeQuery(); 82 try { 83 seqRS.next(); 84 id = seqRS.getLong(1); 85 } finally { 86 seqRS.close(); 87 } 88 } finally { 89 seqStatement.close(); 90 } 91 workInfo.setId(id); 92 PreparedStatement insertStatement = c.prepareStatement(insertSQLWithSequence); 93 try { 94 String serializedUserId = serialize(workInfo.getUserId()); 95 String serializedUserKey = serialize(workInfo.getUserInfo()); 96 insertStatement.setLong(1, id); 97 insertStatement.setString(2, serverUniqueId); 98 insertStatement.setString(3, workInfo.getKey()); 99 insertStatement.setString(4, serializedUserId); 100 insertStatement.setString(5, serializedUserKey); 101 insertStatement.setLong(6, workInfo.getTime().getTime()); 102 if (workInfo.getPeriod() == null) { 103 insertStatement.setNull(7, Types.NUMERIC); 104 } else { 105 insertStatement.setLong(7, workInfo.getPeriod().longValue()); 106 } 107 insertStatement.setBoolean(8, workInfo.getAtFixedRate()); 108 int result = insertStatement.executeUpdate(); 109 if (result != 1) { 110 throw new PersistenceException("Could not insert!"); 111 } 112 } finally { 113 insertStatement.close(); 114 } 115 } else { 116 PreparedStatement insertStatement = c.prepareStatement(insertSQLWithIdentity); 117 try { 118 String serializedUserId = serialize(workInfo.getUserId()); 119 String serializedUserKey = serialize(workInfo.getUserInfo()); 120 insertStatement.setString(1, serverUniqueId); 121 insertStatement.setString(2, workInfo.getKey()); 122 insertStatement.setString(3, serializedUserId); 123 insertStatement.setString(4, serializedUserKey); 124 insertStatement.setLong(5, workInfo.getTime().getTime()); 125 if (workInfo.getPeriod() == null) { 126 insertStatement.setNull(6, Types.NUMERIC); 127 } else { 128 insertStatement.setLong(6, workInfo.getPeriod().longValue()); 129 } 130 insertStatement.setBoolean(7, workInfo.getAtFixedRate()); 131 int result = insertStatement.executeUpdate(); 132 if (result != 1) { 133 throw new PersistenceException("Could not insert!"); 134 } 135 } finally { 136 insertStatement.close(); 137 } 138 long id; 139 PreparedStatement identityStatement = c.prepareStatement(identitySQL); 140 try { 141 ResultSet seqRS = identityStatement.executeQuery(); 142 try { 143 seqRS.next(); 144 id = seqRS.getLong(1); 145 } finally { 146 seqRS.close(); 147 } 148 } finally { 149 identityStatement.close(); 150 } 151 workInfo.setId(id); 152 } 153 } catch (SQLException e) { 154 threwException = true; 155 throw new PersistenceException(e); 156 } finally { 157 close(c, !threwException); 158 } 159 } 160 161 public void cancel(long id) throws PersistenceException { 162 boolean threwException = false; 163 164 Connection c = getConnection(); 165 try { 166 PreparedStatement deleteStatement = c.prepareStatement(deleteSQL); 167 try { 168 deleteStatement.setLong(1, id); 169 deleteStatement.execute(); 170 } finally { 171 deleteStatement.close(); 172 } 173 } catch (SQLException e) { 174 threwException = true; 175 throw new PersistenceException(e); 176 } finally { 177 close(c, !threwException); 178 } 179 } 180 181 public void playback(String key, Playback playback) throws PersistenceException { 182 boolean threwException = false; 183 Connection c = getConnection(); 184 try { 185 PreparedStatement selectStatement = c.prepareStatement(selectSQL); 186 selectStatement.setString(1, serverUniqueId); 187 selectStatement.setString(2, key); 188 try { 189 ResultSet taskRS = selectStatement.executeQuery(); 190 try { 191 while (taskRS.next()) { 192 long id = taskRS.getLong(1); 193 String serizalizedUserId = taskRS.getString(2); 194 Object userId = deserialize(serizalizedUserId); 195 String serializedUserInfo = taskRS.getString(3); 196 Object userInfo = deserialize(serializedUserInfo); 197 long timeMillis = taskRS.getLong(4); 198 Date time = new Date (timeMillis); 199 Long period = null; 200 period = new Long (taskRS.getLong(5)); 201 if (!taskRS.wasNull()) { 202 period = null; 203 } 204 boolean atFixedRate = taskRS.getBoolean(6); 205 WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time, period, atFixedRate); 207 workInfo.setId(id); 208 playback.schedule(workInfo); 209 } 210 } finally { 211 taskRS.close(); 212 } 213 } finally { 214 selectStatement.close(); 215 } 216 } catch (SQLException e) { 217 threwException = true; 218 throw new PersistenceException(e); 219 } finally { 220 close(c, !threwException); 221 } 222 } 223 224 public void fixedRateWorkPerformed(long id) throws PersistenceException { 225 boolean threwException = false; 226 Connection c = getConnection(); 227 try { 228 PreparedStatement updateStatement = c.prepareStatement(fixedRateUpdateSQL); 229 try { 230 updateStatement.setLong(1, id); 231 updateStatement.execute(); 232 } finally { 233 updateStatement.close(); 234 } 235 } catch (SQLException e) { 236 threwException = true; 237 throw new PersistenceException(e); 238 } finally { 239 close(c, !threwException); 240 } 241 } 242 243 public void intervalWorkPerformed(long id, long period) throws PersistenceException { 244 boolean threwException = false; 245 long next = System.currentTimeMillis() + period; 246 Connection c = getConnection(); 247 try { 248 PreparedStatement updateStatement = c.prepareStatement(intervalUpdateSQL); 249 try { 250 updateStatement.setLong(1, next); 251 updateStatement.setLong(2, id); 252 updateStatement.execute(); 253 } finally { 254 updateStatement.close(); 255 } 256 } catch (SQLException e) { 257 threwException = true; 258 throw new PersistenceException(e); 259 } finally { 260 close(c, !threwException); 261 } 262 } 263 264 public Collection getIdsByKey(String key, Object userId) throws PersistenceException { 265 boolean threwException = false; 266 267 Collection ids = new ArrayList (); 268 Connection c = getConnection(); 269 try { 270 PreparedStatement selectStatement = c.prepareStatement(selectByKeySQL); 271 selectStatement.setString(1, serverUniqueId); 272 selectStatement.setString(2, key); 273 if (userId == null) { 274 selectStatement.setNull(3, Types.VARCHAR); 275 selectStatement.setNull(4, Types.VARCHAR); 276 } else { 277 String userIdString = serialize(userId); 278 selectStatement.setString(3, userIdString); 279 selectStatement.setString(4, userIdString); 280 } 281 try { 282 ResultSet taskRS = selectStatement.executeQuery(); 283 try { 284 while (taskRS.next()) { 285 long id = taskRS.getLong(1); 286 ids.add(new Long (id)); 287 } 288 } finally { 289 taskRS.close(); 290 } 291 } finally { 292 selectStatement.close(); 293 } 294 } catch (SQLException e) { 295 threwException = true; 296 throw new PersistenceException(e); 297 } finally { 298 close(c, !threwException); 299 } 300 301 return ids; 302 } 303 304 private String serialize(Object task) { 305 XStream xStream = new XStream(); 306 return xStream.toXML(task); 307 } 308 309 private Object deserialize(String serializedRunnable) { 310 XStream xStream = new XStream(); 311 return xStream.fromXML(serializedRunnable); 312 } 313 314 private void execSQL(String sql) throws SQLException { 315 Connection c = dataSource.getConnection(); 316 try { 317 PreparedStatement updateStatement = c.prepareStatement(sql); 318 try { 319 updateStatement.execute(); 320 } catch (SQLException e) { 321 } finally { 323 updateStatement.close(); 324 } 325 } finally { 326 c.close(); 327 } 328 } 329 330 private Connection getConnection() throws PersistenceException { 331 try { 332 return dataSource.getConnection(); 333 } catch (Exception e) { 334 throw new PersistenceException(e); 335 } 336 } 337 338 private void close(Connection c, boolean reportSqlException) throws PersistenceException { 339 try { 340 c.close(); 341 } catch (Exception e) { 342 if (!reportSqlException) { 343 throw new PersistenceException(e); 344 } 345 } 346 } 347 } 348 | Popular Tags |