1 46 50 package org.mr.core.util; 51 52 53 import org.mr.core.persistent.PersistentQueue; 54 55 61 public class Stage extends AbstractStage{ 62 private Queue queue; 63 64 65 69 public Stage(StageParams params){ 70 super(params); 71 if(params.isPersistent()){ 72 queue = new PersistentQueue(params.getStageName(),true, params.isBlocking()); 73 }else{ 74 queue = new SynchronizedQueue(); 75 } 76 77 for(int index =0 ; index < params.getNumberOfStartThreads() ; index++){ 78 StageExecutionThread exe = new StageExecutionThread(this); 79 exe.setName(params.getStageName()+"ExecThread"+(index+1)); 80 exe.setPriority(Thread.NORM_PRIORITY+ params.getStagePriority()); 81 exe.start(); 82 stageExecutionThreads.add(exe); 83 } 84 85 } 86 87 88 92 protected Object dequeue(){ 93 return queue.dequeue(); 94 } 95 96 100 public void enqueue(Object event){ 101 queue.enqueue(event); 102 } 103 104 107 public int size(){ 108 return queue.size(); 109 } 110 111 114 public void stop(){ 115 for(int index =0 ; index < stageExecutionThreads.size();index++){ 116 StageExecutionThread exe = (StageExecutionThread) stageExecutionThreads.get(index); 117 exe.setUp(false); 118 this.enqueue(exe.stopEvent); 119 } 120 } 121 122 123 } 124 | Popular Tags |