1 package org.jacorb.notification.engine; 2 3 23 24 import org.apache.avalon.framework.configuration.Configurable; 25 import org.apache.avalon.framework.configuration.Configuration; 26 import org.jacorb.notification.interfaces.Disposable; 27 import org.jacorb.notification.interfaces.FilterStage; 28 import org.jacorb.notification.interfaces.Message; 29 import org.jacorb.notification.interfaces.MessageConsumer; 30 import org.jacorb.notification.util.AbstractPoolablePool; 31 32 36 37 public class DefaultTaskFactory implements Disposable, Configurable, TaskFactory 38 { 39 final TaskProcessor taskProcessor_; 40 41 private final AbstractPoolablePool filterProxyConsumerTaskPool_ = new AbstractPoolablePool( 42 "FilterProxyConsumerTaskPool") 43 { 44 public Object newInstance() 45 { 46 return new FilterProxyConsumerTask(DefaultTaskFactory.this, 47 taskProcessor_.getFilterTaskExecutor()); 48 } 49 }; 50 51 private final AbstractPoolablePool filterSupplierAdminTaskPool_ = new AbstractPoolablePool( 52 "FilterSupplierAdminTaskPool") 53 { 54 public Object newInstance() 55 { 56 return new FilterSupplierAdminTask(DefaultTaskFactory.this, 57 taskProcessor_.getFilterTaskExecutor()); 58 } 59 }; 60 61 private final AbstractPoolablePool filterConsumerAdminTaskPool_ = new AbstractPoolablePool( 62 "FilterConsumerAdminTaskPool") 63 { 64 public Object newInstance() 65 { 66 return new FilterConsumerAdminTask(DefaultTaskFactory.this, 67 taskProcessor_.getFilterTaskExecutor()); 68 } 69 }; 70 71 private final AbstractPoolablePool filterProxySupplierTaskPool_ = new AbstractPoolablePool( 72 "FilterProxySupplierTaskPool") 73 { 74 public Object newInstance() 75 { 76 return new FilterProxySupplierTask(DefaultTaskFactory.this, 77 taskProcessor_.getFilterTaskExecutor()); 78 } 79 }; 80 81 83 public DefaultTaskFactory(TaskProcessor taskProcessor) 84 { 85 taskProcessor_ = taskProcessor; 86 } 87 88 90 public void configure(Configuration conf) 91 { 92 filterProxyConsumerTaskPool_.configure(conf); 93 filterProxySupplierTaskPool_.configure(conf); 94 filterConsumerAdminTaskPool_.configure(conf); 95 filterSupplierAdminTaskPool_.configure(conf); 96 } 97 98 public void dispose() 99 { 100 filterProxyConsumerTaskPool_.dispose(); 101 filterProxySupplierTaskPool_.dispose(); 102 filterConsumerAdminTaskPool_.dispose(); 103 filterSupplierAdminTaskPool_.dispose(); 104 } 105 106 109 private FilterProxyConsumerTask newFilterProxyConsumerTask() 110 { 111 return (FilterProxyConsumerTask) filterProxyConsumerTaskPool_.lendObject(); 112 } 113 114 public Schedulable newFilterProxyConsumerTask(Message message) 115 { 116 FilterProxyConsumerTask task = newFilterProxyConsumerTask(); 117 118 task.setMessage(message); 119 120 task.setCurrentFilterStage(new FilterStage[] { message.getInitialFilterStage() }); 121 122 return task; 123 } 124 125 127 131 private FilterSupplierAdminTask newFilterSupplierAdminTask() 132 { 133 return (FilterSupplierAdminTask) filterSupplierAdminTaskPool_.lendObject(); 134 } 135 136 public Schedulable newFilterSupplierAdminTask(FilterProxyConsumerTask oldTask) 137 { 138 FilterSupplierAdminTask _newTask = newFilterSupplierAdminTask(); 139 140 if (oldTask.getFilterStageToBeProcessed().length != 1) 142 { 143 throw new RuntimeException (); 144 } 145 146 _newTask.setMessage(oldTask.removeMessage()); 147 148 _newTask.setCurrentFilterStage(oldTask.getFilterStageToBeProcessed()); 149 150 _newTask.setSkip(oldTask.getSkip()); 151 152 return _newTask; 153 } 154 155 157 161 private FilterConsumerAdminTask newFilterConsumerAdminTask() 162 { 163 return (FilterConsumerAdminTask) filterConsumerAdminTaskPool_.lendObject(); 164 } 165 166 public Schedulable newFilterConsumerAdminTask(FilterSupplierAdminTask oldTask) 167 { 168 FilterConsumerAdminTask _newTask = newFilterConsumerAdminTask(); 169 170 _newTask.setMessage(oldTask.removeMessage()); 171 172 _newTask.setCurrentFilterStage(oldTask.getFilterStageToBeProcessed()); 173 174 return _newTask; 175 } 176 177 179 183 private FilterProxySupplierTask newFilterProxySupplierTask() 184 { 185 return (FilterProxySupplierTask) filterProxySupplierTaskPool_.lendObject(); 186 } 187 188 public Schedulable newFilterProxySupplierTask(FilterConsumerAdminTask task) 189 { 190 FilterProxySupplierTask _newTask = newFilterProxySupplierTask(); 191 192 _newTask.setMessage(task.removeMessage()); 193 194 FilterStage[] _filterStageList = task.getFilterStageToBeProcessed(); 195 196 _newTask.setCurrentFilterStage(_filterStageList); 197 198 return _newTask; 199 } 200 201 203 207 public void enqueueMessage(FilterStage[] nodes, Message mesg) 208 { 209 enqueueMessage(nodes, mesg, FilterProxySupplierTask.EMPTY_MAP); 210 } 211 212 224 private void enqueueMessage(FilterStage[] filterStagesWithMessageConsumer, Message mesg, 225 FilterProxySupplierTask.AlternateMessageMap map) 226 { 227 for (int x = 0; x < filterStagesWithMessageConsumer.length; ++x) 228 { 229 MessageConsumer consumer = filterStagesWithMessageConsumer[x].getMessageConsumer(); 230 231 Message alternateMessage = map.getAlternateMessage(filterStagesWithMessageConsumer[x]); 232 233 if (alternateMessage != null) 234 { 235 consumer.deliverMessage(alternateMessage); 236 } 237 else 238 { 239 consumer.deliverMessage(mesg); 240 } 241 } 242 } 243 244 248 public void enqueueMessage(FilterProxySupplierTask task) 249 { 250 Message _message = task.removeMessage(); 251 252 FilterStage[] _seqFilterStageToBeProcessed = task.getFilterStageToBeProcessed(); 253 254 enqueueMessage(_seqFilterStageToBeProcessed, _message, task.changedMessages_); 255 } 256 } 257 | Popular Tags |