1 24 25 package org.objectweb.tribe.gms; 26 27 import java.util.ArrayList ; 28 import java.util.HashMap ; 29 30 import org.objectweb.tribe.channel.AbstractChannelPool; 31 import org.objectweb.tribe.channel.AbstractServerChannel; 32 import org.objectweb.tribe.channel.ReceiveBuffer; 33 import org.objectweb.tribe.channel.ReliableGroupChannelWithGms; 34 import org.objectweb.tribe.common.Address; 35 import org.objectweb.tribe.common.Group; 36 import org.objectweb.tribe.common.GroupIdentifier; 37 import org.objectweb.tribe.common.IpAddress; 38 import org.objectweb.tribe.common.Member; 39 import org.objectweb.tribe.common.log.Trace; 40 import org.objectweb.tribe.exceptions.AlreadyMemberException; 41 import org.objectweb.tribe.exceptions.ChannelException; 42 import org.objectweb.tribe.exceptions.NotConnectedException; 43 import org.objectweb.tribe.gms.discovery.DiscoveryListener; 44 import org.objectweb.tribe.gms.discovery.DiscoveryService; 45 import org.objectweb.tribe.gms.discovery.UdpDiscoveryService; 46 import org.objectweb.tribe.gms.protocol.GroupCompositionMessage; 47 import org.objectweb.tribe.gms.protocol.QuitMessage; 48 import org.objectweb.tribe.messages.PingMessage; 49 50 56 public class GroupMembershipService 57 implements 58 DiscoveryListener, 59 GroupMembershipListener 60 { 61 62 public static String GMS_KEY = "tribe.gms"; 63 64 protected HashMap groupMemberships; 66 private GroupMembershipListenerThread membershipThread; 67 private IpAddress replyAddress; 68 private AbstractServerChannel serverChannel; 69 private DiscoveryService discovery; 70 protected ArrayList listeners; 71 private AbstractChannelPool channelPool; 72 private ReceiveBuffer receiveBuffer; 73 protected static Trace logger = Trace 74 .getLogger("org.objectweb.tribe.gms"); 75 76 84 public GroupMembershipService(IpAddress replyAddress, 85 AbstractChannelPool channelPool, DiscoveryService discovery) 86 throws ChannelException 87 { 88 groupMemberships = new HashMap (); 89 listeners = new ArrayList (); 90 this.channelPool = channelPool; 91 this.discovery = discovery; 92 if (channelPool != null) 93 { 94 receiveBuffer = new ReceiveBuffer(GroupMembershipService.GMS_KEY); 95 membershipThread = new GroupMembershipListenerThread(this); 96 membershipThread.start(); 97 channelPool.registerReceiveBuffer(receiveBuffer); 98 serverChannel = channelPool.getServerChannel(replyAddress); 99 try 100 { 101 this.replyAddress = (IpAddress) serverChannel.getBindAddress(); 103 if (this.replyAddress.getAddress().getHostAddress().equals("0.0.0.0")) 104 this.replyAddress.setAddress(replyAddress.getAddress()); 105 } 106 catch (NotConnectedException e) 107 { 108 throw new ChannelException("Unable to bind server channel."); 109 } 110 111 discovery.registerDiscoveryListener(this); 112 ((UdpDiscoveryService) discovery).setReplyAddress(this.replyAddress); 113 } 114 } 115 116 121 protected ReceiveBuffer getReceiveBuffer() 122 { 123 return receiveBuffer; 124 } 125 126 131 protected static Trace getLogger() 132 { 133 return logger; 134 } 135 136 146 public Member join(ReliableGroupChannelWithGms channel, GroupIdentifier gid) 147 throws AlreadyMemberException, NotConnectedException, ChannelException 148 { 149 Member me = new Member(replyAddress, channel.toString()); 150 if (logger.isDebugEnabled()) 151 logger.debug("Member " + me.getUid() + " joins group " + gid); 152 Group g; 153 synchronized (groupMemberships) 154 { 155 g = getGroup(gid); 156 if (g == null) 157 { g = new Group(gid); 159 groupMemberships.put(gid, g); 160 } 161 else if (g.hasMember(me)) 162 throw new AlreadyMemberException(); 163 164 g.addMember(me); 166 } 167 168 if (discovery != null) 170 discovery.sendGroupDiscovery(gid); 171 172 synchronized (listeners) 174 { 175 int size = listeners.size(); 176 for (int i = 0; i < size; i++) 177 ((GroupMembershipListener) listeners.get(i)).joinMember(me, gid); 178 } 179 180 return me; 181 } 182 183 191 public void quit(ReliableGroupChannelWithGms channel, GroupIdentifier gid) 192 throws ChannelException, NotConnectedException 193 { 194 Group g = getGroup(gid); 195 if (g == null) 196 throw new NotConnectedException( 197 "Trying to leave a group we do not belong to."); 198 Member me = new Member(replyAddress, channel.toString()); 199 if (logger.isDebugEnabled()) 200 logger.debug("Member " + me.getUid() + " quits group " + gid); 201 if (channelPool != null) 202 { 203 ArrayList failed = channelPool.send(new QuitMessage(gid, me), g 204 .getMembers()); 205 if (failed != null) 206 throw new ChannelException(failed.size() 207 + " member did not receive the quit message."); 208 } 209 } 210 211 218 public Group getGroup(GroupIdentifier gid) 219 { 220 synchronized (groupMemberships) 221 { 222 return (Group) groupMemberships.get(gid); 223 } 224 } 225 226 231 public void registerGroupMembershipListener(GroupMembershipListener listener) 232 { 233 synchronized (listeners) 234 { 235 listeners.add(listener); 236 } 237 } 238 239 245 public boolean unregisterGroupMembershipListener( 246 GroupMembershipListener listener) 247 { 248 synchronized (listeners) 249 { 250 return listeners.remove(listener); 251 } 252 } 253 254 258 262 public void discoveryRequest(GroupIdentifier gid, Address sender) 263 { 264 Group g = getGroup(gid); 265 if (g != null) 266 { 267 if (logger.isDebugEnabled()) 268 logger.debug("Sending GroupCompositionMessage for group " 269 + g.getGroupIdentifier() + " to " + sender); 270 271 try 272 { 273 channelPool.getChannel(sender).send( 274 new GroupCompositionMessage(g, replyAddress)); 275 } 276 catch (ChannelException e) 277 { 278 logger.error("Failed to send GroupCompositionMessage for group " + gid 279 + " to " + sender, e); 280 } 281 catch (NotConnectedException e) 282 { 283 logger.error("Unable to read " + sender + " to send group " + gid 284 + " composition."); 285 } 286 } 287 else if (logger.isDebugEnabled()) 288 logger.debug("No information to send for group " + gid); 289 290 } 291 292 296 300 public void groupComposition(Group g, Address sender) 301 { 302 306 GroupIdentifier gid = g.getGroupIdentifier(); 307 synchronized (groupMemberships) 308 { 309 Group localGroup = getGroup(gid); 310 if (localGroup == null) 311 { 312 if (logger.isDebugEnabled()) 313 logger.debug("New group " + gid + " composition"); 314 groupMemberships.put(gid, g); 315 } 316 else 317 { 318 if (logger.isDebugEnabled()) 319 logger 320 .debug("Merging local group composition with the one received from " 321 + sender); 322 if (localGroup.merge(g)) 323 { 326 if (logger.isDebugEnabled()) 328 logger.debug("Checking new group composition:" 329 + localGroup.getStringMembers()); 330 ArrayList failed = channelPool.send(new PingMessage(), localGroup 331 .getMembers()); 332 if (failed != null) 333 { ArrayList remainingMembers = localGroup.getMembers(); 335 remainingMembers.removeAll(failed); 336 for (int i = 0; i < failed.size(); i++) 338 { 339 channelPool.send(new QuitMessage(gid, (Member) failed.get(i)), 340 remainingMembers); 341 } 342 } 343 344 if (logger.isDebugEnabled()) 346 logger.debug("Broacasting new group composition:" 347 + localGroup.getStringMembers()); 348 failed = channelPool.send(new GroupCompositionMessage(localGroup, 349 replyAddress), localGroup.getMembers()); 350 if (failed != null) 351 { ArrayList remainingMembers = localGroup.getMembers(); 353 remainingMembers.removeAll(failed); 354 for (int i = 0; i < failed.size(); i++) 356 { 357 channelPool.send(new QuitMessage(gid, (Member) failed.get(i)), 358 remainingMembers); 359 } 360 } 361 } 362 else if (logger.isDebugEnabled()) 363 { 364 logger.debug("Group composition has not changed."); 365 } 366 } 367 } 368 369 synchronized (listeners) 371 { 372 int size = listeners.size(); 373 for (int i = 0; i < size; i++) 374 ((GroupMembershipListener) listeners.get(i)) 375 .groupComposition(g, sender); 376 } 377 } 378 379 383 public void joinMember(Member m, GroupIdentifier gid) 384 { 385 synchronized (groupMemberships) 386 { 387 Group localGroup = getGroup(gid); 388 if (localGroup == null) 389 { 390 if (logger.isDebugEnabled()) 391 logger.debug("Ignoring member " + m + " joining group " + gid); 392 } 393 else 394 { 395 if (logger.isDebugEnabled()) 396 logger.debug("Member " + m + " joins group " + gid); 397 localGroup.addMember(m); 398 } 399 } 400 401 synchronized (listeners) 403 { 404 int size = listeners.size(); 405 for (int i = 0; i < size; i++) 406 ((GroupMembershipListener) listeners.get(i)).joinMember(m, gid); 407 } 408 } 409 410 414 public void quitMember(Member m, GroupIdentifier gid) 415 { 416 synchronized (groupMemberships) 417 { 418 Group localGroup = getGroup(gid); 419 if (localGroup == null) 420 { 421 if (logger.isDebugEnabled()) 422 logger.debug("Ignoring member " + m + " leaving group " + gid); 423 } 424 else 425 { 426 if (logger.isDebugEnabled()) 427 logger.debug("Member " + m + " leaves group " + gid); 428 localGroup.removeMember(m); 429 } 430 } 431 432 synchronized (listeners) 434 { 435 int size = listeners.size(); 436 for (int i = 0; i < size; i++) 437 ((GroupMembershipListener) listeners.get(i)).quitMember(m, gid); 438 } 439 } 440 441 446 public void failedMember(Member failed, GroupIdentifier gid, Member sender) 447 { 448 synchronized (groupMemberships) 449 { 450 Group localGroup = getGroup(gid); 451 if (localGroup == null) 452 { 453 if (logger.isDebugEnabled()) 454 logger.debug("Ignoring member " + failed + " failed in group " + gid); 455 } 456 else 457 { 458 if (logger.isInfoEnabled()) 459 logger.info("Member " + failed + " failed in group " + gid); 460 localGroup.removeMember(failed); 461 } 462 } 463 464 synchronized (listeners) 466 { 467 int size = listeners.size(); 468 for (int i = 0; i < size; i++) 469 ((GroupMembershipListener) listeners.get(i)).failedMember(failed, gid, 470 sender); 471 } 472 } 473 474 478 public void stop() 479 { 480 groupMemberships.clear(); 481 if (channelPool != null) 482 { 483 membershipThread.kill(); 484 channelPool.removeServerChannelFromPool(serverChannel); 485 try 486 { 487 membershipThread.join(1000); 488 } 489 catch (InterruptedException ignore) 490 { 491 } 492 } 493 } 494 495 } | Popular Tags |