KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > caucho > server > cluster > FileBacking


1 /*
2  * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
3  *
4  * This file is part of Resin(R) Open Source
5  *
6  * Each copy or derived work must preserve the copyright notice and this
7  * notice unmodified.
8  *
9  * Resin Open Source is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * Resin Open Source is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
17  * of NON-INFRINGEMENT. See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with Resin Open Source; if not, write to the
22  *
23  * Free Software Foundation, Inc.
24  * 59 Temple Place, Suite 330
25  * Boston, MA 02111-1307 USA
26  *
27  * @author Scott Ferguson
28  */

29
30 package com.caucho.server.cluster;
31
32 import com.caucho.config.ConfigException;
33 import com.caucho.db.jdbc.DataSourceImpl;
34 import com.caucho.util.Alarm;
35 import com.caucho.util.FreeList;
36 import com.caucho.util.L10N;
37 import com.caucho.util.Log;
38 import com.caucho.vfs.Path;
39 import com.caucho.vfs.ReadStream;
40 import com.caucho.vfs.WriteStream;
41
42 import javax.sql.DataSource JavaDoc;
43 import java.io.IOException JavaDoc;
44 import java.io.InputStream JavaDoc;
45 import java.sql.Connection JavaDoc;
46 import java.sql.PreparedStatement JavaDoc;
47 import java.sql.ResultSet JavaDoc;
48 import java.sql.SQLException JavaDoc;
49 import java.sql.Statement JavaDoc;
50 import java.util.logging.Level JavaDoc;
51 import java.util.logging.Logger JavaDoc;
52
53
54 /**
55  * Manages the backing for the file store.
56  */

57 public class FileBacking {
58   private static final L10N L = new L10N(FileBacking.class);
59   private static final Logger JavaDoc log = Log.open(FileBacking.class);
60   
61   private FreeList<ClusterConnection> _freeConn
62     = new FreeList<ClusterConnection>(32);
63   
64   private String JavaDoc _name;
65   
66   private Path _path;
67
68   private DataSource JavaDoc _dataSource;
69
70   private String JavaDoc _tableName;
71   private String JavaDoc _loadQuery;
72   private String JavaDoc _updateQuery;
73   private String JavaDoc _accessQuery;
74   private String JavaDoc _setExpiresQuery;
75   private String JavaDoc _insertQuery;
76   private String JavaDoc _invalidateQuery;
77   private String JavaDoc _timeoutQuery;
78   private String JavaDoc _dumpQuery;
79   private String JavaDoc _countQuery;
80
81   /**
82    * Returns the path to the directory.
83    */

84   public Path getPath()
85   {
86     return _path;
87   }
88
89   /**
90    * Sets the path to the saved file.
91    */

92   public void setPath(Path path)
93   {
94     _path = path;
95   }
96
97   /**
98    * Sets the table name
99    */

100   public void setTableName(String JavaDoc table)
101   {
102     _tableName = table;
103   }
104
105   public boolean init(int clusterLength)
106     throws Exception JavaDoc
107   {
108     if (_path == null)
109       throw new ConfigException(L.l("file-backing needs path."));
110     
111     if (_tableName == null)
112       throw new ConfigException(L.l("file-backing needs tableName."));
113
114     int length = clusterLength;
115
116     if (length <= 0)
117       length = 1;
118     
119     _loadQuery = "SELECT access_time,data FROM " + _tableName + " WHERE id=?";
120     _insertQuery = ("INSERT into " + _tableName + " (id,data,mod_time,access_time,expire_interval,server1,server2,server3) " +
121             "VALUES(?,?,?,?,?,?,?,?)");
122     _updateQuery = "UPDATE " + _tableName + " SET data=?, mod_time=?, access_time=? WHERE id=?";
123     _accessQuery = "UPDATE " + _tableName + " SET access_time=? WHERE id=?";
124     _setExpiresQuery = "UPDATE " + _tableName + " SET expire_interval=? WHERE id=?";
125     _invalidateQuery = "DELETE FROM " + _tableName + " WHERE id=?";
126
127     // access window is 1/4 the expire interval
128
_timeoutQuery = "DELETE FROM " + _tableName + " WHERE access_time + 5 * expire_interval / 4 < ?";
129
130     _dumpQuery = ("SELECT id, expire_interval, data FROM " + _tableName +
131           " WHERE ? <= mod_time AND " +
132           " (?=server1 OR ?=server2 OR ?=server3)");
133     
134     _countQuery = "SELECT count(*) FROM " + _tableName;
135     
136     try {
137       _path.mkdirs();
138     } catch (IOException JavaDoc e) {
139     }
140
141     DataSourceImpl dataSource = new DataSourceImpl();
142     dataSource.setPath(_path);
143     dataSource.setRemoveOnError(true);
144     dataSource.init();
145     
146     _dataSource = dataSource;
147
148     initDatabase();
149
150     return true;
151   }
152
153   /**
154    * Returns the data source.
155    */

156   public DataSource JavaDoc getDataSource()
157   {
158     return _dataSource;
159   }
160
161   /**
162    * Create the database, initializing if necessary.
163    */

164   private void initDatabase()
165     throws Exception JavaDoc
166   {
167     Connection JavaDoc conn = _dataSource.getConnection();
168
169     try {
170       Statement JavaDoc stmt = conn.createStatement();
171       
172       boolean hasDatabase = false;
173
174       try {
175     String JavaDoc sql = "SELECT expire_interval FROM " + _tableName + " WHERE 1=0";
176
177     ResultSet JavaDoc rs = stmt.executeQuery(sql);
178     rs.next();
179     rs.close();
180
181     return;
182       } catch (Throwable JavaDoc e) {
183     log.finer(e.toString());
184       }
185
186       try {
187     stmt.executeQuery("DROP TABLE " + _tableName);
188       } catch (Throwable JavaDoc e) {
189     log.log(Level.FINEST, e.toString(), e);
190       }
191
192       String JavaDoc sql = ("CREATE TABLE " + _tableName + " (\n" +
193             " id VARCHAR(128) PRIMARY KEY,\n" +
194             " data BLOB,\n" +
195             " expire_interval INTEGER,\n" +
196             " access_time INTEGER,\n" +
197             " mod_time INTEGER,\n" +
198             " mod_count BIGINT,\n" +
199             " server1 INTEGER,\n" +
200             " server2 INTEGER,\n" +
201             " server3 INTEGER)");
202
203       log.fine(sql);
204
205       stmt.executeUpdate(sql);
206     } finally {
207       conn.close();
208     }
209   }
210
211   public long start()
212     throws Exception JavaDoc
213   {
214     long delta = - Alarm.getCurrentTime();
215
216     Connection JavaDoc conn = null;
217     try {
218       conn = _dataSource.getConnection();
219       
220       Statement JavaDoc stmt = conn.createStatement();
221
222       String JavaDoc sql = "SELECT MAX(access_time) FROM " + _tableName;
223
224       ResultSet JavaDoc rs = stmt.executeQuery(sql);
225
226       if (rs.next())
227     delta = rs.getInt(1) * 60000L - Alarm.getCurrentTime();
228     } finally {
229       if (conn != null)
230     conn.close();
231     }
232
233     return delta;
234   }
235
236   /**
237    * Clears the old objects.
238    */

239   public void clearOldObjects(long maxIdleTime)
240     throws SQLException JavaDoc
241   {
242     Connection JavaDoc conn = null;
243
244     try {
245       if (maxIdleTime > 0) {
246     conn = _dataSource.getConnection();
247
248     PreparedStatement JavaDoc pstmt = conn.prepareStatement(_timeoutQuery);
249   
250         long now = Alarm.getCurrentTime();
251         int nowMinute = (int) (now / 60000L);
252     
253     pstmt.setInt(1, nowMinute);
254
255         int count = pstmt.executeUpdate();
256
257     // System.out.println("OBSOLETE:" + count);
258

259     if (count > 0)
260       log.fine(this + " purged " + count + " old sessions");
261
262     pstmt.close();
263       }
264     } finally {
265       if (conn != null)
266     conn.close();
267     }
268   }
269
270   /**
271    * Load the session from the jdbc store.
272    *
273    * @param session the session to fill.
274    *
275    * @return true if the load was valid.
276    */

277   public boolean loadSelf(ClusterObject clusterObj, Object JavaDoc obj)
278     throws Exception JavaDoc
279   {
280     String JavaDoc uniqueId = clusterObj.getUniqueId();
281
282     ClusterConnection conn = getConnection();
283     try {
284       PreparedStatement JavaDoc stmt = conn.prepareLoad();
285       stmt.setString(1, uniqueId);
286
287       ResultSet JavaDoc rs = stmt.executeQuery();
288       boolean validLoad = false;
289
290       if (rs.next()) {
291     //System.out.println("LOAD: " + uniqueId);
292
long accessTime = rs.getInt(1) * 60000L;
293     
294         InputStream JavaDoc is = rs.getBinaryStream(2);
295
296         if (log.isLoggable(Level.FINE))
297           log.fine("load local object: " + uniqueId);
298       
299         validLoad = clusterObj.load(is, obj);
300
301     if (validLoad)
302       clusterObj.setAccessTime(accessTime);
303
304         is.close();
305       }
306       else if (log.isLoggable(Level.FINE))
307         log.fine("no local object loaded for " + uniqueId);
308       else {
309     // System.out.println("NO-LOAD: " + uniqueId);
310
}
311
312       rs.close();
313
314       return validLoad;
315     } finally {
316       conn.close();
317     }
318   }
319
320   /**
321    * Updates the object's access time.
322    *
323    * @param obj the object to store.
324    */

325   public void updateAccess(String JavaDoc uniqueId)
326     throws Exception JavaDoc
327   {
328     ClusterConnection conn = getConnection();
329     
330     try {
331       PreparedStatement JavaDoc stmt = conn.prepareAccess();
332
333       long now = Alarm.getCurrentTime();
334       int nowMinutes = (int) (now / 60000L);
335       stmt.setInt(1, nowMinutes);
336       stmt.setString(2, uniqueId);
337
338       int count = stmt.executeUpdate();
339
340       if (count > 0) {
341     if (log.isLoggable(Level.FINE))
342       log.fine("access cluster: " + uniqueId);
343     return;
344       }
345     } finally {
346       conn.close();
347     }
348   }
349
350   /**
351    * Sets the object's expire_interval.
352    *
353    * @param obj the object to store.
354    */

355   public void setExpireInterval(String JavaDoc uniqueId, long expireInterval)
356     throws Exception JavaDoc
357   {
358     ClusterConnection conn = getConnection();
359     
360     try {
361       PreparedStatement JavaDoc stmt = conn.prepareSetExpireInterval();
362
363       int expireMinutes = (int) (expireInterval / 60000L);
364       stmt.setInt(1, expireMinutes);
365       stmt.setString(2, uniqueId);
366
367       int count = stmt.executeUpdate();
368
369       if (count > 0) {
370     if (log.isLoggable(Level.FINE))
371       log.fine("set expire interval: " + uniqueId + " " + expireInterval);
372     return;
373       }
374     } finally {
375       conn.close();
376     }
377   }
378
379   /**
380    * Removes the named object from the store.
381    */

382   public void remove(String JavaDoc uniqueId)
383     throws Exception JavaDoc
384   {
385     ClusterConnection conn = getConnection();
386     
387     try {
388       PreparedStatement JavaDoc pstmt = conn.prepareInvalidate();
389       pstmt.setString(1, uniqueId);
390
391       int count = pstmt.executeUpdate();
392       
393       if (log.isLoggable(Level.FINE))
394         log.fine("invalidate: " + uniqueId);
395     } finally {
396       conn.close();
397     }
398   }
399
400   /**
401    * Reads from the store.
402    */

403   public long read(String JavaDoc uniqueId, WriteStream os)
404     throws IOException JavaDoc
405   {
406     Connection JavaDoc conn = null;
407     try {
408       conn = _dataSource.getConnection();
409
410       PreparedStatement JavaDoc pstmt = conn.prepareStatement(_loadQuery);
411       pstmt.setString(1, uniqueId);
412
413       ResultSet JavaDoc rs = pstmt.executeQuery();
414       if (rs.next()) {
415     long accessTime = rs.getInt(1) * 60000L;
416     
417     InputStream JavaDoc is = rs.getBinaryStream(2);
418
419     os.writeStream(is);
420
421     is.close();
422
423     return accessTime;
424       }
425     } catch (SQLException JavaDoc e) {
426       log.log(Level.FINE, e.toString(), e);
427     } finally {
428       try {
429     if (conn != null)
430       conn.close();
431       } catch (SQLException JavaDoc e) {
432       }
433     }
434
435     return -1;
436   }
437
438   /**
439    * Stores the cluster object on the local store.
440    *
441    * @param uniqueId the object's unique id.
442    * @param id the input stream to the serialized object
443    * @param length the length object the serialized object
444    * @param expireInterval how long the object lives w/o access
445    */

446   public void storeSelf(String JavaDoc uniqueId,
447             ReadStream is, int length,
448             long expireInterval,
449             int primary, int secondary, int tertiary)
450   {
451     ClusterConnection conn = null;
452
453     try {
454       conn = getConnection();
455       // Try to update first, and insert if fail.
456
// The binary stream can be reused because it won't actually be
457
// read on a failure
458

459       //System.out.println("SAVE: " + uniqueId);
460
if (storeSelfUpdate(conn, uniqueId, is, length)) {
461     //System.out.println("SAVE-UPDATE: " + uniqueId);
462
}
463       else if (storeSelfInsert(conn, uniqueId, is, length, expireInterval,
464                    primary, secondary, tertiary)) {
465     //System.out.println("SAVE-INSERT: " + uniqueId);
466
}
467       else if (storeSelfUpdate(conn, uniqueId, is, length)) {
468     // The second update is for the rare case where
469
// two threads try to update the database simultaneously
470
}
471       else {
472     log.warning(L.l("Can't store session {0}", uniqueId));
473       }
474     } catch (SQLException JavaDoc e) {
475       e.printStackTrace();
476       log.log(Level.FINE, e.toString(), e);
477     } finally {
478       if (conn != null)
479     conn.close();
480     }
481   }
482   
483   /**
484    * Stores the cluster object on the local store using an update query.
485    *
486    * @param conn the database connection
487    * @param uniqueId the object's unique id.
488    * @param id the input stream to the serialized object
489    * @param length the length object the serialized object
490    */

491   private boolean storeSelfUpdate(ClusterConnection conn, String JavaDoc uniqueId,
492                   ReadStream is, int length)
493   {
494     try {
495       PreparedStatement JavaDoc stmt = conn.prepareUpdate();
496       stmt.setBinaryStream(1, is, length);
497
498       long now = Alarm.getCurrentTime();
499       int nowMinutes = (int) (now / 60000L);
500       stmt.setInt(2, nowMinutes);
501       stmt.setInt(3, nowMinutes);
502       stmt.setString(4, uniqueId);
503
504       int count = stmt.executeUpdate();
505         
506       if (count > 0) {
507     if (log.isLoggable(Level.FINE))
508       log.fine("update cluster: " + uniqueId + " length:" + length);
509       
510     return true;
511       }
512     } catch (SQLException JavaDoc e) {
513       log.log(Level.WARNING, e.toString(), e);
514     }
515
516     return false;
517   }
518   
519   private boolean storeSelfInsert(ClusterConnection conn, String JavaDoc uniqueId,
520                   ReadStream is, int length,
521                   long expireInterval,
522                   int primary, int secondary, int tertiary)
523   {
524     try {
525       PreparedStatement JavaDoc stmt = conn.prepareInsert();
526         
527       stmt.setString(1, uniqueId);
528
529       stmt.setBinaryStream(2, is, length);
530       
531       int nowMinutes = (int) (Alarm.getCurrentTime() / 60000L);
532       
533       stmt.setInt(3, nowMinutes);
534       stmt.setInt(4, nowMinutes);
535       stmt.setInt(5, (int) (expireInterval / 60000L));
536
537       stmt.setInt(6, primary);
538       stmt.setInt(7, secondary);
539       stmt.setInt(8, tertiary);
540
541       stmt.executeUpdate();
542         
543       if (log.isLoggable(Level.FINE))
544     log.fine("insert cluster: " + uniqueId + " length:" + length);
545
546       return true;
547     } catch (SQLException JavaDoc e) {
548       System.out.print(e);
549       
550       log.log(Level.FINE, e.toString(), e);
551     }
552
553     return false;
554   }
555
556   //
557
// statistics
558
//
559

560   public long getObjectCount()
561     throws SQLException JavaDoc
562   {
563     ClusterConnection conn = getConnection();
564     
565     try {
566       PreparedStatement JavaDoc stmt = conn.prepareCount();
567       
568       ResultSet JavaDoc rs = stmt.executeQuery();
569
570       if (rs != null && rs.next()) {
571     long value = rs.getLong(1);
572     rs.close();
573     return value;
574       }
575       
576       return -1;
577     } catch (SQLException JavaDoc e) {
578       e.printStackTrace();
579       log.log(Level.FINE, e.toString(), e);
580     } finally {
581       conn.close();
582     }
583     
584     return -1;
585   }
586
587   public void destroy()
588   {
589     _dataSource = null;
590     _freeConn = null;
591   }
592
593   private ClusterConnection getConnection()
594     throws SQLException JavaDoc
595   {
596     ClusterConnection cConn = _freeConn.allocate();
597
598     if (cConn == null) {
599       Connection JavaDoc conn = _dataSource.getConnection();
600       cConn = new ClusterConnection(conn);
601     }
602
603     return cConn;
604   }
605
606   public String JavaDoc serverNameToTableName(String JavaDoc serverName)
607   {
608     if (serverName == null)
609       return "srun";
610     
611     StringBuilder JavaDoc cb = new StringBuilder JavaDoc();
612     cb.append("srun_");
613     
614     for (int i = 0; i < serverName.length(); i++) {
615       char ch = serverName.charAt(i);
616
617       if ('a' <= ch && ch <= 'z') {
618     cb.append(ch);
619       }
620       else if ('A' <= ch && ch <= 'Z') {
621     cb.append(ch);
622       }
623       else if ('0' <= ch && ch <= '9') {
624     cb.append(ch);
625       }
626       else if (ch == '_') {
627     cb.append(ch);
628       }
629       else
630     cb.append('_');
631     }
632
633     return cb.toString();
634   }
635
636   public String JavaDoc toString()
637   {
638     return "ClusterStore[" + _name + "]";
639   }
640
641   class ClusterConnection {
642     private Connection JavaDoc _conn;
643     
644     private PreparedStatement JavaDoc _loadStatement;
645     private PreparedStatement JavaDoc _updateStatement;
646     private PreparedStatement JavaDoc _insertStatement;
647     private PreparedStatement JavaDoc _accessStatement;
648     private PreparedStatement JavaDoc _setExpiresStatement;
649     private PreparedStatement JavaDoc _invalidateStatement;
650     private PreparedStatement JavaDoc _timeoutStatement;
651     private PreparedStatement JavaDoc _countStatement;
652
653     ClusterConnection(Connection JavaDoc conn)
654     {
655       _conn = conn;
656     }
657
658     PreparedStatement JavaDoc prepareLoad()
659       throws SQLException JavaDoc
660     {
661       if (_loadStatement == null)
662     _loadStatement = _conn.prepareStatement(_loadQuery);
663
664       return _loadStatement;
665     }
666
667     PreparedStatement JavaDoc prepareUpdate()
668       throws SQLException JavaDoc
669     {
670       if (_updateStatement == null)
671     _updateStatement = _conn.prepareStatement(_updateQuery);
672
673       return _updateStatement;
674     }
675
676     PreparedStatement JavaDoc prepareInsert()
677       throws SQLException JavaDoc
678     {
679       if (_insertStatement == null)
680     _insertStatement = _conn.prepareStatement(_insertQuery);
681
682       return _insertStatement;
683     }
684
685     PreparedStatement JavaDoc prepareAccess()
686       throws SQLException JavaDoc
687     {
688       if (_accessStatement == null)
689     _accessStatement = _conn.prepareStatement(_accessQuery);
690
691       return _accessStatement;
692     }
693
694     PreparedStatement JavaDoc prepareSetExpireInterval()
695       throws SQLException JavaDoc
696     {
697       if (_setExpiresStatement == null)
698     _setExpiresStatement = _conn.prepareStatement(_setExpiresQuery);
699
700       return _setExpiresStatement;
701     }
702
703     PreparedStatement JavaDoc prepareInvalidate()
704       throws SQLException JavaDoc
705     {
706       if (_invalidateStatement == null)
707     _invalidateStatement = _conn.prepareStatement(_invalidateQuery);
708
709       return _invalidateStatement;
710     }
711
712     PreparedStatement JavaDoc prepareCount()
713       throws SQLException JavaDoc
714     {
715       if (_countStatement == null)
716     _countStatement = _conn.prepareStatement(_countQuery);
717
718       return _countStatement;
719     }
720
721     void close()
722     {
723       _freeConn.free(this);
724     }
725   }
726 }
727
Popular Tags