1 24 25 package org.objectweb.cjdbc.controller.loadbalancer.raidb1; 26 27 import java.sql.SQLException ; 28 import java.util.ArrayList ; 29 30 import org.objectweb.cjdbc.common.i18n.Translate; 31 import org.objectweb.cjdbc.common.log.Trace; 32 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 33 import org.objectweb.cjdbc.controller.backend.DatabaseBackend; 34 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread; 35 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy; 36 import org.objectweb.cjdbc.controller.loadbalancer.policies.errorchecking.ErrorCheckingPolicy; 37 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask; 38 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase; 39 40 50 public abstract class RAIDb1ec extends RAIDb1 51 { 52 56 57 protected ArrayList backendReadThreads; 58 protected int nbOfConcurrentReads; 59 protected ErrorCheckingPolicy errorCheckingPolicy; 60 61 protected static Trace logger = Trace 62 .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb1ec"); 63 64 67 68 79 public RAIDb1ec(VirtualDatabase vdb, 80 WaitForCompletionPolicy waitForCompletionPolicy, 81 ErrorCheckingPolicy errorCheckingPolicy, int nbOfConcurrentReads) 82 throws Exception 83 { 84 super(vdb, waitForCompletionPolicy); 85 backendReadThreads = new ArrayList (); 86 this.errorCheckingPolicy = errorCheckingPolicy; 87 this.nbOfConcurrentReads = nbOfConcurrentReads; 88 } 89 90 93 94 106 public void enableBackend(DatabaseBackend db, boolean writeEnabled) 107 throws SQLException 108 { 109 BackendWorkerThread blockingThread = new BackendWorkerThread(db, this); 111 BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this); 112 113 try 115 { 116 backendBlockingThreadsRWLock.acquireWrite(); 117 } 118 catch (InterruptedException e) 119 { 120 String msg = Translate.get( 121 "loadbalancer.backendlist.acquire.writelock.failed", e); 122 logger.error(msg); 123 throw new SQLException (msg); 124 } 125 backendBlockingThreads.add(blockingThread); 126 backendBlockingThreadsRWLock.releaseWrite(); 127 blockingThread.start(); 128 logger.info(Translate.get("loadbalancer.backend.workerthread.blocking.add", 129 db.getName())); 130 131 try 133 { 134 backendNonBlockingThreadsRWLock.acquireWrite(); 135 } 136 catch (InterruptedException e) 137 { 138 String msg = Translate.get( 139 "loadbalancer.backendlist.acquire.writelock.failed", e); 140 logger.error(msg); 141 throw new SQLException (msg); 142 } 143 backendNonBlockingThreads.add(nonBlockingThread); 144 backendNonBlockingThreadsRWLock.releaseWrite(); 145 nonBlockingThread.start(); 146 logger.info(Translate.get( 147 "loadbalancer.backend.workerthread.non.blocking.add", db.getName())); 148 149 if (!db.isInitialized()) 150 db.initializeConnections(); 151 db.enableRead(); 152 if (writeEnabled) 153 db.enableWrite(); 154 } 155 156 167 public synchronized void disableBackend(DatabaseBackend db) 168 throws SQLException 169 { 170 int nbOfThreads = backendBlockingThreads.size(); 171 172 for (int i = 0; i < nbOfThreads; i++) 174 { 175 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 176 .get(i); 177 if (thread.getBackend().equals(db)) 178 { 179 logger.info(Translate.get( 180 "loadbalancer.backend.workerthread.blocking.remove", db.getName())); 181 182 try 184 { 185 backendBlockingThreadsRWLock.acquireWrite(); 186 } 187 catch (InterruptedException e) 188 { 189 String msg = Translate.get( 190 "loadbalancer.backendlist.acquire.writelock.failed", e); 191 logger.error(msg); 192 throw new SQLException (msg); 193 } 194 backendBlockingThreads.remove(thread); 195 backendBlockingThreadsRWLock.releaseWrite(); 196 197 synchronized (thread) 198 { 199 thread.addPriorityTask(new KillThreadTask(1, 1)); 201 thread.notify(); 202 } 203 break; 204 } 205 } 206 207 nbOfThreads = backendNonBlockingThreads.size(); 209 for (int i = 0; i < nbOfThreads; i++) 210 { 211 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 212 .get(i); 213 if (thread.getBackend().equals(db)) 214 { 215 logger.info(Translate.get( 216 "loadbalancer.backend.workerthread.non.blocking.remove", db 217 .getName())); 218 219 try 221 { 222 backendNonBlockingThreadsRWLock.acquireWrite(); 223 } 224 catch (InterruptedException e) 225 { 226 String msg = Translate.get( 227 "loadbalancer.backendlist.acquire.writelock.failed", e); 228 logger.error(msg); 229 throw new SQLException (msg); 230 } 231 backendNonBlockingThreads.remove(thread); 232 backendNonBlockingThreadsRWLock.releaseWrite(); 233 234 synchronized (thread) 235 { 236 thread.addPriorityTask(new KillThreadTask(1, 1)); 238 thread.notify(); 239 } 240 break; 241 } 242 } 243 244 db.disable(); 245 if (db.isInitialized()) 246 db.finalizeConnections(); 247 } 248 249 252 public String getXmlImpl() 253 { 254 StringBuffer info = new StringBuffer (); 255 info.append("<" + DatabasesXmlTags.ELT_RAIDb_1ec + " " 256 + DatabasesXmlTags.ATT_nbOfConcurrentReads + "=\"" 257 + this.nbOfConcurrentReads + "\">"); 258 this.getRaidb1Xml(); 259 if (waitForCompletionPolicy != null) 260 info.append(waitForCompletionPolicy.getXml()); 261 info.append("</" + DatabasesXmlTags.ELT_RAIDb_1ec + ">"); 262 return info.toString(); 263 } 264 } | Popular Tags |