package org.jacorb.notification.engine;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.jacorb.notification.interfaces.FilterStage;

/**
 * Base class for message tasks that operate on {@link FilterStage}s.
 *
 * <p>An instance tracks two things: the array of filter stages it is
 * currently working on, and a list of filter stages that subclasses
 * collect via {@link #addFilterStage(FilterStage)} and that callers fetch
 * via {@link #getFilterStageToBeProcessed()}.
 *
 * <p>Only {@link #reset()} is synchronized; all other accessors in this
 * class are unsynchronized.
 */
public abstract class AbstractFilterTask extends AbstractMessageTask
{
    private final TaskFactory taskFactory_;

    /**
     * When {@code true}, {@link #addFilterStage(List)} verifies that every
     * element of the given list is a {@link FilterStage} before adding it.
     */
    private static final boolean STRICT_CHECKING = true;

    /**
     * Shared zero-length array. Used as the initial/reset value of
     * {@link #arrayCurrentFilterStage_} and as the type template for
     * {@link #getFilterStageToBeProcessed()}.
     */
    protected static final FilterStage[] EMPTY_FILTERSTAGE = new FilterStage[0];

    /**
     * The filter stages this task is currently working on.
     * {@link #doWork()} only invokes {@link #doFilter()} when this array is
     * non-empty.
     */
    protected FilterStage[] arrayCurrentFilterStage_;

    /**
     * Filter stages collected through the {@code addFilterStage} methods.
     * Raw list by design of the file; elements are expected to be
     * {@link FilterStage} instances.
     */
    private final List listOfFilterStageToBeProcessed_ = new ArrayList();

    /**
     * Creates a task with no current filter stages.
     *
     * @param taskFactory  factory made available to subclasses through
     *                     {@link #getTaskFactory()}
     * @param taskExecutor executor handed to {@code setTaskExecutor}
     */
    protected AbstractFilterTask(TaskFactory taskFactory, TaskExecutor taskExecutor)
    {
        setTaskExecutor(taskExecutor);

        taskFactory_ = taskFactory;

        arrayCurrentFilterStage_ = EMPTY_FILTERSTAGE;
    }

    /**
     * Runs {@link #doFilter()} if there is at least one current filter
     * stage; otherwise does nothing.
     *
     * @throws InterruptedException if {@link #doFilter()} throws it
     */
    public final void doWork() throws InterruptedException
    {
        if (arrayCurrentFilterStage_.length > 0)
        {
            doFilter();
        }
    }

    /**
     * Performs the actual filtering. Only called by {@link #doWork()} when
     * {@link #arrayCurrentFilterStage_} is non-empty.
     *
     * @throws InterruptedException if the filtering is interrupted
     */
    protected abstract void doFilter() throws InterruptedException;

    /**
     * @return the factory supplied at construction time
     */
    protected TaskFactory getTaskFactory()
    {
        return taskFactory_;
    }

    /**
     * @return {@code true} if no filter stages have been collected since
     *         construction or the last clear/reset
     */
    protected boolean isFilterStageListEmpty()
    {
        return listOfFilterStageToBeProcessed_.isEmpty();
    }

    /**
     * Appends a single filter stage to the list of collected stages.
     *
     * @param s the filter stage to add
     */
    protected void addFilterStage(FilterStage s)
    {
        listOfFilterStageToBeProcessed_.add(s);
    }

    /**
     * Appends all elements of the given list to the list of collected
     * stages.
     *
     * <p>With {@link #STRICT_CHECKING} enabled the whole list is validated
     * first, so nothing is added if any element is rejected.
     *
     * @param s list whose elements must all be {@link FilterStage}s
     * @throws IllegalArgumentException if strict checking is enabled and an
     *         element is {@code null} or not a {@link FilterStage}
     */
    protected void addFilterStage(List s)
    {
        if (STRICT_CHECKING)
        {
            Iterator i = s.iterator();

            while (i.hasNext())
            {
                Object element = i.next();

                // instanceof is false for null, so null elements are rejected too.
                if (!(element instanceof FilterStage))
                {
                    String typeName =
                        (element == null) ? "null" : element.getClass().getName();

                    throw new IllegalArgumentException(
                        "List element is not a FilterStage: " + typeName);
                }
            }
        }
        listOfFilterStageToBeProcessed_.addAll(s);
    }

    /**
     * Sets the filter stages this task should work on.
     *
     * <p>The array is stored as given (no copy, no null check);
     * {@link #doWork()} dereferences it, so passing {@code null} makes a
     * later {@code doWork()} fail with a {@code NullPointerException}.
     *
     * @param currentFilterStage the filter stages to process
     */
    public void setCurrentFilterStage(FilterStage[] currentFilterStage)
    {
        arrayCurrentFilterStage_ = currentFilterStage;
    }

    /**
     * Returns the collected filter stages as an array.
     *
     * @return the collected stages; when none have been collected this is
     *         the shared {@link #EMPTY_FILTERSTAGE} instance
     */
    public FilterStage[] getFilterStageToBeProcessed()
    {
        return (FilterStage[]) listOfFilterStageToBeProcessed_.toArray(EMPTY_FILTERSTAGE);
    }

    /**
     * Removes all collected filter stages.
     */
    public void clearFilterStageToBeProcessed()
    {
        listOfFilterStageToBeProcessed_.clear();
    }

    /**
     * Resets this task: delegates to the superclass, discards the collected
     * filter stages and clears the current filter stages.
     */
    public synchronized void reset()
    {
        super.reset();

        clearFilterStageToBeProcessed();
        arrayCurrentFilterStage_ = EMPTY_FILTERSTAGE;
    }

    /**
     * Logs the given error as fatal. The error is not rethrown.
     *
     * @param task  the task in which the error occurred
     * @param error the error that was raised
     */
    public void handleTaskError(AbstractTask task, Throwable error)
    {
        logger_.fatalError("Error while Filtering in Task:" + task, error);
    }

    /**
     * Schedules this task by delegating to {@code schedule(boolean)}
     * (declared outside this class), passing {@code true} when the task
     * executor reports that tasks are not queued.
     *
     * <p>NOTE(review): the flag presumably indicates whether the task may be
     * run directly instead of being queued - confirm against the inherited
     * {@code schedule(boolean)}.
     *
     * @throws InterruptedException if scheduling is interrupted
     */
    public void schedule() throws InterruptedException
    {
        schedule(!getTaskExecutor().isTaskQueued());
    }
}