KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > dream > protocol > utobcast > DATImpl


/**
 * Dream
 * Copyright (C) 2003-2004 INRIA Rhone-Alpes
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Contact: dream@objectweb.org
 *
 * Initial developer(s): Vivien Quema
 * Contributor(s):
 */

package org.objectweb.dream.protocol.utobcast;

import java.util.Map;

import org.objectweb.dream.AbstractComponent;
import org.objectweb.dream.IOPushException;
import org.objectweb.dream.InterruptedPushException;
import org.objectweb.dream.Push;
import org.objectweb.dream.PushException;
import org.objectweb.dream.message.Message;
import org.objectweb.dream.message.manager.MessageManager;
import org.objectweb.dream.protocol.utobcast.message.UTOBcastChunk;
import org.objectweb.fractal.api.NoSuchInterfaceException;
import org.objectweb.fractal.api.control.IllegalBindingException;
import org.objectweb.fractal.api.control.IllegalLifeCycleException;
import org.objectweb.util.monolog.api.BasicLevel;

42 /**
43  * Implementation of the DAT component. This component handles DAT messages.
44  * Upon message reception, it stores the message into the
45  * <code>PendingMessages</code> queue, and then it sends a REP message to
46  * pbackup. Finally, it increments the sequence number. Note that this component
47  * may trigger a backup election if it detects its failure.
48  */

49 public class DATImpl extends AbstractComponent implements Push
50 {
51
52   // ---------------------------------------------------------------------------
53
// Fields of this class
54
// ---------------------------------------------------------------------------
55

56   /**
57    * The commonly used name to refer to the <code>pendingMessageInItf</code>
58    * interface.
59    */

60   public static final String JavaDoc PENDING_MESSAGES_IN_ITF_NAME = "pending-messages-in";
61
62   /** The interface to store messages in the <code>PendingMessage</code> queue. */
63   protected Push pendingMessagesInItf;
64
65   /** The interface to send REP messages to pbackup. */
66   protected Push outPushItf;
67
68   /** The interface to retrieve process membership information. */
69   protected ProcessMembership processMembershipItf;
70
71   /** The interface to get/set the sequence number. */
72   protected SequenceNumber sequenceNumberItf;
73
74   /** The interface to elect a new backup. */
75   protected BackupElection backupElectionItf;
76
77   /** The interface to manage message lifecycle. */
78   protected MessageManager messageManagerItf;
79
80   final Object JavaDoc lock = new Object JavaDoc();
81
82   // ---------------------------------------------------------------------------
83
// Constructor
84
// ---------------------------------------------------------------------------
85

86   /**
87    * Constructor.
88    */

89   public DATImpl()
90   {
91   }
92
93   // ---------------------------------------------------------------------------
94
// Implementation of the Push interface
95
// ---------------------------------------------------------------------------
96

97   /**
98    * @see org.objectweb.dream.Push#push(org.objectweb.dream.message.Message,
99    * java.util.Map)
100    */

101   public void push(Message message, Map JavaDoc context) throws PushException
102   {
103     UTOBcastChunk chunk = (UTOBcastChunk) message
104         .getChunk(UTOBcastChunk.DEFAULT_NAME);
105     synchronized (lock)
106     {
107       if (logger.isLoggable(BasicLevel.DEBUG))
108       {
109         logger.log(BasicLevel.DEBUG, "Received DAT message " + message);
110       }
111       // Set the sequence number
112
chunk.setSequenceNumber(sequenceNumberItf.getSequenceNumber());
113       // Add the message to pendingMessages
114
if (logger.isLoggable(BasicLevel.DEBUG))
115       {
116         logger.log(BasicLevel.DEBUG, "Add message " + message
117             + " to pending messages");
118       }
119       messageManagerItf.duplicateMessage(message, false);
120       pendingMessagesInItf.push(message, null);
121       // Set the new message type
122
chunk.setUTOBcastMessageType(UTOBcastChunk.REP);
123       // Set the process To (backup)
124
try
125       {
126         chunk.setProcessTo(processMembershipItf.getBackup());
127       }
128       catch (InterruptedException JavaDoc e)
129       {
130         // Release resources
131
messageManagerItf.deleteMessage(message);
132         throw new InterruptedPushException(e);
133       }
134       // Send REP message to backup
135
try
136       {
137         if (logger.isLoggable(BasicLevel.DEBUG))
138         {
139           logger.log(BasicLevel.DEBUG, "Send REP message " + message
140               + " to backup");
141         }
142         outPushItf.push(message, null);
143       }
144       catch (InterruptedPushException e)
145       {
146         // Release resources
147
messageManagerItf.deleteMessage(message);
148         throw e;
149       }
150       catch (IOPushException e)
151       {
152         // We have detected that backup has crashed
153
backupElectionItf.elect();
154         return;
155       }
156       // Increment the sequence number
157
if (logger.isLoggable(BasicLevel.DEBUG))
158       {
159         logger.log(BasicLevel.DEBUG, "Increment sequence number");
160       }
161       sequenceNumberItf.incrementSequenceNumber();
162     }
163   }
164
165   // ---------------------------------------------------------------------------
166
// Implementation of the BindingController interface
167
// ---------------------------------------------------------------------------
168

169   /**
170    * @see org.objectweb.fractal.api.control.BindingController#bindFc(java.lang.String,
171    * java.lang.Object)
172    */

173   public void bindFc(String JavaDoc clientItfName, Object JavaDoc serverItf)
174       throws NoSuchInterfaceException, IllegalBindingException,
175       IllegalLifeCycleException
176   {
177     super.bindFc(clientItfName, serverItf);
178     if (clientItfName.equals(PENDING_MESSAGES_IN_ITF_NAME))
179     {
180       pendingMessagesInItf = (Push) serverItf;
181     }
182     else if (clientItfName.equals(Push.OUT_PUSH_ITF_NAME))
183     {
184       outPushItf = (Push) serverItf;
185     }
186     else if (clientItfName.equals(SequenceNumber.ITF_NAME))
187     {
188       sequenceNumberItf = (SequenceNumber) serverItf;
189     }
190     else if (clientItfName.equals(ProcessMembership.ITF_NAME))
191     {
192       processMembershipItf = (ProcessMembership) serverItf;
193     }
194     else if (clientItfName.equals(BackupElection.ITF_NAME))
195     {
196       backupElectionItf = (BackupElection) serverItf;
197     }
198     else if (clientItfName.equals(MessageManager.ITF_NAME))
199     {
200       messageManagerItf = (MessageManager) serverItf;
201     }
202
203   }
204
205   /**
206    * @see org.objectweb.fractal.api.control.BindingController#listFc()
207    */

208   public String JavaDoc[] listFc()
209   {
210     return new String JavaDoc[]{PENDING_MESSAGES_IN_ITF_NAME, SequenceNumber.ITF_NAME,
211         Push.OUT_PUSH_ITF_NAME, ProcessMembership.ITF_NAME,
212         BackupElection.ITF_NAME, MessageManager.ITF_NAME};
213   }
214
215 }
Popular Tags