1 46 50 package org.mr.core.util; 51 52 import org.apache.commons.logging.LogFactory; 53 import org.mr.core.persistent.PersistentQueue; 54 55 64 public class ComplexStage extends AbstractStage { 65 Queue[] queues; 66 67 71 public ComplexStage(ComplexStageParams params){ 72 super(params); 73 queues= new Queue[params.numberOfQueues]; 74 for (int i = 0; i < params.numberOfQueues; i++) { 75 if(params.isPersistent()){ 76 queues[i] = new PersistentQueue(params.getStageName()+i,true, params.isBlocking()); 77 }else{ 78 queues[i] = new SynchronizedQueue(); 79 } 81 82 } 84 for(int index =0 ; index < params.getNumberOfStartThreads() ; index++){ 85 StageExecutionThread exe = new StageExecutionThread(this); 86 exe.setName(params.getStageName()+"ExecutionThread"+(index+1)); 87 exe.start(); 88 stageExecutionThreads.add(exe); 89 } 90 } 91 92 95 protected synchronized Object dequeue(){ 96 98 Queue result = null; 99 100 while((result=allQueuesEmpty())==null) { 101 try { 102 wait(); 103 } catch (InterruptedException ex){ 105 if(LogFactory.getLog("ComplexStage").isFatalEnabled()){ 106 LogFactory.getLog("ComplexStage").fatal("Stage Queue has got an Exception." , ex); 107 } } } return result.dequeue(); 112 } 115 140 143 private final Queue allQueuesEmpty(){ 144 for (int i = 0; i < queues.length; i++) { 145 if(!queues[i].isEmpty()) { 146 return queues[i]; 147 } } return null; 150 } 152 157 public synchronized void enqueue(Object event , int queueNumber){ 158 queues[queueNumber].enqueue(event); 159 notifyAll(); 160 } 161 162 } 163 | Popular Tags |