KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > tribe > adapters > MulticastRequestAdapter


/**
 * Tribe: Group communication library.
 * Copyright (C) 2004-2005 French National Institute For Research In Computer
 * Science And Control (INRIA).
 * Contact: tribe@objectweb.org
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by the
 * Free Software Foundation; either version 2.1 of the License, or any later
 * version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library; if not, write to the Free Software Foundation,
 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 *
 * Initial developer(s): Emmanuel Cecchet.
 * Contributor(s): ______________________.
 */

package org.objectweb.tribe.adapters;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.objectweb.tribe.channel.ReliableGroupChannel;
import org.objectweb.tribe.common.Member;
import org.objectweb.tribe.common.log.Trace;
import org.objectweb.tribe.exceptions.ChannelException;
import org.objectweb.tribe.exceptions.NotConnectedException;
import org.objectweb.tribe.exceptions.TimeoutException;
import org.objectweb.tribe.messages.MessageListener;

39 /**
40  * This class defines a MulticastRequestAdapter which is similar to JGroups
41  * MessageDispatcher. It is a sort of multicast RPC (Remote Procedure Call)
42  * where a message is multicasted and each member return a response.
43  *
44  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
45  * @version 1.0
46  */

47 public class MulticastRequestAdapter implements MessageListener
48 {
49   /** Do not wait for any response when multicasting a message (asynchronous) */
50   public static final int WAIT_NONE = 0;
51   /** Wait for the first reponse before returning of a multicast send */
52   public static final int WAIT_FIRST = 1;
53   /** Wait for a majority of responses before returning of a multicast send */
54   public static final int WAIT_MAJORITY = 2;
55   /** Wait for all responses before returning of a multicast send */
56   public static final int WAIT_ALL = 3;
57
58   private ReliableGroupChannel channel;
59   private Member me;
60   private MessageListener msgListener;
61   private MulticastRequestListener multicastRequestListener;
62   private HashMap JavaDoc pendingQueries;
63   private PullPushAdapter pullPushAdapter = null;
64   private int sequenceNumber = 0;
65   private static Trace logger = Trace
66                                                        .getLogger("org.objectweb.tribe.blocks.multicastadapter");
67
68   /**
69    * Creates a new <code>MulticastRequestAdapter</code> object
70    */

71   public MulticastRequestAdapter(ReliableGroupChannel channel,
72       MessageListener msgListener, MulticastRequestListener dispatcherListener)
73   {
74     pullPushAdapter = new PullPushAdapter(channel, this);
75     this.channel = channel;
76     this.msgListener = msgListener;
77     this.multicastRequestListener = dispatcherListener;
78     this.me = channel.getLocalMembership();
79     this.pendingQueries = new HashMap JavaDoc();
80   }
81
82   /**
83    * Returns the channel value.
84    *
85    * @return Returns the channel.
86    */

87   public ReliableGroupChannel getChannel()
88   {
89     return channel;
90   }
91
92   /**
93    * Stop this MulticastRequestAdapter. This does not close the underlying
94    * channel but terminates the PullPushAdapter create for this
95    * MulticastRequestAdapter.
96    */

97   public void stop()
98   {
99     pullPushAdapter.stop();
100   }
101
102   /**
103    * @see org.objectweb.tribe.messages.MessageListener#receive(java.io.Serializable)
104    */

105   public void receive(Serializable JavaDoc msg)
106   {
107     if (msg instanceof MulticastRequestAdapterMessage)
108     {
109       MulticastRequestAdapterMessage requestReply = (MulticastRequestAdapterMessage) msg;
110       if (requestReply.isReply())
111       { // It's a reply
112
MulticastResponse resp;
113         Integer JavaDoc key = new Integer JavaDoc(requestReply.getUid());
114         synchronized (pendingQueries)
115         {
116           resp = (MulticastResponse) pendingQueries.get(key);
117         }
118         if (resp == null)
119         { // No MulticastReponse object, it has probably been removed because
120
// wait mode policy is not synchronous.
121
if (logger.isDebugEnabled())
122             logger.debug("Dropping response to request "
123                 + requestReply.getUid());
124           return;
125         }
126         // Add the reply
127
if (logger.isDebugEnabled())
128           logger.debug("Received reply from " + requestReply.getSender()
129               + " to message " + requestReply.getUid());
130         resp.addResult(requestReply.getSender(), requestReply.getMessage());
131       }
132       else
133       { // It's a request, forward first to single threaded callback
134
Object JavaDoc callback1 = multicastRequestListener
135             .handleMessageSingleThreaded(requestReply.getMessage(),
136                 requestReply.getSender());
137
138         // Now, let the message be handled in a separate thread
139
new MulticastRequestAdapterThread(multicastRequestListener,
140             requestReply, callback1, pullPushAdapter, me).start();
141       }
142     }
143     else
144       // Regular message, forward to listener
145
msgListener.receive(msg);
146   }
147
148   /**
149    * Multicast a message to the given list of member.
150    *
151    * @param dests members to send the message to1
152    * @param msg message to send
153    * @param waitMode one of WAIT_NONE, WAIT_FIRST, WAIT_MAJORITY or WAIT_ALL
154    * @param timeout time in ms to wait for responses (0 means no timeout)
155    * @return the response list
156    * @throws TimeoutException if the timeout expires
157    * @throws NotConnectedException if the underlying channel is not connected to
158    * a group
159    * @throws ChannelException if an error occurs during transmission
160    */

161   public MulticastResponse multicastMessage(ArrayList JavaDoc dests, Serializable JavaDoc msg,
162       int waitMode, long timeout) throws TimeoutException, ChannelException,
163       NotConnectedException
164   {
165     // Create request
166
int seq = nextSequenceNumber();
167     // Object for collecting responses
168
MulticastResponse reply = new MulticastResponse(dests, waitMode);
169     MulticastRequestAdapterMessage toSend;
170     if (waitMode == WAIT_NONE)
171     { // No answer required, just report failed nodes
172
toSend = new MulticastRequestAdapterMessage(msg, me, seq,
173           MulticastRequestAdapterMessage.REQUEST_ONLY);
174       if (logger.isDebugEnabled())
175         logger.debug("Sending request " + seq + " to " + dests.size()
176             + " members.");
177       ArrayList JavaDoc failed = pullPushAdapter.send(toSend, dests);
178       reply.setFailedMembers(failed);
179       return reply;
180     }
181     else
182     { // We are waiting for an answer
183
toSend = new MulticastRequestAdapterMessage(msg, me, seq,
184           MulticastRequestAdapterMessage.REQUEST);
185
186       Integer JavaDoc key = new Integer JavaDoc(seq);
187       synchronized (pendingQueries)
188       {
189         pendingQueries.put(key, reply);
190       }
191
192       // Send the message and wait for responses
193
if (logger.isDebugEnabled())
194         logger.debug("Sending message " + key + " to " + dests.size()
195             + " members.");
196       ArrayList JavaDoc failed = pullPushAdapter.send(toSend, dests);
197       reply.setFailedMembers(failed);
198       reply.waitForCompletion(timeout);
199
200       // Release the MulticastResponse object
201
synchronized (pendingQueries)
202       {
203         pendingQueries.remove(key);
204       }
205       return reply;
206     }
207   }
208
209   private synchronized int nextSequenceNumber()
210   {
211     return sequenceNumber++;
212   }
213
214 }
Popular Tags