KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > dream > multiplexer > PullPushMultiplexerImpl


1 /**
2  * Dream
3  * Copyright (C) 2003-2004 INRIA Rhone-Alpes
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Contact : dream@objectweb.org
20  *
21  * Initial developer(s): Vivien Quema
22  * Contributor(s):
23  */

24
25 package org.objectweb.dream.multiplexer;
26
27 import java.util.ArrayList JavaDoc;
28 import java.util.Collections JavaDoc;
29 import java.util.Hashtable JavaDoc;
30 import java.util.Iterator JavaDoc;
31 import java.util.Map JavaDoc;
32
33 import org.objectweb.dream.AbstractComponent;
34 import org.objectweb.dream.Pull;
35 import org.objectweb.dream.Push;
36 import org.objectweb.dream.control.activity.Util;
37 import org.objectweb.dream.control.activity.task.AbstractTask;
38 import org.objectweb.dream.message.Message;
39 import org.objectweb.dream.message.manager.MessageManager;
40 import org.objectweb.dream.time.SetTimeStamp;
41 import org.objectweb.fractal.api.Component;
42 import org.objectweb.fractal.api.NoSuchInterfaceException;
43 import org.objectweb.fractal.api.control.IllegalBindingException;
44 import org.objectweb.fractal.api.control.IllegalLifeCycleException;
45
46 /**
47  * Implementation of the <code>PullPushMultiplexer</code> interface. This
48  * multiplexers periodically pulls its inputs according to the parameters given
49  * to the attach method.
50  */

51 public class PullPushMultiplexerImpl extends AbstractComponent
52     implements
53       PullPushMultiplexer
54 {
55   /** The registered attachments. */
56   protected ArrayList JavaDoc attachments;
57   /** An hashtable storing the intputs of this component. */
58   protected Map JavaDoc inputs; // name ->
59
// input
60
/** An hashtable storing the outputs of this component. */
61   protected Map JavaDoc outputs; // name ->
62
// output
63
/** The message manager client interface of this component */
64   protected MessageManager messageManagerItf;
65   /** The SetTimeStamp client interface of this component. */
66   protected SetTimeStamp setTimeStamp;
67   /** An integer used to generate attachment ids. */
68   protected int attachmentId = 0;
69
70   /**
71    * Constructor.
72    */

73   public PullPushMultiplexerImpl()
74   {
75     this.attachments = new ArrayList JavaDoc();
76     this.inputs = new Hashtable JavaDoc();
77     this.outputs = new Hashtable JavaDoc();
78   }
79
80   // -------------------------------------------------------------------------
81
// Implementation of the PullPushMultiplexer interface
82
// -------------------------------------------------------------------------
83

84   /**
85    * Attaches a set of inputs to a set of outputs.
86    *
87    * @param inputNames the names of the inputs to be attached. These inputs must
88    * have been previously bound (with the same name) using the
89    * <code>BindingController</code> interface.
90    * @param inputContexts the contexts to be passed when pulling the inputs.
91    * @param outputNames the names of the outputs to wich intputs must be
92    * attached. These outputs must have been previously bound (with the
93    * same name) using the <code>BindingController</code> interface.
94    * @param outputContexts the contexts to be passed when pushing the outputs.
95    * @param parameters the attachement parameters. The <code>parameters</code>
96    * Map can have the following <code>String</code> as keys:
97    * <ul>
98    * <li><i>startingDate </i>: mapped to a <code>Long</code> that
99    * specifies in how many ms should the attachement be started. (0
100    * means now).</li>
101    * <li><i>endDate </i>: mapped to a <code>Long</code> that
102    * specifies in how many ms should the attachement be started. (0
103    * means now).</li>
104    * <li><i>pullingFrequency </i>: mapped to a <code>Long</code>
105    * that specifies the time to sleep between two pulling of the
106    * attached inputs.</li>
107    * <li><i>endDate </i>: mapped to a <code>Long</code> that
108    * specifies in how many ms should the attachement be stop. (must be >
109    * startingDate)</li>
110    * </ul>
111    * @return an <code>Attachment</code> object
112    * @throws NoSuchInterfaceException if inputNames or outputNames contains the
113    * name of an interface to which the component has not been bound.
114    * @see PeriodicAttachment
115    * @see org.objectweb.dream.multiplexer.PullPushMultiplexer#attach(String[],
116    * Map[], String[], Map[], Map)
117    */

118   public Attachment attach(String JavaDoc[] inputNames, Map JavaDoc[] inputContexts,
119       String JavaDoc[] outputNames, Map JavaDoc[] outputContexts, Map JavaDoc parameters)
120       throws NoSuchInterfaceException
121   {
122
123     long startingDate = 0;
124     if (parameters.get(PeriodicAttachment.STARTING_DATE) != null)
125     {
126       startingDate = ((Long JavaDoc) parameters.get(PeriodicAttachment.STARTING_DATE))
127           .longValue();
128     }
129     long endDate = 0;
130     if (parameters.get(PeriodicAttachment.END_DATE) != null)
131     {
132       endDate = ((Long JavaDoc) parameters.get(PeriodicAttachment.END_DATE))
133           .longValue();
134     }
135     long pullingFrequency = 0;
136     if (parameters.get(PeriodicAttachment.PULLING_FREQUENCY) != null)
137     {
138       pullingFrequency = ((Long JavaDoc) parameters
139           .get(PeriodicAttachment.PULLING_FREQUENCY)).longValue();
140     }
141     Pull[] attachmentInputs = new Pull[inputs.size()];
142     for (int i = 0; i < inputNames.length; i++)
143     {
144       attachmentInputs[i] = (Pull) inputs.get(inputNames[i]);
145     }
146     Push[] attachmentOutputs = new Push[outputs.size()];
147     for (int i = 0; i < outputNames.length; i++)
148     {
149       attachmentOutputs[i] = (Push) outputs.get(outputNames[i]);
150     }
151     PeriodicAttachment attachment = new PeriodicAttachment(attachmentId++,
152         inputNames, attachmentInputs, inputContexts, outputNames,
153         attachmentOutputs, outputContexts, startingDate, pullingFrequency,
154         endDate);
155     synchronized (attachments)
156     {
157       attachments.add(attachment);
158       attachments.notifyAll();
159     }
160     // Clone does not clone the inputs and outputs arrays
161
return (Attachment) attachment.clone();
162
163   }
164
165   /**
166    * @see PullPushMultiplexer#detach(Attachment)
167    */

168   public void detach(Attachment attachment)
169   {
170     synchronized (attachments)
171     {
172       // TODO the following instruction must not be necessary
173
((PeriodicAttachment) attachment).setEndDate(System.currentTimeMillis());
174       attachments.remove(attachment);
175     }
176   }
177
178   /**
179    * @see PullPushMultiplexer#update(Attachment)
180    */

181   public void update(Attachment attachment)
182   {
183     synchronized (attachments)
184     {
185       attachments.remove(attachment);
186       Attachment newAttachment = (Attachment) attachment.clone();
187       // Set the inputs and outputs
188
Pull[] attachmentInputs = new Pull[inputs.size()];
189       for (int i = 0; i < newAttachment.inputNames.length; i++)
190       {
191         attachmentInputs[i] = (Pull) inputs.get(newAttachment.inputNames[i]);
192       }
193       Push[] attachmentOutputs = new Push[outputs.size()];
194       for (int i = 0; i < newAttachment.outputNames.length; i++)
195       {
196         attachmentOutputs[i] = (Push) outputs.get(newAttachment.outputNames[i]);
197       }
198       newAttachment.setInputs(attachmentInputs);
199       newAttachment.setOutputs(attachmentOutputs);
200       attachments.add(newAttachment);
201     }
202   }
203
204   /**
205    * @see PullPushMultiplexer#getAttachments()
206    */

207   public ArrayList JavaDoc getAttachments()
208   {
209     ArrayList JavaDoc returnedAttachments = new ArrayList JavaDoc();
210     Iterator JavaDoc iter = attachments.iterator();
211     while (iter.hasNext())
212     {
213       returnedAttachments.add(((PeriodicAttachment) iter.next()).clone());
214     }
215     return returnedAttachments;
216   }
217
218   // -------------------------------------------------------------------------
219
// Implementation of the BindingController interface
220
// -------------------------------------------------------------------------
221

222   /**
223    * @see org.objectweb.fractal.api.control.BindingController#bindFc(String,
224    * Object)
225    */

226   public synchronized void bindFc(String JavaDoc clientItfName, Object JavaDoc serverItf)
227       throws NoSuchInterfaceException, IllegalBindingException,
228       IllegalLifeCycleException
229   {
230     super.bindFc(clientItfName, serverItf);
231     if (clientItfName.startsWith(Push.OUT_PUSH_ITF_NAME))
232     {
233       outputs.put(clientItfName, serverItf);
234     }
235     else if (clientItfName.startsWith(Pull.IN_PULL_ITF_NAME))
236     {
237       inputs.put(clientItfName, serverItf);
238     }
239     else if (clientItfName.equals(MessageManager.ITF_NAME))
240     {
241       messageManagerItf = (MessageManager) serverItf;
242     }
243     else if (clientItfName.equals(SetTimeStamp.ITF_NAME))
244     {
245       setTimeStamp = (SetTimeStamp) serverItf;
246     }
247   }
248
249   /**
250    * @see org.objectweb.fractal.api.control.BindingController#unbindFc(String)
251    */

252   public synchronized void unbindFc(String JavaDoc clientItfName)
253       throws NoSuchInterfaceException, IllegalBindingException,
254       IllegalLifeCycleException
255   {
256     super.unbindFc(clientItfName);
257     if (clientItfName.startsWith(Push.OUT_PUSH_ITF_NAME))
258     {
259       outputs.remove(clientItfName);
260     }
261     else if (clientItfName.startsWith(Pull.IN_PULL_ITF_NAME))
262     {
263       inputs.remove(clientItfName);
264     }
265   }
266
267   /**
268    * @see org.objectweb.fractal.api.control.BindingController#listFc()
269    */

270   public synchronized String JavaDoc[] listFc()
271   {
272     int size = outputs.size() + inputs.size() + 2;
273     String JavaDoc[] tab = new String JavaDoc[size];
274     outputs.keySet().toArray(tab);
275     String JavaDoc[] inputsArr = new String JavaDoc[inputs.size()];
276     inputs.keySet().toArray(inputsArr);
277     int j = 0;
278     for (int i = outputs.size(); i < outputs.size() + inputs.size(); i++)
279     {
280       tab[i] = inputsArr[j];
281       j++;
282     }
283     tab[inputs.size() + outputs.size()] = MessageManager.ITF_NAME;
284     tab[inputs.size() + outputs.size() + 1] = SetTimeStamp.ITF_NAME;
285     return tab;
286   }
287
288   // -------------------------------------------------------------------------
289
// Overriden methods
290
// -------------------------------------------------------------------------
291

292   /**
293    * @see org.objectweb.dream.AbstractComponent#beforeFirstStart(org.objectweb.fractal.api.Component)
294    */

295   protected void beforeFirstStart(Component componentItf)
296       throws IllegalLifeCycleException
297   {
298     try
299     {
300       Util.addTask(componentItf, new MultiplexerTask("MultiplexerTask"),
301           Collections.EMPTY_MAP);
302     }
303     catch (Exception JavaDoc e)
304     {
305       throw new IllegalLifeCycleException("Can't add task");
306     }
307   }
308
309   // -------------------------------------------------------------------------
310
// Private class defining the task of this component
311
// -------------------------------------------------------------------------
312

313   private class MultiplexerTask extends AbstractTask
314   {
315     /**
316      * @param name the name of the task.
317      */

318     public MultiplexerTask(String JavaDoc name)
319     {
320       super(name);
321     }
322
323     /**
324      * @see org.objectweb.dream.control.activity.task.Task#execute(Object)
325      */

326     public Object JavaDoc execute(Object JavaDoc hints) throws InterruptedException JavaDoc
327     {
328       long nextWakeup = Long.MAX_VALUE;
329       long currentTime = setTimeStamp.setTimeStamp();
330       synchronized (attachments)
331       {
332         int size = attachments.size();
333         for (int i = 0; i < size; i++)
334         {
335           PeriodicAttachment attachment = (PeriodicAttachment) attachments
336               .get(i);
337           long endDate = attachment.getEndDate();
338           if ((endDate != 0) && (endDate < currentTime))
339           { // Attachment has
340
// expired,
341
// remove it.
342
attachments.remove(i);
343             i--;
344             size--;
345             continue;
346           }
347           if (attachment.getNextDeadline() <= currentTime)
348           { // We need to probe
349
// this
350
// one
351
try
352             { // Probe the resources and insert the values in the buffer
353
Pull[] attachmentInputs = attachment.getInputs();
354               Map JavaDoc[] inputContexts = attachment.getInputContexts();
355               Push[] attachmentOutputs = attachment.getOutputs();
356               Map JavaDoc[] outputContexts = attachment.getOutputContexts();
357               for (int j = 0; j < attachmentInputs.length; j++)
358               {
359                 Message m = attachmentInputs[j].pull(inputContexts[j]);
360                 for (int k = 0; k < attachmentOutputs.length; k++)
361                 {
362                   // TODO add the clone and aggregate tests
363
attachmentOutputs[k].push(m, outputContexts[k]);
364                 }
365               }
366             }
367             catch (Exception JavaDoc e)
368             { // An error occured while monitoring,
369
// remove
370
// this attachment
371
e.printStackTrace();
372               attachments.remove(i);
373               i--;
374               size--;
375               continue;
376             }
377             // Update the next deadline for this attachment
378
attachment.setNextDeadline(currentTime
379                 + attachment.pullingFrequencyInMillis());
380           }
381
382           // Update next wake up time
383
if (nextWakeup > attachment.getNextDeadline())
384           {
385             nextWakeup = attachment.getNextDeadline();
386           }
387         }
388
389         // Wait until next deadline
390
if (nextWakeup == Long.MAX_VALUE)
391         {
392           // No attachment, wait until a new attachment is added
393
attachments.wait();
394         }
395         else
396         {
397           long timeToSleep = nextWakeup - currentTime;
398           if (timeToSleep > 0)
399           {
400             attachments.wait(timeToSleep);
401           }
402         }
403       }
404       return EXECUTE_AGAIN;
405     }
406   }
407
408 }
Popular Tags