1 24 25 package org.objectweb.cjdbc.controller.loadbalancer.raidb2; 26 27 import java.sql.SQLException ; 28 import java.util.ArrayList ; 29 30 import org.objectweb.cjdbc.common.i18n.Translate; 31 import org.objectweb.cjdbc.common.log.Trace; 32 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 33 import org.objectweb.cjdbc.controller.backend.DatabaseBackend; 34 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread; 35 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy; 36 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy; 37 import org.objectweb.cjdbc.controller.loadbalancer.policies.errorchecking.ErrorCheckingPolicy; 38 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask; 39 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase; 40 41 51 public abstract class RAIDb2ec extends RAIDb2 52 { 53 57 58 protected ArrayList backendReadThreads; 59 protected int nbOfConcurrentReads; 60 protected ErrorCheckingPolicy errorCheckingPolicy; 61 62 protected static Trace logger = Trace 63 .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb2ec"); 64 65 68 69 82 public RAIDb2ec(VirtualDatabase vdb, 83 WaitForCompletionPolicy waitForCompletionPolicy, 84 CreateTablePolicy createTablePolicy, 85 ErrorCheckingPolicy errorCheckingPolicy, int nbOfConcurrentReads) 86 throws Exception 87 { 88 super(vdb, waitForCompletionPolicy, createTablePolicy); 89 backendReadThreads = new ArrayList (); 90 this.errorCheckingPolicy = errorCheckingPolicy; 91 this.nbOfConcurrentReads = nbOfConcurrentReads; 92 } 93 94 97 98 110 public void enableBackend(DatabaseBackend db, boolean writeEnabled) 111 throws SQLException 112 { 113 BackendWorkerThread blockingThread = new BackendWorkerThread(db, this); 115 BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this); 116 117 try 119 { 120 backendBlockingThreadsRWLock.acquireWrite(); 121 } 122 catch (InterruptedException e) 123 { 124 String msg = Translate.get( 125 "loadbalancer.backendlist.acquire.writelock.failed", e); 126 logger.error(msg); 127 throw new SQLException (msg); 128 } 129 backendBlockingThreads.add(blockingThread); 130 backendBlockingThreadsRWLock.releaseWrite(); 131 blockingThread.start(); 132 logger.info(Translate.get("loadbalancer.backend.workerthread.blocking.add", 133 db.getName())); 134 135 try 137 { 138 backendNonBlockingThreadsRWLock.acquireWrite(); 139 } 140 catch (InterruptedException e) 141 { 142 String msg = Translate.get( 143 "loadbalancer.backendlist.acquire.writelock.failed", e); 144 logger.error(msg); 145 throw new SQLException (msg); 146 } 147 backendNonBlockingThreads.add(nonBlockingThread); 148 backendNonBlockingThreadsRWLock.releaseWrite(); 149 nonBlockingThread.start(); 150 logger.info(Translate.get( 151 "loadbalancer.backend.workerthread.non.blocking.add", db.getName())); 152 153 if (!db.isInitialized()) 154 db.initializeConnections(); 155 db.enableRead(); 156 if (writeEnabled) 157 db.enableWrite(); 158 } 159 160 171 public synchronized void disableBackend(DatabaseBackend db) 172 throws SQLException 173 { 174 int nbOfThreads = backendBlockingThreads.size(); 175 176 for (int i = 0; i < nbOfThreads; i++) 178 { 179 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 180 .get(i); 181 if (thread.getBackend().equals(db)) 182 { 183 logger.info(Translate.get( 184 "loadbalancer.backend.workerthread.blocking.remove", db.getName())); 185 186 try 188 { 189 backendBlockingThreadsRWLock.acquireWrite(); 190 } 191 catch (InterruptedException e) 192 { 193 String msg = Translate.get( 194 "loadbalancer.backendlist.acquire.writelock.failed", e); 195 logger.error(msg); 196 throw new SQLException (msg); 197 } 198 backendBlockingThreads.remove(thread); 199 backendBlockingThreadsRWLock.releaseWrite(); 200 201 synchronized (thread) 202 { 203 thread.addPriorityTask(new KillThreadTask(1, 1)); 205 thread.notify(); 206 } 207 break; 208 } 209 } 210 211 nbOfThreads = backendNonBlockingThreads.size(); 213 for (int i = 0; i < nbOfThreads; i++) 214 { 215 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 216 .get(i); 217 if (thread.getBackend().equals(db)) 218 { 219 logger.info(Translate.get( 220 "loadbalancer.backend.workerthread.non.blocking.remove", db 221 .getName())); 222 223 try 225 { 226 backendNonBlockingThreadsRWLock.acquireWrite(); 227 } 228 catch (InterruptedException e) 229 { 230 String msg = Translate.get( 231 "loadbalancer.backendlist.acquire.writelock.failed", e); 232 logger.error(msg); 233 throw new SQLException (msg); 234 } 235 backendNonBlockingThreads.remove(thread); 236 backendNonBlockingThreadsRWLock.releaseWrite(); 237 238 synchronized (thread) 239 { 240 thread.addPriorityTask(new KillThreadTask(1, 1)); 242 thread.notify(); 243 } 244 break; 245 } 246 } 247 248 db.disable(); 249 if (db.isInitialized()) 250 db.finalizeConnections(); 251 } 252 253 256 public String getXmlImpl() 257 { 258 StringBuffer info = new StringBuffer (); 259 info.append("<" + DatabasesXmlTags.ELT_RAIDb_2ec + " " 260 + DatabasesXmlTags.ATT_nbOfConcurrentReads + "=\"" 261 + this.nbOfConcurrentReads + "\">"); 262 this.getRaidb2Xml(); 263 if (waitForCompletionPolicy != null) 264 info.append(waitForCompletionPolicy.getXml()); 265 info.append("</" + DatabasesXmlTags.ELT_RAIDb_2ec + ">"); 266 return info.toString(); 267 } 268 } | Popular Tags |