KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > dream > protocol > utobcast > REPImpl


/**
 * Dream
 * Copyright (C) 2003-2004 INRIA Rhone-Alpes
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Contact: dream@objectweb.org
 *
 * Initial developer(s): Vivien Quema
 * Contributor(s):
 */

24
25 package org.objectweb.dream.protocol.utobcast;
26
27 import java.util.Map JavaDoc;
28
29 import org.objectweb.dream.AbstractComponent;
30 import org.objectweb.dream.IOPushException;
31 import org.objectweb.dream.InterruptedPushException;
32 import org.objectweb.dream.Push;
33 import org.objectweb.dream.PushException;
34 import org.objectweb.dream.message.Message;
35 import org.objectweb.dream.message.manager.MessageManager;
36 import org.objectweb.dream.protocol.Process;
37 import org.objectweb.dream.protocol.utobcast.message.UTOBcastChunk;
38 import org.objectweb.fractal.api.NoSuchInterfaceException;
39 import org.objectweb.fractal.api.control.IllegalBindingException;
40 import org.objectweb.fractal.api.control.IllegalLifeCycleException;
41 import org.objectweb.util.monolog.api.BasicLevel;
42
43 /**
44  * Implementation of the REP component. This component handles REP messages.
45  * Upon message reception, it stores the message into the
46  * <code>PendingMessages</code> queue. Then it sends UTO messages all the
47  * processes but pleader, and an ACK message to pleader. Note that in case this
48  * component detects a failure of pleader, it becomes leader, and trigger a
49  * backup election.
50  */

51 public class REPImpl extends AbstractComponent implements Push
52 {
53
54   // ---------------------------------------------------------------------------
55
// Fields of this class
56
// ---------------------------------------------------------------------------
57

58   /**
59    * The commonly used name to refer to the <code>pendingMessageInItf</code>
60    * interface.
61    */

62   public static final String JavaDoc PENDING_MESSAGES_IN_ITF_NAME = "pending-messages-in";
63
64   /** The interface to store messages in the <code>PendingMessage</code> queue. */
65   protected Push pendingMessagesInItf;
66
67   /** The interface to send UTO and ACK messages. */
68   protected Push outPushItf;
69
70   /** The interface to retrieve process membership information. */
71   protected ProcessMembership processMembershipItf;
72
73   /** The interface to elect a new backup. */
74   protected BackupElection backupElectionItf;
75
76   /** The interface to manage message lifecycle. */
77   protected MessageManager messageManagerItf;
78
79   Object JavaDoc lock = new Object JavaDoc();
80
81   // ---------------------------------------------------------------------------
82
// Constructor
83
// ---------------------------------------------------------------------------
84

85   /**
86    * Constructor.
87    */

88   public REPImpl()
89   {
90   }
91
92   // ---------------------------------------------------------------------------
93
// Implementation of the Push interface
94
// ---------------------------------------------------------------------------
95

96   /**
97    * @see org.objectweb.dream.Push#push(org.objectweb.dream.message.Message,
98    * java.util.Map)
99    */

100   public void push(Message message, Map JavaDoc context) throws PushException
101   {
102     if (logger.isLoggable(BasicLevel.DEBUG))
103     {
104       logger.log(BasicLevel.DEBUG, "Received REP message " + message);
105     }
106     ///////////////////////////////////////////////////////////
107
// Add message to pendingMessages
108
///////////////////////////////////////////////////////////
109
if (logger.isLoggable(BasicLevel.DEBUG))
110     {
111       logger.log(BasicLevel.DEBUG, "Add REP message " + message
112           + " to PendingMessages");
113     }
114     messageManagerItf.duplicateMessage(message, false);
115     pendingMessagesInItf.push(message, null);
116
117     ///////////////////////////////////////////////////////////
118
// Send UTO messages to all processes but leader and backup
119
///////////////////////////////////////////////////////////
120
Process JavaDoc[] processes = processMembershipItf.getOtherProcesses();
121     UTOBcastChunk chunk = (UTOBcastChunk) message
122         .getChunk(UTOBcastChunk.DEFAULT_NAME);
123     chunk.setUTOBcastMessageType(UTOBcastChunk.UTO);
124     for (int i = 0; i < processes.length; i++)
125     {
126       // We do not clone messages. Be carefull, only works if messages are
127
// sent using this thread.
128
// Set the receiver process
129
chunk.setProcessTo(processes[i]);
130       if (logger.isLoggable(BasicLevel.DEBUG))
131       {
132         logger.log(BasicLevel.DEBUG, "Send UTO message " + message
133             + " to process " + processes[i]);
134       }
135       try
136       {
137         messageManagerItf.duplicateMessage(message, false);
138         outPushItf.push(message, null);
139       }
140       catch (InterruptedPushException e)
141       {
142         // Release resources: should delete duplicated messages
143
throw e;
144       }
145       catch (IOPushException e)
146       {
147         // The process processes[i] has crashed.
148
processMembershipItf.removeProcess(processes[i]);
149       }
150     }
151
152     ///////////////////////////////////////////////////////////
153
// Send ACK message to leader
154
///////////////////////////////////////////////////////////
155
chunk.setUTOBcastMessageType(UTOBcastChunk.ACK);
156
157     try
158     {
159       Process JavaDoc leader = processMembershipItf.getLeader();
160       if (logger.isLoggable(BasicLevel.DEBUG))
161       {
162         logger.log(BasicLevel.DEBUG, "Send ACK message " + message
163             + " to process " + leader);
164       }
165       chunk.setProcessTo(leader);
166     }
167     catch (InterruptedException JavaDoc e1)
168     {
169       throw new InterruptedPushException(e1);
170     }
171     try
172     {
173       // We do not duplicate the message
174
outPushItf.push(message, null);
175     }
176     catch (IOPushException e)
177     {
178       // We detect that the leader has crashed
179
// TODO
180
e.printStackTrace();
181     }
182
183   }
184
185   // ---------------------------------------------------------------------------
186
// Implementation of the BindingController interface
187
// ---------------------------------------------------------------------------
188

189   /**
190    * @see org.objectweb.fractal.api.control.BindingController#bindFc(java.lang.String,
191    * java.lang.Object)
192    */

193   public void bindFc(String JavaDoc clientItfName, Object JavaDoc serverItf)
194       throws NoSuchInterfaceException, IllegalBindingException,
195       IllegalLifeCycleException
196   {
197     super.bindFc(clientItfName, serverItf);
198     if (clientItfName.equals(PENDING_MESSAGES_IN_ITF_NAME))
199     {
200       pendingMessagesInItf = (Push) serverItf;
201     }
202     else if (clientItfName.equals(Push.OUT_PUSH_ITF_NAME))
203     {
204       outPushItf = (Push) serverItf;
205     }
206     else if (clientItfName.equals(ProcessMembership.ITF_NAME))
207     {
208       processMembershipItf = (ProcessMembership) serverItf;
209     }
210     else if (clientItfName.equals(BackupElection.ITF_NAME))
211     {
212       backupElectionItf = (BackupElection) serverItf;
213     }
214     else if (clientItfName.equals(MessageManager.ITF_NAME))
215     {
216       messageManagerItf = (MessageManager) serverItf;
217     }
218
219   }
220
221   /**
222    * @see org.objectweb.fractal.api.control.BindingController#listFc()
223    */

224   public String JavaDoc[] listFc()
225   {
226     return new String JavaDoc[]{PENDING_MESSAGES_IN_ITF_NAME, Push.OUT_PUSH_ITF_NAME,
227         ProcessMembership.ITF_NAME, BackupElection.ITF_NAME,
228         MessageManager.ITF_NAME};
229   }
230
231 }
Popular Tags