KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > core > persistent > db > DBPersistencyProxy


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Uri Schneider.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46
47 package org.mr.core.persistent.db;
48
49 import java.io.ByteArrayInputStream JavaDoc;
50 import java.io.IOException JavaDoc;
51 import java.io.InputStream JavaDoc;
52 import java.nio.ByteBuffer JavaDoc;
53 import java.sql.Blob JavaDoc;
54 import java.sql.Connection JavaDoc;
55 import java.sql.DriverManager JavaDoc;
56 import java.sql.PreparedStatement JavaDoc;
57 import java.sql.ResultSet JavaDoc;
58 import java.sql.SQLException JavaDoc;
59 import java.sql.Statement JavaDoc;
60 import java.util.Set JavaDoc;
61 import java.util.HashSet JavaDoc;
62 import java.util.ArrayList JavaDoc;
63 import java.util.Iterator JavaDoc;
64
65 import org.apache.commons.logging.Log;
66 import org.apache.commons.logging.LogFactory;
67 import org.mr.MantaAgent;
68 import org.mr.kernel.services.topics.VirtualTopicManager;
69 import org.mr.core.configuration.ConfigManager;
70 import org.mr.core.persistent.PersistentConst;
71 import org.mr.core.persistent.PersistentManager;
72 import org.mr.core.util.byteable.ByteBufferFactory;
73 import org.mr.core.util.byteable.Byteable;
74 import org.mr.core.util.byteable.ByteableInputStream;
75 import org.mr.core.util.byteable.ByteableOutputStream;
76 import org.mr.core.util.StringUtils;
77
78 /**
79  * DBPersistencyProxy.java
80  *
81  *
82  * Created: Mon Mar 21 15:51:33 2005
83  *
84  * @author Uri Schneider
85  * @version 1.0
86  */

87 public class DBPersistencyProxy {
88     private static DBPersistencyProxy instance = null;
89
90     private Connection JavaDoc connection;
91     private Log log;
92     private String JavaDoc lookupTable;
93     private String JavaDoc dataTable;
94     private String JavaDoc tableSuffix;
95     private ByteBufferFactory pool;
96     private ByteableInputStream bistream;
97
98     private String JavaDoc dbUser;
99     private String JavaDoc dbPasswd;
100     private String JavaDoc dbURL;
101
102     // statements
103
private PreparedStatement JavaDoc stmtGetID;
104     private PreparedStatement JavaDoc stmtKeyCount;
105     private PreparedStatement JavaDoc stmtGetKeys;
106     private PreparedStatement JavaDoc stmtGetValue;
107     private PreparedStatement JavaDoc stmtInsertID;
108     private PreparedStatement JavaDoc stmtDeleteValue;
109     private PreparedStatement JavaDoc stmtSaveValue;
110     private PreparedStatement JavaDoc stmtDeleteAll;
111
112     private PreparedStatement JavaDoc stmtGetAllServices;
113
114     private DBPersistencyProxy() {
115         this.log = LogFactory.getLog("DBPersistencyProxy");
116         this.pool = PersistentConst.getPersistentByteBufferPool();
117         this.bistream = new ByteableInputStream();
118
119         ConfigManager config = MantaAgent.getInstance().getSingletonRepository().getConfigManager();
120         //String provider;
121
String JavaDoc dbDriver;
122
123         // check configuration
124
//provider = config.getStringProperty("persistency.db.provider",
125
// "generic");
126
dbUser = config.getStringProperty("persistency.db.user", "manta");
127         dbPasswd = config.getStringProperty("persistency.db.password", "manta");
128         dbDriver = config.getStringProperty("persistency.db.driver", "com.mysql.jdbc.Driver");
129         dbURL = null;
130
131         if (dbDriver == null) {
132             if (this.log.isErrorEnabled()) {
133                 this.log.error("Persistency DB Driver is not set (config " +
134                                "option persistency.db.driver)");
135                 this.log.error("DBPersistencyProxy is disabled.");
136             }
137             return;
138         }
139
140         //if (provider.equals("generic")) {
141
dbURL = config.getStringProperty("persistency.db.url",
142                                              "jdbc:mysql://localhost/");
143             if (dbURL == null) {
144                 if (this.log.isErrorEnabled()) {
145                     this.log.error("Persistency DB URL is not set " +
146                                "(config option persistency.db.url)");
147                     /*this.log.error("Persistency DB provider was set to " +
148                                         "generic, but no URL was provided " +
149                                         "(config option persistency.db.url)");*/

150                     this.log.error("DBPersistencyProxy is disabled.");
151                 }
152                 return;
153             }
154         /*} else {
155               if (this.log.isErrorEnabled()) {
156                   this.log.error("Unknown DB provider " + provider);
157                   this.log.error("DBPersistencyProxy is disabled.");
158               }
159               return;
160           }*/

161
162         // load driver
163
try {
164             Class.forName(dbDriver);
165         } catch (Exception JavaDoc e) {
166             if (this.log.isErrorEnabled()) {
167                 this.log.error("Error loading DB driver: " + e.getMessage());
168                 this.log.error("DBPersistencyProxy is disabled.");
169             }
170             return;
171         }
172
173         try {
174             this.connection = DriverManager.getConnection(dbURL, dbUser,
175                                                           dbPasswd);
176         } catch (SQLException JavaDoc e) {
177             if (this.log.isErrorEnabled()) {
178                 this.log.error("Error connecting to DB (" + dbURL + "): " +
179                                e.getMessage());
180                 this.log.error("DBPersistencyProxy is disabled.");
181             }
182             return;
183         }
184
185             MantaAgent agent = MantaAgent.getInstance();
186             String JavaDoc defaultVal = agent.getAgentName()+"_"+agent.getDomainName();
187
188             // . is illegal in SQL idenitifiers
189
defaultVal = defaultVal.replace('.', '_');
190
191             tableSuffix =
192                 config.getStringProperty("persistency.db.table_suffix",
193                                          defaultVal);
194             tableSuffix = tableSuffix.trim();
195             if (tableSuffix.equals("")) {
196                 tableSuffix = defaultVal;
197             }
198         try {
199             ensureSchema(tableSuffix);
200         } catch (SQLException JavaDoc e) {
201             if (this.log.isErrorEnabled()) {
202                 this.log.error("Error creating persistency tables: " +
203                                e.getMessage());
204             }
205             this.log.error("DBPersistencyProxy is disabled.");
206             this.connection = null;
207         }
208         try {
209             preparePreparedStatements();
210         } catch (SQLException JavaDoc e) {
211             if (this.log.isErrorEnabled()) {
212                 this.log.error("Error creating statements: " +
213                                e.getMessage());
214             }
215             this.log.error("DBPersistencyProxy is disabled.");
216             this.connection = null;
217         }
218     } // DBPersistencyProxy constructor
219

220     private void ensureSchema(String JavaDoc tableSuffix) throws SQLException JavaDoc {
221         if (this.dbURL.startsWith("jdbc:mysql")) {
222                 ensureSchemaMySQL(tableSuffix);
223         } else if (this.dbURL.startsWith("jdbc:oracle")) {
224             ensureSchemaOracle(tableSuffix);
225         } else if (this.dbURL.startsWith("jdbc:sqlserver")) {
226             ensureSchemaSQLServer(tableSuffix);
227         } else {
228             if (this.log.isErrorEnabled()) {
229                 this.log.error("Unsupported DB: " + this.dbURL);
230             }
231         }
232     }
233
234     /**
235      * This function does not try to create a database
236      */

237     private void ensureSchemaOracle(String JavaDoc tableSuffix) throws SQLException JavaDoc {
238         Statement JavaDoc stmt = null;
239
240         this.lookupTable = "prsl_" + tableSuffix;
241         this.dataTable = "prsd_" + tableSuffix;
242
243         // check existense
244
try {
245             stmt = this.connection.createStatement();
246             stmt.executeQuery("SELECT count(*) FROM " + lookupTable);
247         } catch (SQLException JavaDoc e) {
248             if (e.getErrorCode() == 942) { // table or view does not exist
249
createSchemaOracle(stmt);
250             }
251         } finally {
252             if (stmt != null) {
253                 stmt.close();
254             }
255         }
256     }
257
258     private void createSchemaOracle(Statement JavaDoc stmt) throws SQLException JavaDoc {
259         stmt.executeUpdate("CREATE TABLE " + lookupTable +
260                            "(object_id INTEGER NOT NULL " +
261                            " UNIQUE," +
262                            " object_name VARCHAR2(4000)," +
263                            " PRIMARY KEY (object_name))");
264         stmt.executeUpdate("CREATE SEQUENCE " + lookupTable + "_seq");
265         stmt.executeUpdate("CREATE TRIGGER " + lookupTable + "_tri " +
266                            "BEFORE INSERT ON " + lookupTable + " FOR EACH " +
267                            "ROW BEGIN SELECT " + lookupTable +
268                            "_seq.nextval INTO :new.object_id FROM DUAL; " +
269                            "END;");
270         stmt.executeUpdate("CREATE TABLE " + dataTable +
271                            "(object_id INTEGER NOT NULL," +
272                            " keyx INTEGER NOT NULL," +
273                            " value BLOB," +
274                            " PRIMARY KEY (object_id, keyx)," +
275                            " FOREIGN KEY (object_id) REFERENCES " +
276                            lookupTable + "(object_id))");
277     }
278
279     /**
280      * This function does not try to create a database
281      */

282     private void ensureSchemaSQLServer(String JavaDoc tableSuffix)
283         throws SQLException JavaDoc
284     {
285         Statement JavaDoc stmt = null;
286
287         this.lookupTable = "prsl_" + tableSuffix;
288         this.dataTable = "prsd_" + tableSuffix;
289
290         // check existense
291
try {
292             stmt = this.connection.createStatement();
293             stmt.executeQuery("SELECT count(*) FROM " + lookupTable);
294         } catch (SQLException JavaDoc e) {
295             if (e.getErrorCode() == 208) { // Invalid object name
296
createSchemaSQLServer(stmt);
297             }
298         } finally {
299             if (stmt != null) {
300                 stmt.close();
301             }
302         }
303     }
304
305     private void createSchemaSQLServer(Statement JavaDoc stmt) throws SQLException JavaDoc {
306         stmt.executeUpdate("CREATE TABLE " + lookupTable +
307                            "(object_id INTEGER IDENTITY NOT NULL " +
308                            " UNIQUE," +
309                            " object_name VARCHAR(4000)," +
310                            " PRIMARY KEY (object_name))");
311         stmt.executeUpdate("CREATE TABLE " + dataTable +
312                            "(object_id INTEGER NOT NULL," +
313                            " keyx INTEGER NOT NULL," +
314                            " value IMAGE," +
315                            " PRIMARY KEY (object_id, keyx)," +
316                            " FOREIGN KEY (object_id) REFERENCES " +
317                            lookupTable + "(object_id))");
318     }
319
320     private void ensureSchemaMySQL(String JavaDoc tableSuffix) throws SQLException JavaDoc {
321         Statement JavaDoc stmt = this.connection.createStatement();
322
323         this.lookupTable = "manta.persist_lookup_" + tableSuffix;
324         this.dataTable = "manta.persist_data_" + tableSuffix;
325
326         stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS manta;");
327
328         stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + lookupTable +
329                            "(object_id INT UNSIGNED NOT NULL AUTO_INCREMENT " +
330                            " UNIQUE," +
331                            " object_name TEXT," +
332                            " KEY (object_name(32)));");
333         stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + dataTable +
334                            "(object_id INT UNSIGNED NOT NULL," +
335                            " keyx INT NOT NULL," +
336                            " value BLOB," +
337                            " KEY (object_id, keyx));");
338     }
339
340     public static synchronized DBPersistencyProxy getInstance() {
341         if (instance == null) {
342             instance = new DBPersistencyProxy();
343         }
344         return instance;
345     }
346
347     public synchronized int[] getKeys(String JavaDoc name) {
348         if (this.connection == null) {
349             return null;
350         }
351
352         int[] keys = null;
353         try {
354             int objectID = getObjectID(name, false);
355             int rowCount = 0;
356
357             if (objectID == -1) {
358                 return new int[0];
359             }
360             this.stmtKeyCount.setInt(1, objectID);
361             ResultSet JavaDoc results = this.stmtKeyCount.executeQuery();
362             if (results.next()) {
363                 rowCount = results.getInt(1);
364             }
365
366             keys = new int[rowCount];
367             this.stmtGetKeys.setInt(1, objectID);
368             results = this.stmtGetKeys.executeQuery();
369
370             int i = 0;
371             while (results.next() && i < keys.length) {
372                 keys[i++] = results.getInt(1);
373             }
374         } catch (SQLException JavaDoc e) {
375             if (this.log.isErrorEnabled()) {
376                 this.log.error("Error in getKeys(" + name + "): " +
377                                e.getMessage() + " Trying to reconnect.");
378             }
379
380             // try to reconnect and redo the operation
381
if (ensureConnection()) {
382                 return getKeys(name);
383             }
384         }
385         return keys;
386     }
387
388     public synchronized Object JavaDoc getPersistentObject(String JavaDoc name, int i) {
389         if (this.connection == null) {
390             return null;
391         }
392
393         Byteable result = null;
394         try {
395             ByteBuffer JavaDoc buffer = getPersistentBuffer(name, i);
396             this.bistream.setUnderLine(buffer);
397             result = this.bistream.readByteable();
398             this.pool.release(buffer);
399         } catch (Exception JavaDoc e) {
400             if(log.isErrorEnabled()) {
401                 log.error("Could not recover " + name + "/" + i + ": " +
402                               e.getMessage() + " Trying to reconnect.");
403             }
404             // try to reconnect and redo the operation
405
if (ensureConnection()) {
406                 return getPersistentBuffer(name, i);
407             }
408         }
409         return result;
410     }
411
412     public synchronized ByteBuffer JavaDoc getPersistentBuffer(String JavaDoc name, int i) {
413         if (this.connection == null) {
414             return null;
415         }
416
417         try {
418             ResultSet JavaDoc results;
419             int objectID = getObjectID(name, false);
420
421             this.stmtGetValue.setInt(1, objectID);
422             this.stmtGetValue.setInt(2, i);
423             results = this.stmtGetValue.executeQuery();
424
425             if (results.next()) {
426                 Blob JavaDoc blob = results.getBlob(1);
427                 int length = (int) blob.length();
428                 InputStream JavaDoc istream = blob.getBinaryStream();
429                 ByteBuffer JavaDoc buffer = this.pool.getBuffer(length);
430                 byte[] array = buffer.array();
431                 int offset = buffer.arrayOffset();
432                 int bytesRead = 0;
433
434                 try {
435                     while (bytesRead < length) {
436                         int chunk = istream.read(array, offset + bytesRead,
437                                                  length - bytesRead);
438                         if (chunk < 0) {
439                             if (this.log.isErrorEnabled()) {
440                                 this.log.error("Permature EOF when reading " +
441                                                name + "/" + i);
442                             }
443                             this.pool.release(buffer);
444                             buffer = null;
445                             break;
446                         }
447                         bytesRead += chunk;
448                     }
449                 } catch (IOException JavaDoc e) {
450                     if (this.log.isErrorEnabled()) {
451                         this.log.error("I/O error while reading " + name +
452                                        "/" + i + ": " + e.getMessage());
453                     }
454                     this.pool.release(buffer);
455                     buffer = null;
456                     return buffer;
457                 }
458
459                 buffer.limit(length);
460                 buffer.position(0);
461                 return buffer;
462             } else {
463                 if (this.log.isErrorEnabled()) {
464                     this.log.error("Cannot find entry " + name + "/" + i);
465                 }
466                 return null;
467             }
468         } catch (SQLException JavaDoc e) {
469             if (this.log.isErrorEnabled()) {
470                 this.log.error("Error in getPersistentBuffer(" + name +
471                                "): " + e.getMessage() +
472                                " Trying to reconnect");
473             }
474             // try to reconnect and redo the operation
475
if (ensureConnection()) {
476                 return getPersistentBuffer(name, i);
477             } else {
478                 return null;
479             }
480         }
481     }
482
483     public synchronized void deletePersistentObject(String JavaDoc name, int i) {
484         if (this.connection == null) {
485             return;
486         }
487
488         try {
489             int objectID = getObjectID(name, false);
490             realDeletePersistentObject(objectID, i);
491         } catch (SQLException JavaDoc e) {
492             if (this.log.isErrorEnabled()) {
493                 this.log.error("Error in deletePersistentObject(" + name +
494                                "): " + e.getMessage() +
495                                " Trying to reconnect.");
496             }
497             // try to reconnect and redo the operation
498
if (ensureConnection()) {
499                 deletePersistentObject(name, i);
500             }
501         }
502     }
503
504     public synchronized void savePersistentBuffer(String JavaDoc name, int i,
505                                                   ByteBuffer JavaDoc buffer)
506         throws IOException JavaDoc
507     {
508         if (this.connection == null) {
509             return;
510         }
511         try {
512             int objectID = getObjectID(name, true);
513             InputStream JavaDoc istream =
514                 new ByteArrayInputStream JavaDoc(buffer.array(), buffer.arrayOffset(),
515                                          buffer.remaining());
516
517             realDeletePersistentObject(objectID, i);
518             this.stmtSaveValue.setInt(1, objectID);
519             this.stmtSaveValue.setInt(2, i);
520             this.stmtSaveValue.setBinaryStream(3, istream, buffer.remaining());
521             this.stmtSaveValue.executeUpdate();
522         } catch (SQLException JavaDoc e) {
523             if (this.log.isErrorEnabled()) {
524                 this.log.error("Error in savePersistentBuffer(" + name +
525                                "): " + e.getMessage() +
526                                " Trying to reconnect.");
527             }
528             // try to reconnect and redo the operation
529
if (ensureConnection()) {
530                 savePersistentBuffer(name, i, buffer);
531             } else {
532                 if (this.log.isErrorEnabled()) {
533                     this.log.error("Error in savePersistentBuffer(" + name +
534                                    "): " + e.getMessage());
535                 }
536                 throw new IOException JavaDoc(e.getMessage());
537             }
538         }
539     }
540
541     public synchronized void savePersistentObject(String JavaDoc name, int key,
542                                                   Byteable object)
543         throws IOException JavaDoc
544     {
545         if (this.connection == null) {
546             return;
547         }
548
549         try {
550             ByteableOutputStream ostream = new ByteableOutputStream(this.pool);
551             ostream.writeByteable(object);
552             savePersistentBuffer(name, key, ostream.getByteBuffer());
553             ostream.release();
554         } catch (IOException JavaDoc e) {
555             if (this.log.isErrorEnabled()) {
556                 this.log.error("Error in savePersistentBuffer(" + name +
557                                "): " + e.getMessage() +
558                                " Trying to reconnect.");
559             }
560             // try to reconnect and redo the operation
561
if (ensureConnection()) {
562                 savePersistentObject(name, key, object);
563             }
564         }
565     }
566
567     public synchronized void clearStorage(String JavaDoc name) throws IOException JavaDoc {
568         if (this.connection == null) {
569             return;
570         }
571
572         try {
573             int objectID = getObjectID(name, false);
574             this.stmtDeleteAll.setInt(1, objectID);
575             this.stmtDeleteAll.executeUpdate();
576         } catch (SQLException JavaDoc e) {
577             if (this.log.isErrorEnabled()) {
578                     this.log.error("Error in clearStorage(): " +
579                                    e.getMessage() + " Trying to reconnect.");
580                 }
581             // try to reconnect and redo the operation
582
if (ensureConnection()) {
583                 clearStorage(name);
584             } else {
585                 throw new IOException JavaDoc(e.getMessage());
586             }
587         }
588     }
589
590     public synchronized Set JavaDoc getNames() throws SQLException JavaDoc {
591         if (this.connection == null) {
592             return null;
593         }
594
595         Set JavaDoc names = new HashSet JavaDoc();
596         StringBuffer JavaDoc sql = new StringBuffer JavaDoc();
597         Statement JavaDoc stmt = null;
598
599         try {
600             sql.append("SELECT object_name FROM ").append(lookupTable);
601             stmt = this.connection.createStatement();
602             ResultSet JavaDoc results = stmt.executeQuery(sql.toString());
603             while (results.next()) {
604                 String JavaDoc name = results.getString(1);
605                 names.add(name);
606             }
607
608             return names;
609         } finally {
610             if (stmt != null) {
611                 stmt.close();
612             }
613         }
614     }
615
616     private int getObjectID(String JavaDoc objectName, boolean create)
617         throws SQLException JavaDoc
618     {
619         ResultSet JavaDoc results;
620
621         this.stmtGetID.setString(1, objectName);
622         results = this.stmtGetID.executeQuery();
623         if (results.next()) {
624             return results.getInt(1);
625         } else if (create) {
626             newObjectID(objectName);
627             return getObjectID(objectName, false);
628         } else {
629             return -1;
630         }
631     }
632
633     private void newObjectID(String JavaDoc objectName) throws SQLException JavaDoc {
634         this.stmtInsertID.setString(1, objectName);
635         this.stmtInsertID.executeUpdate();
636     }
637
638     private void realDeletePersistentObject(int objectID, int i)
639         throws SQLException JavaDoc
640     {
641         this.stmtDeleteValue.setInt(1, objectID);
642         this.stmtDeleteValue.setInt(2, i);
643         this.stmtDeleteValue.executeUpdate();
644     }
645
646     private void preparePreparedStatements() throws SQLException JavaDoc {
647         this.stmtGetID =
648             connection.prepareStatement("SELECT object_id FROM " +
649                                         this.lookupTable +
650                                         " WHERE object_name = ?");
651         this.stmtKeyCount =
652             connection.prepareStatement("SELECT count(*) FROM " +
653                                         this.dataTable +
654                                         " WHERE object_id = ?");
655         this.stmtGetKeys =
656             connection.prepareStatement("SELECT keyx FROM " + this.dataTable +
657                                         " WHERE object_id = ? ORDER BY keyx");
658         this.stmtGetValue =
659             connection.prepareStatement("SELECT value FROM " + this.dataTable +
660                                         " WHERE object_id = ? AND keyx = ?");
661         this.stmtInsertID =
662             connection.prepareStatement("INSERT INTO " + this.lookupTable +
663                                         " (object_name) VALUES (?)");
664         this.stmtDeleteValue =
665             connection.prepareStatement("DELETE FROM " + this.dataTable +
666                                         " WHERE object_id = ? AND keyx = ?");
667         this.stmtSaveValue =
668             connection.prepareStatement("INSERT INTO " + this.dataTable +
669                                         " (object_id, keyx, value) VALUES " +
670                                         "(?, ? ,?)");
671         this.stmtDeleteAll =
672             connection.prepareStatement("DELETE FROM " + this.dataTable +
673                                         " WHERE object_id = ?");
674
675         String JavaDoc lookupTable = "manta.persist_lookup_" + tableSuffix;
676         this.stmtGetAllServices =
677             connection.prepareStatement("SELECT object_name FROM " + lookupTable);
678     }
679
680     private boolean ensureConnection() {
681         final int MAX_CONNECT = 100;
682
683         // close the old connection, just in case
684
try {
685             this.connection.close();
686         } catch (SQLException JavaDoc e) {}
687
688         int counter;
689         long interval = 500;
690         if (this.log.isInfoEnabled()) {
691             this.log.info("Trying to reconnect to DB...");
692         }
693         for (counter = 1; counter <= MAX_CONNECT; counter++) {
694             try {
695                 this.connection =
696                     DriverManager.getConnection(dbURL, dbUser, dbPasswd);
697                 if (this.log.isInfoEnabled()) {
698                     this.log.info("DB reconnect succeeded (retries: " +
699                                   counter + ")");
700                 }
701                 preparePreparedStatements();
702                 return true;
703             } catch (SQLException JavaDoc e) {}
704             try {
705                 Thread.sleep(interval);
706             } catch (InterruptedException JavaDoc e) {}
707             interval = (long) (1.5 * interval);
708         }
709         if (this.log.isErrorEnabled()) {
710             this.log.error("DB reconnect FAILED (retries: " +
711                            counter + "). Persistency operation will FAIL.");
712         }
713
714         return false;
715     }
716
717     public String JavaDoc[] getAllServices() {
718         HashSet JavaDoc set = new HashSet JavaDoc();
719         try {
720             ResultSet JavaDoc results = this.stmtGetAllServices.executeQuery();
721             while (results.next()) {
722                 set.add(results.getString(1));
723             }
724
725         } catch (SQLException JavaDoc e) {
726             if (this.log.isErrorEnabled()) {
727                 log.error("failed to retrieve services",e);
728             }
729             return new String JavaDoc[0];
730
731         }
732         String JavaDoc delimiter = MantaAgent.getInstance().getSingletonRepository().
733                 getConfigManager().getStringProperty("persistency.hierarchy_delimiter","~");
734         Iterator JavaDoc itr = set.iterator();
735         HashSet JavaDoc ready = new HashSet JavaDoc();
736         while (itr.hasNext()) {
737             String JavaDoc tmp = (String JavaDoc) itr.next();
738             if (tmp.startsWith(PersistentManager.SUBSCRIBERS_PERSISTENT_PREFIX)){
739                 ready.add(StringUtils.replace(tmp.substring(PersistentManager.SUBSCRIBERS_PERSISTENT_PREFIX.length())
740                         ,delimiter, VirtualTopicManager.HIERARCHICAL_TOPIC_DELIMITER));
741             }
742         }
743         return (String JavaDoc[]) ready.toArray(new String JavaDoc[0]);
744     }
745 } // DBPersistencyProxy
746
Popular Tags