package org.continuent.sequoia.controller.loadbalancer;

import java.sql.SQLException;
import java.sql.Statement;

import org.continuent.sequoia.common.i18n.Translate;
import org.continuent.sequoia.common.log.Trace;
import org.continuent.sequoia.controller.backend.DatabaseBackend;

/**
 * Thread that repeatedly takes entries from the task queues of one
 * {@link DatabaseBackend} and executes the task carried by each entry until
 * the thread is killed.
 * <p>
 * Two worker threads are considered equal when their backends are equal.
 */
public class BackendWorkerThread extends Thread
{
  /** Load balancer this thread works for; used to disable the backend. */
  private final AbstractLoadBalancer loadBalancer;

  /** Backend whose task queues are served by this thread. */
  private final DatabaseBackend      backend;

  /**
   * Set to <code>true</code> to make {@link #run()} leave its loop. Declared
   * volatile because it is written by the thread calling one of the kill
   * methods and read, without holding any lock, by the worker loop.
   */
  private volatile boolean           isKilled                  = false;

  /** Logger named after the backend served by this thread. */
  private final Trace                logger;

  /** Task queues obtained from the backend at construction time. */
  private final BackendTaskQueues    queues;

  /** Statement last registered through {@link #setCurrentStatement}. */
  private Statement                  currentStatement;

  /**
   * When <code>true</code>, {@link #run()} asks the queues for
   * commit/rollback entries only.
   */
  private boolean                    playingCommitRollbackOnly = false;

  /**
   * Creates a new <code>BackendWorkerThread</code> with a default thread name
   * built from the virtual database name, the backend name and the RAIDb level
   * of the load balancer.
   * 
   * @param backend the backend this thread is associated to
   * @param loadBalancer the load balancer instantiating this thread
   * @throws NullPointerException if <code>backend</code> or
   *           <code>loadBalancer</code> is <code>null</code>
   */
  public BackendWorkerThread(DatabaseBackend backend,
      AbstractLoadBalancer loadBalancer)
  {
    this(buildDefaultName(backend, loadBalancer), backend, loadBalancer);
  }

  /**
   * Creates a new <code>BackendWorkerThread</code> with the given thread name.
   * 
   * @param name the name to give to the thread
   * @param backend the backend this thread is associated to
   * @param loadBalancer the load balancer instantiating this thread
   * @throws NullPointerException if <code>backend</code> or
   *           <code>loadBalancer</code> is <code>null</code>; the error is
   *           logged before being thrown
   */
  public BackendWorkerThread(String name, DatabaseBackend backend,
      AbstractLoadBalancer loadBalancer)
  {
    super(name);

    if (backend == null)
    {
      String msg = Translate.get("backendworkerthread.null.backend");
      // No backend name is available, so report on the generic backend logger
      Trace.getLogger(
          "org.continuent.sequoia.controller.backend.DatabaseBackend").error(
          msg);
      throw new NullPointerException(msg);
    }

    logger = Trace
        .getLogger("org.continuent.sequoia.controller.backend.DatabaseBackend."
            + backend.getName());

    if (loadBalancer == null)
    {
      String msg = Translate.get("backendworkerthread.null.loadbalancer");
      logger.error(msg);
      throw new NullPointerException(msg);
    }

    this.backend = backend;
    this.loadBalancer = loadBalancer;
    this.queues = backend.getTaskQueues();
  }

  /**
   * Builds the default thread name used by the two-argument constructor.
   * <p>
   * When either argument is <code>null</code> a placeholder name is returned
   * instead of failing here, so that the delegated constructor can log and
   * throw its own descriptive <code>NullPointerException</code>.
   * 
   * @param backend the backend the thread is associated to, may be
   *          <code>null</code>
   * @param loadBalancer the load balancer instantiating the thread, may be
   *          <code>null</code>
   * @return the thread name
   */
  private static String buildDefaultName(DatabaseBackend backend,
      AbstractLoadBalancer loadBalancer)
  {
    if (backend == null || loadBalancer == null)
      return "BackendWorkerThread";

    return loadBalancer.vdb.getVirtualDatabaseName()
        + " - BackendWorkerThread for backend '" + backend.getName()
        + "' with RAIDb level:" + loadBalancer.getRAIDbLevel();
  }

  /**
   * Kills this thread and asks the load balancer to disable the backend.
   */
  public void killAndForceDisable()
  {
    kill(true);
  }

  /**
   * Kills this thread without asking the load balancer to disable the backend.
   */
  public void killWithoutDisablingBackend()
  {
    kill(false);
  }

  /**
   * Flags this thread as killed, notifies one thread waiting on this object's
   * monitor and optionally disables the backend through the load balancer.
   * 
   * @param forceDisable <code>true</code> to call
   *          <code>loadBalancer.disableBackend(backend, true)</code> after the
   *          thread has been flagged as killed
   */
  private void kill(boolean forceDisable)
  {
    synchronized (this)
    {
      if (logger.isDebugEnabled())
        logger.debug(this.getName() + " is shutting down");

      isKilled = true;
      notify();
    }

    if (forceDisable)
    {
      try
      {
        loadBalancer.disableBackend(backend, true);
      }
      catch (SQLException e)
      {
        // Best effort: a failure to disable the backend must not prevent the
        // kill from completing, but it should not vanish silently either.
        if (logger.isDebugEnabled())
          logger.debug("Failed to disable backend while killing "
              + this.getName() + ": " + e);
      }
    }
  }

  /**
   * Processes entries from the task queues until this thread is killed.
   * <p>
   * Each iteration fetches the next entry (commit/rollback entries only when
   * {@link #isPlayingCommitRollbackOnly()} is <code>true</code>), executes its
   * task, and finally clears the current statement and reports the entry as
   * completed to the queues.
   */
  public void run()
  {
    while (!isKilled)
    {
      // Scoped to a single iteration: if fetching the next entry throws, the
      // handlers below must not act on the entry of the previous iteration.
      BackendTaskQueueEntry currentlyProcessingEntry = null;

      try
      {
        if (isPlayingCommitRollbackOnly())
          currentlyProcessingEntry = queues
              .getNextCommitRollbackToExecute(this);
        else
          currentlyProcessingEntry = queues.getNextEntryToExecute(this);

        if (currentlyProcessingEntry == null)
        {
          logger.warn("Null task in BackendWorkerThread");
          continue;
        }

        if (logger.isDebugEnabled())
          logger.debug(Translate.get("backendworkerthread.execute.task",
              currentlyProcessingEntry.getTask().toString()));
        currentlyProcessingEntry.getTask().execute(this);
      }
      catch (SQLException e)
      {
        logger.warn(Translate.get("backendworkerthread.task.failed", e));
      }
      catch (RuntimeException re)
      {
        logger.fatal(Translate.get(
            "backendworkerthread.task.runtime.exception",
            currentlyProcessingEntry == null
                ? "null task"
                : currentlyProcessingEntry.toString()), re);

        // There is nothing to notify when the failure happened before an
        // entry was obtained; dereferencing null here would kill the thread.
        if (currentlyProcessingEntry != null)
        {
          try
          {
            currentlyProcessingEntry.getTask().notifyFailure(this, 1,
                new SQLException(re.getMessage()));
          }
          catch (SQLException e1)
          {
            // Keep the worker alive, but leave a trace of the lost
            // notification.
            logger.warn("Failed to notify task failure in " + this.getName(),
                e1);
          }
        }
      }
      finally
      {
        setCurrentStatement(null);
        try
        {
          if (currentlyProcessingEntry != null)
          {
            queues.completedEntryExecution(currentlyProcessingEntry);
            if (logger.isDebugEnabled())
              logger.debug(Translate.get(
                  "backendworkerthread.execute.task.completed",
                  currentlyProcessingEntry.getTask().toString()));
          }
        }
        catch (RuntimeException e)
        {
          logger.warn(
              Translate.get("backendworkerthread.remove.task.error", e), e);
        }
      }
    }
  }

  /**
   * Returns the backend this thread is associated to.
   * 
   * @return the backend given at construction time
   */
  public DatabaseBackend getBackend()
  {
    return backend;
  }

  /**
   * Returns the statement last registered through
   * {@link #setCurrentStatement(Statement)}.
   * 
   * @return the current statement, or <code>null</code> if none is registered
   */
  public final Statement getCurrentStatement()
  {
    return currentStatement;
  }

  /**
   * Registers the current statement. {@link #run()} resets it to
   * <code>null</code> after each processed entry.
   * 
   * @param currentStatement the statement to register, may be
   *          <code>null</code>
   */
  public final void setCurrentStatement(Statement currentStatement)
  {
    this.currentStatement = currentStatement;
  }

  /**
   * Returns the load balancer this thread was created with.
   * 
   * @return the load balancer given at construction time
   */
  public final AbstractLoadBalancer getLoadBalancer()
  {
    return loadBalancer;
  }

  /**
   * Returns the logger used by this thread.
   * 
   * @return the logger named after the backend of this thread
   */
  public Trace getLogger()
  {
    return logger;
  }

  /**
   * Tells whether this thread only fetches commit/rollback entries.
   * 
   * @return <code>true</code> if {@link #run()} asks the queues for
   *         commit/rollback entries only
   */
  public boolean isPlayingCommitRollbackOnly()
  {
    return playingCommitRollbackOnly;
  }

  /**
   * Selects which kind of entries {@link #run()} fetches from the queues.
   * 
   * @param playCommitRollbackOnly <code>true</code> to fetch commit/rollback
   *          entries only, <code>false</code> to fetch any entry
   */
  public void setPlayCommitRollbackOnly(boolean playCommitRollbackOnly)
  {
    this.playingCommitRollbackOnly = playCommitRollbackOnly;
  }

  /**
   * Two worker threads are equal when their backends are equal.
   * 
   * @param obj the object to compare with
   * @return <code>true</code> if <code>obj</code> is a
   *         <code>BackendWorkerThread</code> whose backend equals the backend
   *         of this thread
   */
  public boolean equals(Object obj)
  {
    if (obj instanceof BackendWorkerThread)
    {
      return backend.equals(((BackendWorkerThread) obj).getBackend());
    }
    return false;
  }

  /**
   * Returns the hash code of the backend, consistently with
   * {@link #equals(Object)}.
   * 
   * @return the backend hash code
   */
  public int hashCode()
  {
    return backend.hashCode();
  }

}