KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > dream > protocol > causality > InMessageSorterImpl


/**
 * Dream
 * Copyright (C) 2003-2004 INRIA Rhone-Alpes
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Contact : dream@objectweb.org
 *
 * Initial developer(s): Matthieu Leclercq
 * Contributor(s):
 */


package org.objectweb.dream.protocol.causality;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.objectweb.dream.AbstractComponent;
import org.objectweb.dream.Push;
import org.objectweb.dream.PushException;
import org.objectweb.dream.message.ExtensibleMessage;
import org.objectweb.dream.message.Message;
import org.objectweb.dream.message.manager.MessageManager;
import org.objectweb.dream.protocol.ArrowChunk;
import org.objectweb.dream.util.Error;
import org.objectweb.fractal.api.NoSuchInterfaceException;
import org.objectweb.fractal.api.control.IllegalBindingException;
import org.objectweb.fractal.api.control.IllegalLifeCycleException;
import org.objectweb.util.monolog.api.BasicLevel;

45 /**
46  * Incomming message sorter component. This component can be seen as a passive
47  * Push/Push queue.
48  */

49 public class InMessageSorterImpl extends AbstractComponent
50     implements
51       Push,
52       CausalityTransformerAttributeController
53 {
54
55   private MatrixClock matrixClockItf;
56   private Push outPushItf;
57   private MessageManager messageManagerItf;
58
59   private List JavaDoc waitingToDeliver;
60   private String JavaDoc arrowChunkName;
61   private String JavaDoc causalityChunkName;
62
63   /**
64    * Default constructor
65    */

66   public InMessageSorterImpl()
67   {
68     waitingToDeliver = new LinkedList JavaDoc();
69   }
70
71   /**
72    * @see Push#push(Message, Map)
73    */

74   public synchronized void push(Message message, Map JavaDoc context)
75       throws PushException
76   {
77     ArrowChunk arrowChunk = (ArrowChunk) message.getChunk(arrowChunkName);
78
79     if (arrowChunk == null)
80     {
81       logger.log(BasicLevel.ERROR, "Unable to find arrow chunk named "
82           + arrowChunkName + ". The message is dropped");
83       return;
84     }
85
86     CausalityChunk csltChunk = (CausalityChunk) message
87         .getChunk(causalityChunkName);
88
89     if (csltChunk == null)
90     {
91       throw new PushException("Unable to find causality chunk named "
92           + causalityChunkName);
93     }
94
95     int todo = matrixClockItf.testRecvMatrix(csltChunk.getCausalityStamp(),
96         arrowChunk.getProcessIdFrom(), arrowChunk.getProcessIdTo());
97     if (todo == MatrixClock.DELIVER)
98     {
99       deliverMessage(message);
100
101       // scan waiting to deliver message if they can be delivred.
102
boolean loop = true;
103       while (loop)
104       {
105         loop = false;
106         Iterator JavaDoc iter = waitingToDeliver.iterator();
107         while (iter.hasNext())
108         {
109           Message msg = (Message) iter.next();
110           ArrowChunk arrowChunk1 = (ArrowChunk) msg.getChunk(arrowChunkName);
111           CausalityChunk csltChunk1 = (CausalityChunk) msg
112               .getChunk(causalityChunkName);
113
114           if (logger.isLoggable(BasicLevel.DEBUG))
115           {
116             logger.log(BasicLevel.DEBUG, "Try to deliver message : " + msg);
117           }
118           todo = matrixClockItf.testRecvMatrix(csltChunk1.getCausalityStamp(),
119               arrowChunk1.getProcessIdFrom(), arrowChunk1.getProcessIdTo());
120           if (todo == MatrixClock.DELIVER)
121           {
122             iter.remove();
123             deliverMessage(msg);
124             // Matrix clock has changed, we have to rescan the list.
125
loop = true;
126             break;
127           }
128           else if (todo == MatrixClock.ALREADY_DELIVERED)
129           {
130             Error.bug(logger);
131           }
132           logger.log(BasicLevel.DEBUG, "Message can't be delivered yet");
133         }
134       }
135     }
136     else if (todo == MatrixClock.WAIT_TO_DELIVER)
137     {
138       // insert in the waitting list
139
waitingToDeliver.add(message);
140       if (logger.isLoggable(BasicLevel.DEBUG))
141       {
142         logger.log(BasicLevel.DEBUG, "block message " + message);
143       }
144     }
145     else
146     {
147       // message already delivred.
148
if (logger.isLoggable(BasicLevel.DEBUG))
149       {
150         logger.log(BasicLevel.DEBUG, "message already delivred " + message);
151       }
152       messageManagerItf.deleteMessage(message);
153     }
154   }
155
156   private void deliverMessage(Message message)
157   {
158     // try to remove causality chunk
159
if (message instanceof ExtensibleMessage)
160     {
161       Object JavaDoc causalityChunk = ((ExtensibleMessage) message)
162           .removeChunk(causalityChunkName);
163       messageManagerItf.deleteChunk(causalityChunk);
164     }
165
166     if (logger.isLoggable(BasicLevel.DEBUG))
167     {
168       logger.log(BasicLevel.DEBUG, "deliver message " + message);
169     }
170     try
171     {
172       outPushItf.push(message, null);
173     }
174     catch (PushException e)
175     {
176       logger.log(BasicLevel.ERROR, "A push exception occurs, drop message", e);
177       messageManagerItf.deleteMessage(message);
178     }
179   }
180
181   // ---------------------------------------------------------------------------
182
// Implementation of CausalityTransformerAttributeController interface
183
// ---------------------------------------------------------------------------
184

185   /**
186    * @see CausalityTransformerAttributeController#getArrowChunkName()
187    */

188   public String JavaDoc getArrowChunkName()
189   {
190     return arrowChunkName;
191   }
192
193   /**
194    * @see CausalityTransformerAttributeController#setArrowChunkName(String)
195    */

196   public void setArrowChunkName(String JavaDoc arrowChunkName)
197   {
198     this.arrowChunkName = arrowChunkName;
199   }
200
201   /**
202    * @see CausalityTransformerAttributeController#getCausalityChunkName()
203    */

204   public String JavaDoc getCausalityChunkName()
205   {
206     return causalityChunkName;
207   }
208
209   /**
210    * @see CausalityTransformerAttributeController#setCausalityChunkName(String)
211    */

212   public void setCausalityChunkName(String JavaDoc causalityChunkName)
213   {
214     this.causalityChunkName = causalityChunkName;
215   }
216
217   // ---------------------------------------------------------------------------
218
// Implementation of BindingController interface
219
// ---------------------------------------------------------------------------
220

221   /**
222    * @see org.objectweb.fractal.api.control.BindingController#listFc()
223    */

224   public String JavaDoc[] listFc()
225   {
226     return new String JavaDoc[]{Push.OUT_PUSH_ITF_NAME,
227         MatrixClock.MATRIX_CLOCK_ITF_NAME, MessageManager.ITF_NAME};
228   }
229
230   /**
231    * @see org.objectweb.fractal.api.control.BindingController#bindFc(String,
232    * Object)
233    */

234   public synchronized void bindFc(String JavaDoc clientItfName, Object JavaDoc serverItf)
235       throws NoSuchInterfaceException, IllegalBindingException,
236       IllegalLifeCycleException
237   {
238     super.bindFc(clientItfName, serverItf);
239     if (clientItfName.equals(MatrixClock.MATRIX_CLOCK_ITF_NAME))
240     {
241       matrixClockItf = (MatrixClock) serverItf;
242     }
243     else if (clientItfName.equals(Push.OUT_PUSH_ITF_NAME))
244     {
245       outPushItf = (Push) serverItf;
246     }
247     else if (clientItfName.equals(MessageManager.ITF_NAME))
248     {
249       messageManagerItf = (MessageManager) serverItf;
250     }
251   }
252
253 }
Popular Tags