KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > tribe > gms > GroupMembershipService


1 /**
2  * Tribe: Group communication library.
3  * Copyright (C) 2004-2005 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Contact: tribe@objectweb.org
6  *
7  * This library is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation; either version 2.1 of the License, or any later
10  * version.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation,
19  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): ______________________.
23  */

24
25 package org.objectweb.tribe.gms;
26
27 import java.util.ArrayList JavaDoc;
28 import java.util.HashMap JavaDoc;
29
30 import org.objectweb.tribe.channel.AbstractChannelPool;
31 import org.objectweb.tribe.channel.AbstractServerChannel;
32 import org.objectweb.tribe.channel.ReceiveBuffer;
33 import org.objectweb.tribe.channel.ReliableGroupChannelWithGms;
34 import org.objectweb.tribe.common.Address;
35 import org.objectweb.tribe.common.Group;
36 import org.objectweb.tribe.common.GroupIdentifier;
37 import org.objectweb.tribe.common.IpAddress;
38 import org.objectweb.tribe.common.Member;
39 import org.objectweb.tribe.common.log.Trace;
40 import org.objectweb.tribe.exceptions.AlreadyMemberException;
41 import org.objectweb.tribe.exceptions.ChannelException;
42 import org.objectweb.tribe.exceptions.NotConnectedException;
43 import org.objectweb.tribe.gms.discovery.DiscoveryListener;
44 import org.objectweb.tribe.gms.discovery.DiscoveryService;
45 import org.objectweb.tribe.gms.discovery.UdpDiscoveryService;
46 import org.objectweb.tribe.gms.protocol.GroupCompositionMessage;
47 import org.objectweb.tribe.gms.protocol.QuitMessage;
48 import org.objectweb.tribe.messages.PingMessage;
49
50 /**
51  * This class defines a GroupMembershipService
52  *
53  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
54  * @version 1.0
55  */

56 public class GroupMembershipService
57     implements
58       DiscoveryListener,
59       GroupMembershipListener
60 {
61   /** Destination key for GMS messages */
62   public static String JavaDoc GMS_KEY = "tribe.gms";
63
64   // HashMap: GroupIdentifier -> Group
65
protected HashMap JavaDoc groupMemberships;
66   private GroupMembershipListenerThread membershipThread;
67   private IpAddress replyAddress;
68   private AbstractServerChannel serverChannel;
69   private DiscoveryService discovery;
70   protected ArrayList JavaDoc listeners;
71   private AbstractChannelPool channelPool;
72   private ReceiveBuffer receiveBuffer;
73   protected static Trace logger = Trace
74                                                     .getLogger("org.objectweb.tribe.gms");
75
76   /**
77    * Creates a new <code>GroupMembershipService</code> object
78    *
79    * @param replyAddress address to send replies to
80    * @param channelPool channel pool for group communications
81    * @param discovery the discovery service to use
82    * @throws ChannelException if no channel can be bound to the reply address
83    */

84   public GroupMembershipService(IpAddress replyAddress,
85       AbstractChannelPool channelPool, DiscoveryService discovery)
86       throws ChannelException
87   {
88     groupMemberships = new HashMap JavaDoc();
89     listeners = new ArrayList JavaDoc();
90     this.channelPool = channelPool;
91     this.discovery = discovery;
92     if (channelPool != null)
93     {
94       receiveBuffer = new ReceiveBuffer(GroupMembershipService.GMS_KEY);
95       membershipThread = new GroupMembershipListenerThread(this);
96       membershipThread.start();
97       channelPool.registerReceiveBuffer(receiveBuffer);
98       serverChannel = channelPool.getServerChannel(replyAddress);
99       try
100       {
101         // Get the real bind address (especially if port was set to 0)
102
this.replyAddress = (IpAddress) serverChannel.getBindAddress();
103         if (this.replyAddress.getAddress().getHostAddress().equals("0.0.0.0"))
104           this.replyAddress.setAddress(replyAddress.getAddress());
105       }
106       catch (NotConnectedException e)
107       {
108         throw new ChannelException("Unable to bind server channel.");
109       }
110
111       discovery.registerDiscoveryListener(this);
112       ((UdpDiscoveryService) discovery).setReplyAddress(this.replyAddress);
113     }
114   }
115
116   /**
117    * Returns the receiveBuffer value.
118    *
119    * @return Returns the receiveBuffer.
120    */

121   protected ReceiveBuffer getReceiveBuffer()
122   {
123     return receiveBuffer;
124   }
125
126   /**
127    * Returns the logger value.
128    *
129    * @return Returns the logger.
130    */

131   protected static Trace getLogger()
132   {
133     return logger;
134   }
135
136   /**
137    * Joins the group that has the given group identifier.
138    *
139    * @param channel client channel to join the group
140    * @param gid the identifier of the group to join
141    * @return the Member corresponding to the newly joining member
142    * @throws AlreadyMemberException if we are already member of the group
143    * @throws NotConnectedException if the channel is not connected
144    * @throws ChannelException if an error is reported by the channel
145    */

146   public Member join(ReliableGroupChannelWithGms channel, GroupIdentifier gid)
147       throws AlreadyMemberException, NotConnectedException, ChannelException
148   {
149     Member me = new Member(replyAddress, channel.toString());
150     if (logger.isDebugEnabled())
151       logger.debug("Member " + me.getUid() + " joins group " + gid);
152     Group g;
153     synchronized (groupMemberships)
154     {
155       g = getGroup(gid);
156       if (g == null)
157       { // Group does not exist, create it
158
g = new Group(gid);
159         groupMemberships.put(gid, g);
160       }
161       else if (g.hasMember(me))
162         throw new AlreadyMemberException();
163
164       // Add ourselves to the group
165
g.addMember(me);
166     }
167
168     // Ask for the group composition
169
if (discovery != null)
170       discovery.sendGroupDiscovery(gid);
171
172     // Notify listeners
173
synchronized (listeners)
174     {
175       int size = listeners.size();
176       for (int i = 0; i < size; i++)
177         ((GroupMembershipListener) listeners.get(i)).joinMember(me, gid);
178     }
179
180     return me;
181   }
182
183   /**
184    * Leaves the group that has the given group identifier.
185    *
186    * @param channel client channel to leave the group
187    * @param gid the identifier of the group to quit
188    * @throws ChannelException if an error occurs
189    * @throws NotConnectedException if m does not belong to this group
190    */

191   public void quit(ReliableGroupChannelWithGms channel, GroupIdentifier gid)
192       throws ChannelException, NotConnectedException
193   {
194     Group g = getGroup(gid);
195     if (g == null)
196       throw new NotConnectedException(
197           "Trying to leave a group we do not belong to.");
198     Member me = new Member(replyAddress, channel.toString());
199     if (logger.isDebugEnabled())
200       logger.debug("Member " + me.getUid() + " quits group " + gid);
201     if (channelPool != null)
202     {
203       ArrayList JavaDoc failed = channelPool.send(new QuitMessage(gid, me), g
204           .getMembers());
205       if (failed != null)
206         throw new ChannelException(failed.size()
207             + " member did not receive the quit message.");
208     }
209   }
210
211   /**
212    * Get the list of members in a group.
213    *
214    * @param gid the group identifier
215    * @return the <code>Group</code> corresponding to this gid or null if no
216    * such group is registered in the GMS.
217    */

218   public Group getGroup(GroupIdentifier gid)
219   {
220     synchronized (groupMemberships)
221     {
222       return (Group) groupMemberships.get(gid);
223     }
224   }
225
226   /**
227    * Register a new GroupMembershipListener.
228    *
229    * @param listener the listener to add
230    */

231   public void registerGroupMembershipListener(GroupMembershipListener listener)
232   {
233     synchronized (listeners)
234     {
235       listeners.add(listener);
236     }
237   }
238
239   /**
240    * Unregister a GroupMembershipListener.
241    *
242    * @param listener the listener to remove
243    * @return true if the listener was registered, false otherwise
244    */

245   public boolean unregisterGroupMembershipListener(
246       GroupMembershipListener listener)
247   {
248     synchronized (listeners)
249     {
250       return listeners.remove(listener);
251     }
252   }
253
254   //
255
// Discovery Service related functions
256
//
257

258   /**
259    * @see org.objectweb.tribe.gms.discovery.DiscoveryListener#discoveryRequest(org.objectweb.tribe.common.GroupIdentifier,
260    * org.objectweb.tribe.common.Address)
261    */

262   public void discoveryRequest(GroupIdentifier gid, Address sender)
263   {
264     Group g = getGroup(gid);
265     if (g != null)
266     {
267       if (logger.isDebugEnabled())
268         logger.debug("Sending GroupCompositionMessage for group "
269             + g.getGroupIdentifier() + " to " + sender);
270
271       try
272       {
273         channelPool.getChannel(sender).send(
274             new GroupCompositionMessage(g, replyAddress));
275       }
276       catch (ChannelException e)
277       {
278         logger.error("Failed to send GroupCompositionMessage for group " + gid
279             + " to " + sender, e);
280       }
281       catch (NotConnectedException e)
282       {
283         logger.error("Unable to read " + sender + " to send group " + gid
284             + " composition.");
285       }
286     }
287     else if (logger.isDebugEnabled())
288       logger.debug("No information to send for group " + gid);
289
290   }
291
292   //
293
// Group Membership Listener related functions
294
//
295

296   /**
297    * @see org.objectweb.tribe.gms.GroupMembershipListener#groupComposition(Group,
298    * Address)
299    */

300   public void groupComposition(Group g, Address sender)
301   {
302     // Discard local messages
303
// if (replyAddress.equals(sender))
304
// return;
305

306     GroupIdentifier gid = g.getGroupIdentifier();
307     synchronized (groupMemberships)
308     {
309       Group localGroup = getGroup(gid);
310       if (localGroup == null)
311       {
312         if (logger.isDebugEnabled())
313           logger.debug("New group " + gid + " composition");
314         groupMemberships.put(gid, g);
315       }
316       else
317       {
318         if (logger.isDebugEnabled())
319           logger
320               .debug("Merging local group composition with the one received from "
321                   + sender);
322         if (localGroup.merge(g))
323         { // Group composition has changed, broadcast the new group composition
324
// to everybody in the group.
325

326           // First check that all members in the new group are reachable
327
if (logger.isDebugEnabled())
328             logger.debug("Checking new group composition:"
329                 + localGroup.getStringMembers());
330           ArrayList JavaDoc failed = channelPool.send(new PingMessage(), localGroup
331               .getMembers());
332           if (failed != null)
333           { // Removed all failed members from group
334
ArrayList JavaDoc remainingMembers = localGroup.getMembers();
335             remainingMembers.removeAll(failed);
336             // Annonced that these members failed
337
for (int i = 0; i < failed.size(); i++)
338             {
339               channelPool.send(new QuitMessage(gid, (Member) failed.get(i)),
340                   remainingMembers);
341             }
342           }
343
344           // Now broadcast the real group composition with only alive members
345
if (logger.isDebugEnabled())
346             logger.debug("Broacasting new group composition:"
347                 + localGroup.getStringMembers());
348           failed = channelPool.send(new GroupCompositionMessage(localGroup,
349               replyAddress), localGroup.getMembers());
350           if (failed != null)
351           { // Removed all failed members from group
352
ArrayList JavaDoc remainingMembers = localGroup.getMembers();
353             remainingMembers.removeAll(failed);
354             // Annonced that these members failed
355
for (int i = 0; i < failed.size(); i++)
356             {
357               channelPool.send(new QuitMessage(gid, (Member) failed.get(i)),
358                   remainingMembers);
359             }
360           }
361         }
362         else if (logger.isDebugEnabled())
363         {
364           logger.debug("Group composition has not changed.");
365         }
366       }
367     }
368
369     // Notify listeners
370
synchronized (listeners)
371     {
372       int size = listeners.size();
373       for (int i = 0; i < size; i++)
374         ((GroupMembershipListener) listeners.get(i))
375             .groupComposition(g, sender);
376     }
377   }
378
379   /**
380    * @see org.objectweb.tribe.gms.GroupMembershipListener#joinMember(org.objectweb.tribe.common.Member,
381    * org.objectweb.tribe.common.GroupIdentifier)
382    */

383   public void joinMember(Member m, GroupIdentifier gid)
384   {
385     synchronized (groupMemberships)
386     {
387       Group localGroup = getGroup(gid);
388       if (localGroup == null)
389       {
390         if (logger.isDebugEnabled())
391           logger.debug("Ignoring member " + m + " joining group " + gid);
392       }
393       else
394       {
395         if (logger.isDebugEnabled())
396           logger.debug("Member " + m + " joins group " + gid);
397         localGroup.addMember(m);
398       }
399     }
400
401     // Notify listeners
402
synchronized (listeners)
403     {
404       int size = listeners.size();
405       for (int i = 0; i < size; i++)
406         ((GroupMembershipListener) listeners.get(i)).joinMember(m, gid);
407     }
408   }
409
410   /**
411    * @see org.objectweb.tribe.gms.GroupMembershipListener#quitMember(org.objectweb.tribe.common.Member,
412    * org.objectweb.tribe.common.GroupIdentifier)
413    */

414   public void quitMember(Member m, GroupIdentifier gid)
415   {
416     synchronized (groupMemberships)
417     {
418       Group localGroup = getGroup(gid);
419       if (localGroup == null)
420       {
421         if (logger.isDebugEnabled())
422           logger.debug("Ignoring member " + m + " leaving group " + gid);
423       }
424       else
425       {
426         if (logger.isDebugEnabled())
427           logger.debug("Member " + m + " leaves group " + gid);
428         localGroup.removeMember(m);
429       }
430     }
431
432     // Notify listeners
433
synchronized (listeners)
434     {
435       int size = listeners.size();
436       for (int i = 0; i < size; i++)
437         ((GroupMembershipListener) listeners.get(i)).quitMember(m, gid);
438     }
439   }
440
441   /**
442    * @see org.objectweb.tribe.gms.GroupMembershipListener#failedMember(org.objectweb.tribe.common.Member,
443    * org.objectweb.tribe.common.GroupIdentifier,
444    * org.objectweb.tribe.common.Member)
445    */

446   public void failedMember(Member failed, GroupIdentifier gid, Member sender)
447   {
448     synchronized (groupMemberships)
449     {
450       Group localGroup = getGroup(gid);
451       if (localGroup == null)
452       {
453         if (logger.isDebugEnabled())
454           logger.debug("Ignoring member " + failed + " failed in group " + gid);
455       }
456       else
457       {
458         if (logger.isInfoEnabled())
459           logger.info("Member " + failed + " failed in group " + gid);
460         localGroup.removeMember(failed);
461       }
462     }
463
464     // Notify listeners
465
synchronized (listeners)
466     {
467       int size = listeners.size();
468       for (int i = 0; i < size; i++)
469         ((GroupMembershipListener) listeners.get(i)).failedMember(failed, gid,
470             sender);
471     }
472   }
473
474   /**
475    * Stop this GMS service and its associated thread (note that the discovery
476    * service must be stopped separately).
477    */

478   public void stop()
479   {
480     groupMemberships.clear();
481     if (channelPool != null)
482     {
483       membershipThread.kill();
484       channelPool.removeServerChannelFromPool(serverChannel);
485       try
486       {
487         membershipThread.join(1000);
488       }
489       catch (InterruptedException JavaDoc ignore)
490       {
491       }
492     }
493   }
494
495 }
Popular Tags