KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > dream > protocol > utobcast > BroadcastImpl


1 /**
2  * Dream
3  * Copyright (C) 2003-2004 INRIA Rhone-Alpes
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Contact: dream@objectweb.org
20  *
21  * Initial developer(s): Vivien Quema
22  * Contributor(s):
23  */

24
25 package org.objectweb.dream.protocol.utobcast;
26
27 import java.util.Map JavaDoc;
28
29 import org.objectweb.dream.AbstractComponent;
30 import org.objectweb.dream.IOPushException;
31 import org.objectweb.dream.InterruptedPushException;
32 import org.objectweb.dream.Push;
33 import org.objectweb.dream.PushException;
34 import org.objectweb.dream.PushWithReturn;
35 import org.objectweb.dream.message.ChunkAlreadyExistsException;
36 import org.objectweb.dream.message.ExtensibleMessage;
37 import org.objectweb.dream.message.Message;
38 import org.objectweb.dream.message.manager.MessageManager;
39 import org.objectweb.dream.protocol.Process;
40 import org.objectweb.dream.protocol.utobcast.message.CrashedLeaderChunk;
41 import org.objectweb.dream.protocol.utobcast.message.UTOBcastChunk;
42 import org.objectweb.dream.util.Error;
43 import org.objectweb.fractal.api.NoSuchInterfaceException;
44 import org.objectweb.fractal.api.control.IllegalBindingException;
45 import org.objectweb.fractal.api.control.IllegalLifeCycleException;
46 import org.objectweb.util.monolog.api.BasicLevel;
47
48 /**
49  * Implementation of the UTOBroadcast component. This component sends messages
50  * to the leader. If it detects a failure of the leader, it sends the message to
51  * the backup.
52  */

53 public class BroadcastImpl extends AbstractComponent implements Push
54 {
55
56   // ---------------------------------------------------------------------------
57
// Fields of this class
58
// ---------------------------------------------------------------------------
59

60   /** The interface to send messages to be delivered. */
61   protected Push outPushItf;
62
63   /** The interface to send messages to be UTO broadcast. */
64   protected PushWithReturn outPushWithReturnItf;
65
66   /** The interface to retrieve process membership information. */
67   protected ProcessMembership processMembershipItf;
68
69   /** The interface to manage message lifecycle. */
70   protected MessageManager messageManagerItf;
71
72   Process JavaDoc myProcess;
73   boolean configured = false;
74
75   // ---------------------------------------------------------------------------
76
// Constructor
77
// ---------------------------------------------------------------------------
78

79   /**
80    * Constructor.
81    */

82   public BroadcastImpl()
83   {
84   }
85
86   // ---------------------------------------------------------------------------
87
// Implementation of the Push interface
88
// ---------------------------------------------------------------------------
89

90   /**
91    * @see org.objectweb.dream.Push#push(org.objectweb.dream.message.Message,
92    * java.util.Map)
93    */

94   public void push(Message message, Map JavaDoc context) throws PushException
95   {
96     if (logger.isLoggable(BasicLevel.DEBUG))
97     {
98       logger.log(BasicLevel.DEBUG, "Received message " + message
99           + " to be broadcast");
100     }
101     if (!configured)
102     {
103       try
104       {
105         configure();
106       }
107       catch (InterruptedException JavaDoc e1)
108       {
109         throw new InterruptedPushException(e1);
110       }
111     }
112     // Retrieve the UTOBcast chunk
113
UTOBcastChunk utobcastChunk = (UTOBcastChunk) message
114         .getChunk(UTOBcastChunk.DEFAULT_NAME);
115     if (utobcastChunk == null)
116     {
117       // UTOBcast chunk not present -> create it
118
if (message instanceof ExtensibleMessage)
119       {
120         utobcastChunk = (UTOBcastChunk) messageManagerItf
121             .createChunk(UTOBcastChunk.TYPE);
122         try
123         {
124           ((ExtensibleMessage) message).addChunk(UTOBcastChunk.DEFAULT_NAME,
125               UTOBcastChunk.TYPE, utobcastChunk);
126         }
127         catch (ChunkAlreadyExistsException e2)
128         {
129           // Cannot happen
130
Error.bug(logger, e2);
131         }
132       }
133       else
134       {
135         // The message is not extensible -> unable to add a chunk
136
Error.error("Message is not extensible: unable to add UTOBcast chunk",
137             logger);
138       }
139     }
140
141     // Fill the UTOBcast chunk (message type and sender)
142
utobcastChunk.setUTOBcastMessageType(UTOBcastChunk.DAT);
143     utobcastChunk.setProcessFrom(myProcess);
144
145     Message returnMessage = null;
146
147     while (returnMessage == null)
148     {
149       // Retrieve the leader process
150
Process JavaDoc leader;
151       try
152       {
153         leader = processMembershipItf.getLeader();
154       }
155       catch (InterruptedException JavaDoc e1)
156       {
157         throw new InterruptedPushException(e1);
158       }
159       utobcastChunk.setProcessTo(leader);
160       try
161       {
162         //Send the message
163
if (logger.isLoggable(BasicLevel.DEBUG))
164         {
165           logger.log(BasicLevel.DEBUG, "Send DAT message " + message
166               + " to leader");
167         }
168         returnMessage = outPushWithReturnItf.pushWithReturn(message, null);
169         if (returnMessage.getChunk(CrashedLeaderChunk.DEFAULT_NAME) != null)
170         {
171           // We are notified that the leader has crashed
172
// new leader should already have been elected
173
if (logger.isLoggable(BasicLevel.DEBUG))
174           {
175             logger.log(BasicLevel.DEBUG,
176                 "Received AllKey -> the leader has crahsed");
177           }
178           returnMessage = null;
179           continue;
180         }
181         else
182         {
183           // Deliver the message to myself
184
if (logger.isLoggable(BasicLevel.DEBUG))
185           {
186             logger.log(BasicLevel.DEBUG, "Received message to be delivered "
187                 + returnMessage);
188           }
189           outPushItf.push(returnMessage, null);
190         }
191       }
192       catch (IOPushException e)
193       {
194         // We detect that the leader has crashed (returnMessage still null)
195
processMembershipItf.electBackupAsLeader(leader);
196       }
197     }
198   }
199
200   // ---------------------------------------------------------------------------
201
// Utility methods.
202
// ---------------------------------------------------------------------------
203

204   void configure() throws InterruptedException JavaDoc
205   {
206     myProcess = processMembershipItf.getMyself();
207     configured = true;
208   }
209
210   // ---------------------------------------------------------------------------
211
// Implementation of the BindingController interface
212
// ---------------------------------------------------------------------------
213

214   /**
215    * @see org.objectweb.fractal.api.control.BindingController#bindFc(java.lang.String,
216    * java.lang.Object)
217    */

218   public void bindFc(String JavaDoc clientItfName, Object JavaDoc serverItf)
219       throws NoSuchInterfaceException, IllegalBindingException,
220       IllegalLifeCycleException
221   {
222     super.bindFc(clientItfName, serverItf);
223     if (clientItfName.equals(Push.OUT_PUSH_ITF_NAME))
224     {
225       outPushItf = (Push) serverItf;
226     }
227     else if (clientItfName.equals(PushWithReturn.OUT_PUSH_WITH_RETURN_ITF_NAME))
228     {
229       outPushWithReturnItf = (PushWithReturn) serverItf;
230     }
231     else if (clientItfName.equals(ProcessMembership.ITF_NAME))
232     {
233       processMembershipItf = (ProcessMembership) serverItf;
234     }
235     else if (clientItfName.equals(MessageManager.ITF_NAME))
236     {
237       messageManagerItf = (MessageManager) serverItf;
238     }
239   }
240
241   /**
242    * @see org.objectweb.fractal.api.control.BindingController#listFc()
243    */

244   public String JavaDoc[] listFc()
245   {
246     return new String JavaDoc[]{Push.OUT_PUSH_ITF_NAME,
247         PushWithReturn.OUT_PUSH_ITF_NAME, ProcessMembership.ITF_NAME,
248         MessageManager.ITF_NAME};
249   }
250
251 }
Popular Tags