KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > dream > pushwithreturn > SynchronizerImpl


1 /**
2  * Dream
3  * Copyright (C) 2003-2004 INRIA Rhone-Alpes
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Contact : dream@objectweb.org
20  *
21  * Initial developer(s): Vivien Quema
22  * Contributor(s):
23  */

24
25 package org.objectweb.dream.pushwithreturn;
26
27 import java.util.HashMap JavaDoc;
28 import java.util.Iterator JavaDoc;
29 import java.util.Map JavaDoc;
30
31 import org.objectweb.dream.AbstractComponent;
32 import org.objectweb.dream.InterruptedPushException;
33 import org.objectweb.dream.Push;
34 import org.objectweb.dream.Push1;
35 import org.objectweb.dream.PushException;
36 import org.objectweb.dream.PushWithReturn;
37 import org.objectweb.dream.message.Message;
38 import org.objectweb.dream.message.manager.MessageManager;
39 import org.objectweb.dream.pool.ObjectPool;
40 import org.objectweb.dream.pool.Recyclable;
41 import org.objectweb.fractal.api.NoSuchInterfaceException;
42 import org.objectweb.fractal.api.control.IllegalBindingException;
43 import org.objectweb.fractal.api.control.IllegalLifeCycleException;
44 import org.objectweb.util.monolog.api.BasicLevel;
45
/**
 * This component handles calls to the
 * {@link PushWithReturn#pushWithReturn(Message, Map)} method (i.e. outgoing
 * messages). It has one {@link PushWithReturn} input, one {@link Push} output,
 * and one {@link Push1} input.
 * <p>
 * It receives calls to the <code>pushWithReturn</code> method. It generates a
 * key for the message, then it pushes the message on its output, and waits for
 * a message with the same key to arrive on its <code>Push1</code> input. This
 * message is the returned message of the <code>pushWithReturn</code> method.
 * <p>
 * <i>Note:</i> if a message with an "All" key is received on the input, it is
 * used as the return message of all the <code>pushWithReturn</code> method
 * calls currently waiting for a return message.
 */

61 public class SynchronizerImpl extends AbstractComponent
62     implements
63       PushWithReturn,
64       Push1,
65       SynchronizerAttributeController
66 {
67
68   /** An hash map containing waiting keys. */
69   HashMap JavaDoc waitingKeysMap = new HashMap JavaDoc();
70
71   // Client interfaces
72
/** The client interface used to access the message manager. */
73   MessageManager messageManagerItf;
74
75   /** The client interface used to access the WaitingKey pool. */
76   ObjectPool waitingKeyPoolItf;
77
78   /** The KeyGenerator client interface. */
79   KeyGenerator keyGeneratorItf;
80
81   /** The Push output interface. */
82   Push outPushItf;
83
84   boolean mustClone;
85
86   // ---------------------------------------------------------------------------
87
// Implementation of the PushWithReturn interface.
88
// ---------------------------------------------------------------------------
89

90   /**
91    * @see org.objectweb.dream.PushWithReturn#pushWithReturn(org.objectweb.dream.message.Message,
92    * java.util.Map)
93    */

94   public Message pushWithReturn(Message message, Map JavaDoc context)
95       throws PushException
96   {
97     return doPush(message);
98   }
99
100   /**
101    * @see Push#push(Message, Map)
102    */

103   public void push(Message message, Map JavaDoc context) throws PushException
104   {
105     doPush(message);
106   }
107
108   // ---------------------------------------------------------------------------
109
// Implementation of the Push1 interface.
110
// ---------------------------------------------------------------------------
111

112   /**
113    * @see org.objectweb.dream.Push1#push1(org.objectweb.dream.message.Message,
114    * java.util.Map)
115    */

116   public void push1(Message message, Map JavaDoc context) throws PushException
117   {
118     Key key = null;
119     try
120     {
121       // Generate or retrieve a key for the message
122
key = keyGeneratorItf.generateKey(message);
123     }
124     catch (Exception JavaDoc e)
125     {
126       logger.log(BasicLevel.ERROR, e);
127       throw new PushException(e);
128     }
129
130     if (key.isAll())
131     {
132       synchronized (waitingKeysMap)
133       {
134         Iterator JavaDoc iter = waitingKeysMap.keySet().iterator();
135         while (iter.hasNext())
136         {
137           Object JavaDoc keyTemp = iter.next();
138           WaitingKey waitingKey = (WaitingKey) waitingKeysMap.get(keyTemp);
139           waitingKey.setReturnMessage(message);
140           synchronized (waitingKey)
141           {
142             waitingKey.setCanPass(true);
143             waitingKey.notifyAll();
144           }
145         }
146       }
147     }
148     else
149     {
150       WaitingKey waitingKey = (WaitingKey) waitingKeysMap.get(key);
151       if (waitingKey != null)
152       {
153         waitingKey.setReturnMessage(message);
154         synchronized (waitingKey)
155         {
156           waitingKey.setCanPass(true);
157           waitingKey.notifyAll();
158         }
159       }
160       else
161       {
162         messageManagerItf.deleteMessage(message);
163       }
164     }
165   }
166
167   // ---------------------------------------------------------------------------
168
// Utility methods.
169
// ---------------------------------------------------------------------------
170

171   private Message doPush(Message message) throws PushException
172   {
173     Key key = null;
174     try
175     {
176       // Generate a key for the message
177
key = keyGeneratorItf.generateKey(message);
178     }
179     catch (Exception JavaDoc e)
180     {
181       logger.log(BasicLevel.ERROR, e);
182       throw new PushException(e);
183     }
184
185     //Add the key to the HashMap
186
WaitingKey waitingKey;
187     synchronized (waitingKeysMap)
188     {
189       waitingKey = (WaitingKey) waitingKeysMap.get(key);
190       if (waitingKey == null)
191       {
192         waitingKey = (WaitingKey) waitingKeyPoolItf.newInstance();
193         waitingKeysMap.put(key, waitingKey);
194       }
195       synchronized (waitingKey)
196       {
197         waitingKey.incrementNbWaitings();
198       }
199     }
200
201     // Push the message on the output
202
outPushItf.push(message, null);
203
204     // Wait for the return message
205
Message returnMessage;
206     synchronized (waitingKey)
207     {
208       if (!waitingKey.canPass())
209       {
210         try
211         {
212           waitingKey.wait();
213         }
214         catch (InterruptedException JavaDoc e)
215         {
216           throw new InterruptedPushException(e);
217         }
218       }
219       returnMessage = waitingKey.getReturnMessage();
220       waitingKey.decrementNbWaitings();
221       if (waitingKey.getNbWaitings() > 0)
222       {
223         // There are other calls waiting
224
return messageManagerItf.duplicateMessage(returnMessage, mustClone);
225       }
226     }
227     synchronized (waitingKeysMap)
228     {
229       synchronized (waitingKey)
230       {
231         if (waitingKey.getNbWaitings() <= 0)
232         {
233           waitingKeyPoolItf.recycleInstance((Recyclable) waitingKeysMap
234               .remove(key));
235         }
236       }
237     }
238     return returnMessage;
239   }
240
241   //---------------------------------------------------------------------------
242
// Implementation of the SynchronizerAttributeController
243
// interface.
244
// ---------------------------------------------------------------------------
245

246   /**
247    * @see org.objectweb.dream.pushwithreturn.SynchronizerAttributeController#setMustClone(boolean)
248    */

249   public void setMustClone(boolean mustClone)
250   {
251     this.mustClone = mustClone;
252   }
253
254   /**
255    * @see org.objectweb.dream.pushwithreturn.SynchronizerAttributeController#getMustClone()
256    */

257   public boolean getMustClone()
258   {
259     return mustClone;
260   }
261
262   // ---------------------------------------------------------------------------
263
// Implementation of the BindingController interface.
264
// ---------------------------------------------------------------------------
265

266   /**
267    * @see org.objectweb.fractal.api.control.BindingController#bindFc(java.lang.String,
268    * java.lang.Object)
269    */

270   public void bindFc(String JavaDoc clientItfName, Object JavaDoc serverItf)
271       throws NoSuchInterfaceException, IllegalBindingException,
272       IllegalLifeCycleException
273   {
274     super.bindFc(clientItfName, serverItf);
275     if (clientItfName.equals(Push.OUT_PUSH_ITF_NAME))
276     {
277       outPushItf = (Push) serverItf;
278     }
279     else if (clientItfName.equals(KeyGenerator.ITF_NAME))
280     {
281       keyGeneratorItf = (KeyGenerator) serverItf;
282     }
283     else if (clientItfName.equals(ObjectPool.ITF_NAME))
284     {
285       waitingKeyPoolItf = (ObjectPool) serverItf;
286     }
287     else if (clientItfName.equals(MessageManager.ITF_NAME))
288     {
289       messageManagerItf = (MessageManager) serverItf;
290     }
291   }
292
293   /**
294    * @see org.objectweb.fractal.api.control.BindingController#listFc()
295    */

296   public String JavaDoc[] listFc()
297   {
298     return new String JavaDoc[]{KeyGenerator.ITF_NAME, Push.OUT_PUSH_ITF_NAME,
299         ObjectPool.ITF_NAME, MessageManager.ITF_NAME};
300   }
301
302   // ---------------------------------------------------------------------------
303
// Inner class.
304
// ---------------------------------------------------------------------------
305

306   /**
307    * This class represents a waiting key.
308    */

309   public static class WaitingKey implements Recyclable
310   {
311     /** The return message. */
312     private Message returnMessage;
313
314     /** The number of waiting calls. */
315     private int nbWaitings = 0;
316
317     private boolean canPass = false;
318
319     /**
320      * Returns the return message.
321      *
322      * @return Returns the message.
323      */

324     public Message getReturnMessage()
325     {
326       return returnMessage;
327     }
328
329     /**
330      * Sets the return message.
331      *
332      * @param returnMessage the return message to set.
333      */

334     public void setReturnMessage(Message returnMessage)
335     {
336       this.returnMessage = returnMessage;
337     }
338
339     /**
340      * Increments the number of waiting calls.
341      */

342     public void incrementNbWaitings()
343     {
344       nbWaitings++;
345     }
346
347     /**
348      * Decrements the number of waiting calls.
349      */

350     public void decrementNbWaitings()
351     {
352       nbWaitings--;
353     }
354
355     /**
356      * Returns the number of waiting calls.
357      *
358      * @return the number of waiting calls.
359      */

360     public int getNbWaitings()
361     {
362       return nbWaitings;
363     }
364
365     /**
366      * Sets canPass to the given value.
367      *
368      * @param canPass the value to which canPass must be set.
369      */

370     public void setCanPass(boolean canPass)
371     {
372       this.canPass = canPass;
373     }
374
375     /**
376      * Returns the value of canPass.
377      *
378      * @return the value of canPass.
379      */

380     public boolean canPass()
381     {
382       return this.canPass;
383     }
384
385     // ---------------------------------------------------------------------------
386
// Implementation of the Recyclable interface.
387
// ---------------------------------------------------------------------------
388

389     /**
390      * @see org.objectweb.dream.pool.Recyclable#recycle()
391      */

392     public void recycle()
393     {
394       this.returnMessage = null;
395       this.nbWaitings = 0;
396       this.canPass = false;
397     }
398   }
399
400 }
Popular Tags