1 24 25 package org.objectweb.dream.multiplexer; 26 27 import java.util.ArrayList ; 28 import java.util.Collections ; 29 import java.util.Hashtable ; 30 import java.util.Iterator ; 31 import java.util.Map ; 32 33 import org.objectweb.dream.AbstractComponent; 34 import org.objectweb.dream.Pull; 35 import org.objectweb.dream.Push; 36 import org.objectweb.dream.control.activity.Util; 37 import org.objectweb.dream.control.activity.task.AbstractTask; 38 import org.objectweb.dream.message.Message; 39 import org.objectweb.dream.message.manager.MessageManager; 40 import org.objectweb.dream.time.SetTimeStamp; 41 import org.objectweb.fractal.api.Component; 42 import org.objectweb.fractal.api.NoSuchInterfaceException; 43 import org.objectweb.fractal.api.control.IllegalBindingException; 44 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 45 46 51 public class PullPushMultiplexerImpl extends AbstractComponent 52 implements 53 PullPushMultiplexer 54 { 55 56 protected ArrayList attachments; 57 58 protected Map inputs; 61 protected Map outputs; 64 protected MessageManager messageManagerItf; 65 66 protected SetTimeStamp setTimeStamp; 67 68 protected int attachmentId = 0; 69 70 73 public PullPushMultiplexerImpl() 74 { 75 this.attachments = new ArrayList (); 76 this.inputs = new Hashtable (); 77 this.outputs = new Hashtable (); 78 } 79 80 84 118 public Attachment attach(String [] inputNames, Map [] inputContexts, 119 String [] outputNames, Map [] outputContexts, Map parameters) 120 throws NoSuchInterfaceException 121 { 122 123 long startingDate = 0; 124 if (parameters.get(PeriodicAttachment.STARTING_DATE) != null) 125 { 126 startingDate = ((Long ) parameters.get(PeriodicAttachment.STARTING_DATE)) 127 .longValue(); 128 } 129 long endDate = 0; 130 if (parameters.get(PeriodicAttachment.END_DATE) != null) 131 { 132 endDate = ((Long ) parameters.get(PeriodicAttachment.END_DATE)) 133 .longValue(); 134 } 135 long pullingFrequency = 0; 136 if (parameters.get(PeriodicAttachment.PULLING_FREQUENCY) != null) 137 { 138 pullingFrequency = ((Long ) parameters 139 .get(PeriodicAttachment.PULLING_FREQUENCY)).longValue(); 140 } 141 Pull[] attachmentInputs = new Pull[inputs.size()]; 142 for (int i = 0; i < inputNames.length; i++) 143 { 144 attachmentInputs[i] = (Pull) inputs.get(inputNames[i]); 145 } 146 Push[] attachmentOutputs = new Push[outputs.size()]; 147 for (int i = 0; i < outputNames.length; i++) 148 { 149 attachmentOutputs[i] = (Push) outputs.get(outputNames[i]); 150 } 151 PeriodicAttachment attachment = new PeriodicAttachment(attachmentId++, 152 inputNames, attachmentInputs, inputContexts, outputNames, 153 attachmentOutputs, outputContexts, startingDate, pullingFrequency, 154 endDate); 155 synchronized (attachments) 156 { 157 attachments.add(attachment); 158 attachments.notifyAll(); 159 } 160 return (Attachment) attachment.clone(); 162 163 } 164 165 168 public void detach(Attachment attachment) 169 { 170 synchronized (attachments) 171 { 172 ((PeriodicAttachment) attachment).setEndDate(System.currentTimeMillis()); 174 attachments.remove(attachment); 175 } 176 } 177 178 181 public void update(Attachment attachment) 182 { 183 synchronized (attachments) 184 { 185 attachments.remove(attachment); 186 Attachment newAttachment = (Attachment) attachment.clone(); 187 Pull[] attachmentInputs = new Pull[inputs.size()]; 189 for (int i = 0; i < newAttachment.inputNames.length; i++) 190 { 191 attachmentInputs[i] = (Pull) inputs.get(newAttachment.inputNames[i]); 192 } 193 Push[] attachmentOutputs = new Push[outputs.size()]; 194 for (int i = 0; i < newAttachment.outputNames.length; i++) 195 { 196 attachmentOutputs[i] = (Push) outputs.get(newAttachment.outputNames[i]); 197 } 198 newAttachment.setInputs(attachmentInputs); 199 newAttachment.setOutputs(attachmentOutputs); 200 attachments.add(newAttachment); 201 } 202 } 203 204 207 public ArrayList getAttachments() 208 { 209 ArrayList returnedAttachments = new ArrayList (); 210 Iterator iter = attachments.iterator(); 211 while (iter.hasNext()) 212 { 213 returnedAttachments.add(((PeriodicAttachment) iter.next()).clone()); 214 } 215 return returnedAttachments; 216 } 217 218 222 226 public synchronized void bindFc(String clientItfName, Object serverItf) 227 throws NoSuchInterfaceException, IllegalBindingException, 228 IllegalLifeCycleException 229 { 230 super.bindFc(clientItfName, serverItf); 231 if (clientItfName.startsWith(Push.OUT_PUSH_ITF_NAME)) 232 { 233 outputs.put(clientItfName, serverItf); 234 } 235 else if (clientItfName.startsWith(Pull.IN_PULL_ITF_NAME)) 236 { 237 inputs.put(clientItfName, serverItf); 238 } 239 else if (clientItfName.equals(MessageManager.ITF_NAME)) 240 { 241 messageManagerItf = (MessageManager) serverItf; 242 } 243 else if (clientItfName.equals(SetTimeStamp.ITF_NAME)) 244 { 245 setTimeStamp = (SetTimeStamp) serverItf; 246 } 247 } 248 249 252 public synchronized void unbindFc(String clientItfName) 253 throws NoSuchInterfaceException, IllegalBindingException, 254 IllegalLifeCycleException 255 { 256 super.unbindFc(clientItfName); 257 if (clientItfName.startsWith(Push.OUT_PUSH_ITF_NAME)) 258 { 259 outputs.remove(clientItfName); 260 } 261 else if (clientItfName.startsWith(Pull.IN_PULL_ITF_NAME)) 262 { 263 inputs.remove(clientItfName); 264 } 265 } 266 267 270 public synchronized String [] listFc() 271 { 272 int size = outputs.size() + inputs.size() + 2; 273 String [] tab = new String [size]; 274 outputs.keySet().toArray(tab); 275 String [] inputsArr = new String [inputs.size()]; 276 inputs.keySet().toArray(inputsArr); 277 int j = 0; 278 for (int i = outputs.size(); i < outputs.size() + inputs.size(); i++) 279 { 280 tab[i] = inputsArr[j]; 281 j++; 282 } 283 tab[inputs.size() + outputs.size()] = MessageManager.ITF_NAME; 284 tab[inputs.size() + outputs.size() + 1] = SetTimeStamp.ITF_NAME; 285 return tab; 286 } 287 288 292 295 protected void beforeFirstStart(Component componentItf) 296 throws IllegalLifeCycleException 297 { 298 try 299 { 300 Util.addTask(componentItf, new MultiplexerTask("MultiplexerTask"), 301 Collections.EMPTY_MAP); 302 } 303 catch (Exception e) 304 { 305 throw new IllegalLifeCycleException("Can't add task"); 306 } 307 } 308 309 313 private class MultiplexerTask extends AbstractTask 314 { 315 318 public MultiplexerTask(String name) 319 { 320 super(name); 321 } 322 323 326 public Object execute(Object hints) throws InterruptedException 327 { 328 long nextWakeup = Long.MAX_VALUE; 329 long currentTime = setTimeStamp.setTimeStamp(); 330 synchronized (attachments) 331 { 332 int size = attachments.size(); 333 for (int i = 0; i < size; i++) 334 { 335 PeriodicAttachment attachment = (PeriodicAttachment) attachments 336 .get(i); 337 long endDate = attachment.getEndDate(); 338 if ((endDate != 0) && (endDate < currentTime)) 339 { attachments.remove(i); 343 i--; 344 size--; 345 continue; 346 } 347 if (attachment.getNextDeadline() <= currentTime) 348 { try 352 { Pull[] attachmentInputs = attachment.getInputs(); 354 Map [] inputContexts = attachment.getInputContexts(); 355 Push[] attachmentOutputs = attachment.getOutputs(); 356 Map [] outputContexts = attachment.getOutputContexts(); 357 for (int j = 0; j < attachmentInputs.length; j++) 358 { 359 Message m = attachmentInputs[j].pull(inputContexts[j]); 360 for (int k = 0; k < attachmentOutputs.length; k++) 361 { 362 attachmentOutputs[k].push(m, outputContexts[k]); 364 } 365 } 366 } 367 catch (Exception e) 368 { e.printStackTrace(); 372 attachments.remove(i); 373 i--; 374 size--; 375 continue; 376 } 377 attachment.setNextDeadline(currentTime 379 + attachment.pullingFrequencyInMillis()); 380 } 381 382 if (nextWakeup > attachment.getNextDeadline()) 384 { 385 nextWakeup = attachment.getNextDeadline(); 386 } 387 } 388 389 if (nextWakeup == Long.MAX_VALUE) 391 { 392 attachments.wait(); 394 } 395 else 396 { 397 long timeToSleep = nextWakeup - currentTime; 398 if (timeToSleep > 0) 399 { 400 attachments.wait(timeToSleep); 401 } 402 } 403 } 404 return EXECUTE_AGAIN; 405 } 406 } 407 408 } | Popular Tags |