1 46 50 package org.mr.core.util; 51 52 import org.apache.commons.logging.Log; 53 import org.apache.commons.logging.LogFactory; 54 55 60 public class StageExecutionThread extends Thread { 61 private boolean up = true; 62 private AbstractStage stage; 63 private static Log log; 64 65 protected Object stopEvent = new Object (); 66 67 71 StageExecutionThread(AbstractStage stage){ 72 this.stage = stage; 73 log = LogFactory.getLog("StageExecutionThread"); 74 } 75 76 77 public void run(){ 78 while (up) { 79 try { 80 Object e = stage.dequeue(); 81 if(e == this.stopEvent || !this.up){ 82 return; 84 } 85 boolean cont = stage.sendToHandler(e); 86 if(!cont){ 87 up = cont; 88 } 89 } catch (ThreadDeath e) { 92 setUp(false); 94 throw e; 95 } catch (Throwable ex) { 97 if(log.isFatalEnabled()) 98 log.fatal("Stage thread error. ",ex); 99 } } if(log.isDebugEnabled()){ 102 log.debug("Handler returned false. Exiting stage execution thread."); 103 } 104 105 } 106 107 110 public boolean isUp() { 111 return up; 112 } 113 114 117 public void setUp(boolean up) { 118 this.up = up; 119 } 120 121 122 123 } 124 | Popular Tags |