package org.objectweb.dream.pump;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.objectweb.dream.AbstractComponent;
import org.objectweb.dream.Pull;
import org.objectweb.dream.PullException;
import org.objectweb.dream.Push;
import org.objectweb.dream.PushException;
import org.objectweb.dream.control.activity.ActiveComponentAttributeController;
import org.objectweb.dream.control.activity.Util;
import org.objectweb.dream.control.activity.task.AbstractTask;
import org.objectweb.dream.control.activity.task.Task;
import org.objectweb.dream.control.activity.task.TaskController;
import org.objectweb.dream.control.activity.task.thread.ThreadPoolController;
import org.objectweb.dream.message.Message;
import org.objectweb.dream.synchro.Mutex;
import org.objectweb.fractal.api.Component;
import org.objectweb.fractal.api.NoSuchInterfaceException;
import org.objectweb.fractal.api.control.IllegalBindingException;
import org.objectweb.fractal.api.control.IllegalLifeCycleException;
import org.objectweb.fractal.julia.control.lifecycle.ChainedIllegalLifeCycleException;

/**
 * Pump component whose task pulls a message from its input interface and
 * pushes it on its output interface. When a mutex interface is bound, each
 * pull/push cycle is executed between a <code>lock()</code> and an
 * <code>unlock()</code> call on that mutex.
 */
public class PumpSynchronizedImpl extends AbstractComponent
    implements
      PumpAttributeController,
      ActiveComponentAttributeController
{

  // ---------------------------------------------------------------------------
  // Client interfaces
  // ---------------------------------------------------------------------------

  /** Output interface on which pulled messages are pushed. */
  protected Push    outPushItf;

  /** Input interface from which messages are pulled. */
  protected Pull    inPullItf;

  /** Mutex held around each pull/push cycle; not used while <code>null</code>. */
  protected Mutex   mutexItf;

  /** The task that performs the pull/push cycle. */
  protected Task    pumpTask               = new PumpTask();

  /** Whether a <code>null</code> pulled message is pushed anyway. */
  protected boolean pushNullPolicy         = false;

  /** Whether the pump task is registered with thread-pool hints. */
  protected boolean usePool                = false;

  /** Number of threads added to the pool by {@link #startFc()}. */
  protected int     initialNumberOfThreads = 1;

  /** Value registered under the <code>"threadPool.capacity"</code> hint. */
  protected int     initialCapacity        = 1;

  // ---------------------------------------------------------------------------
  // Implementation of the PumpAttributeController interface
  // ---------------------------------------------------------------------------

  /**
   * Returns the current push-null policy.
   * 
   * @return <code>true</code> if <code>null</code> messages are pushed.
   */
  public boolean getPushNullPolicy()
  {
    return pushNullPolicy;
  }

  /**
   * Sets the push-null policy.
   * 
   * @param pushNull <code>true</code> to push <code>null</code> messages.
   */
  public void setPushNullPolicy(boolean pushNull)
  {
    pushNullPolicy = pushNull;
  }

  // ---------------------------------------------------------------------------
  // Pump task
  // ---------------------------------------------------------------------------

  /**
   * Task performing one pull/push cycle per execution and asking to be
   * executed again.
   */
  class PumpTask extends AbstractTask
  {

    /**
     * Creates the task with the name <code>"PumpTask"</code>.
     */
    public PumpTask()
    {
      super("PumpTask");
    }

    /**
     * Pulls one message and pushes it, unless it is <code>null</code> and the
     * push-null policy is disabled.
     * 
     * @param hints not used by this implementation.
     * @return always <code>EXECUTE_AGAIN</code>.
     * @throws InterruptedException if the pull or the push fails; the
     *           original exception is attached as the cause.
     */
    public Object execute(Object hints) throws InterruptedException
    {
      // Read the field once so that lock() and unlock() are always invoked on
      // the same mutex, even if the binding changes during the cycle.
      final Mutex mutex = mutexItf;
      if (mutex != null)
      {
        mutex.lock();
      }
      try
      {
        Message msg = inPullItf.pull(null);
        if (msg != null || pushNullPolicy)
        {
          outPushItf.push(msg, null);
        }
      }
      catch (PullException e)
      {
        throw asInterrupted(e);
      }
      catch (PushException e)
      {
        throw asInterrupted(e);
      }
      finally
      {
        if (mutex != null)
        {
          mutex.unlock();
        }
      }
      return EXECUTE_AGAIN;
    }

    /**
     * Wraps a pull/push failure in an <code>InterruptedException</code>
     * carrying the same localized message, keeping the failure as its cause.
     */
    private InterruptedException asInterrupted(Exception failure)
    {
      InterruptedException wrapped = new InterruptedException(failure
          .getLocalizedMessage());
      // InterruptedException has no cause-taking constructor.
      wrapped.initCause(failure);
      return wrapped;
    }
  }

  // ---------------------------------------------------------------------------
  // Implementation of the ActiveComponentAttributeController interface
  // ---------------------------------------------------------------------------

  /**
   * Sets whether the pump task uses a thread pool.
   * 
   * @param usePool <code>true</code> to use a thread pool.
   */
  public void setUsePool(boolean usePool)
  {
    this.usePool = usePool;
  }

  /**
   * Returns whether the pump task uses a thread pool.
   * 
   * @return <code>true</code> if a thread pool is used.
   */
  public boolean getUsePool()
  {
    return usePool;
  }

  /**
   * Sets the value registered as the thread pool capacity hint.
   * 
   * @param i the initial capacity.
   */
  public void setInitialCapacity(int i)
  {
    this.initialCapacity = i;
  }

  /**
   * Returns the value registered as the thread pool capacity hint.
   * 
   * @return the initial capacity.
   */
  public int getInitialCapacity()
  {
    return initialCapacity;
  }

  /**
   * Sets the number of threads added to the pool when the component starts.
   * 
   * @param i the initial number of threads.
   */
  public void setInitialNumberOfThreads(int i)
  {
    this.initialNumberOfThreads = i;
  }

  /**
   * Returns the number of threads added to the pool when the component
   * starts.
   * 
   * @return the initial number of threads.
   */
  public int getInitialNumberOfThreads()
  {
    return initialNumberOfThreads;
  }

  // ---------------------------------------------------------------------------
  // Overridden methods of AbstractComponent
  // ---------------------------------------------------------------------------

  /**
   * Starts the component. When pool usage is enabled, retrieves the
   * <code>"task-controller"</code> interface, obtains the control object of
   * the pump task and adds {@link #initialNumberOfThreads} threads to it.
   * 
   * @throws IllegalLifeCycleException if the super implementation fails, or
   *           if the thread pool controller cannot be retrieved or used.
   */
  public void startFc() throws IllegalLifeCycleException
  {
    super.startFc();
    if (usePool)
    {
      try
      {
        TaskController tc = (TaskController) weaveableC
            .getFcInterface("task-controller");
        ThreadPoolController threadPoolController = (ThreadPoolController) tc
            .getTaskControl(pumpTask);
        threadPoolController.addThreads(initialNumberOfThreads);
      }
      catch (Exception e)
      {
        throw new ChainedIllegalLifeCycleException(e, null,
            "An error occured while retrieving task controller interface");
      }
    }
  }

  /**
   * Registers the pump task on the given component. With pool usage enabled
   * the task is registered with the hints <code>"thread" = "pool"</code> and
   * <code>"threadPool.capacity" = initialCapacity</code>; otherwise it is
   * registered with an empty hint map.
   * 
   * @param componentItf the component on which the task is registered.
   * @throws IllegalLifeCycleException if the task cannot be added; the
   *           underlying exception is chained.
   */
  protected void beforeFirstStart(Component componentItf)
      throws IllegalLifeCycleException
  {
    try
    {
      if (usePool)
      {
        Map m = new HashMap();
        m.put("thread", "pool");
        m.put("threadPool.capacity", new Integer(initialCapacity));
        Util.addTask(componentItf, pumpTask, m);
      }
      else
      {
        Util.addTask(componentItf, pumpTask, Collections.EMPTY_MAP);
      }
    }
    catch (Exception e)
    {
      // Chain the failure, as startFc() does, instead of discarding it.
      throw new ChainedIllegalLifeCycleException(e, null, "Can't add task");
    }
  }

  // ---------------------------------------------------------------------------
  // Implementation of the BindingController interface
  // ---------------------------------------------------------------------------

  /**
   * Lists the client interfaces of this component.
   * 
   * @return the names of the output push, input pull and mutex interfaces.
   */
  public String[] listFc()
  {
    return new String[]{Push.OUT_PUSH_ITF_NAME, Pull.IN_PULL_ITF_NAME,
        Mutex.MUTEX_ITF_NAME};
  }

  /**
   * Binds a client interface. After delegating to the super implementation,
   * stores the server interface in the field matching the output push, input
   * pull or mutex interface name; other names are left to the super
   * implementation only.
   * 
   * @param clientItfName the name of the client interface to bind.
   * @param serverItf the server interface it is bound to.
   * @throws NoSuchInterfaceException if thrown by the super implementation.
   * @throws IllegalBindingException if thrown by the super implementation.
   * @throws IllegalLifeCycleException if thrown by the super implementation.
   */
  public synchronized void bindFc(String clientItfName, Object serverItf)
      throws NoSuchInterfaceException, IllegalBindingException,
      IllegalLifeCycleException
  {
    super.bindFc(clientItfName, serverItf);
    if (clientItfName.equals(Push.OUT_PUSH_ITF_NAME))
    {
      outPushItf = (Push) serverItf;
    }
    else if (clientItfName.equals(Pull.IN_PULL_ITF_NAME))
    {
      inPullItf = (Pull) serverItf;
    }
    else if (clientItfName.equals(Mutex.MUTEX_ITF_NAME))
    {
      mutexItf = (Mutex) serverItf;
    }
  }

}