1 24 25 package org.objectweb.tribe.gms; 26 27 import java.net.URL ; 28 import java.util.Vector ; 29 30 import org.jgroups.ChannelClosedException; 31 import org.jgroups.JChannel; 32 import org.jgroups.MembershipListener; 33 import org.jgroups.View; 34 import org.objectweb.tribe.channel.ReliableGroupChannelWithGms; 35 import org.objectweb.tribe.common.Group; 36 import org.objectweb.tribe.common.GroupIdentifier; 37 import org.objectweb.tribe.common.IpAddress; 38 import org.objectweb.tribe.common.Member; 39 import org.objectweb.tribe.exceptions.AlreadyMemberException; 40 import org.objectweb.tribe.exceptions.ChannelException; 41 import org.objectweb.tribe.exceptions.NotConnectedException; 42 43 49 public class JGroupsMembershipService extends GroupMembershipService 50 implements 51 MembershipListener 52 { 53 private JChannel jgroupsChannel; 55 private GroupIdentifier currentGroup; 56 57 62 public JGroupsMembershipService(URL jgroupsConfigFile) 63 throws ChannelException 64 { 65 super(null, null, null); 66 try 67 { 68 jgroupsChannel = new JChannel(jgroupsConfigFile); 69 } 70 catch (org.jgroups.ChannelException e) 71 { 72 throw new ChannelException(e); 73 } 74 } 75 76 81 public JChannel getJGroupsChannel() 82 { 83 return jgroupsChannel; 84 } 85 86 90 public Member join(ReliableGroupChannelWithGms channel, GroupIdentifier gid) 91 throws AlreadyMemberException, NotConnectedException, ChannelException 92 { 93 Member me; 94 Group g; 95 synchronized (groupMemberships) 96 { 97 try 98 { 99 jgroupsChannel.connect(gid.getGroupName()); 100 currentGroup = gid; 101 } 102 catch (ChannelClosedException e) 103 { 104 throw new NotConnectedException(e); 105 } 106 catch (org.jgroups.ChannelException e) 107 { 108 throw new ChannelException(e); 109 } 110 111 g = getGroup(currentGroup); 112 me = memberFromJGroupsAddress((org.jgroups.stack.IpAddress) jgroupsChannel 113 .getLocalAddress()); 114 if (g == null) 115 { g = new Group(currentGroup); 117 groupMemberships.put(gid, g); 118 } 119 else 120 throw new AlreadyMemberException(); 121 122 g.addMember(me); 124 } 125 126 synchronized (listeners) 128 { 129 int size = listeners.size(); 130 for (int i = 0; i < size; i++) 131 ((GroupMembershipListener) listeners.get(i)).joinMember(me, gid); 132 } 133 134 return me; 135 } 136 137 141 public void quit(ReliableGroupChannelWithGms channel, GroupIdentifier gid) 142 throws ChannelException, NotConnectedException 143 { 144 super.quit(channel, gid); 145 146 jgroupsChannel.disconnect(); 147 } 148 149 153 160 public static Member memberFromJGroupsAddress( 161 org.jgroups.stack.IpAddress jgroupsAddress) 162 { 163 return new Member(new IpAddress(jgroupsAddress.getIpAddress(), 164 jgroupsAddress.getPort()), jgroupsAddress.getIpAddress().toString() 165 + ":" + jgroupsAddress.getPort()); 166 } 167 168 171 public void viewAccepted(View newView) 172 { 173 String jGroupsBugWorkAroundDoNotRemoveThisStringRepresentationOfTheView = newView 182 .toString(); 183 if (logger.isDebugEnabled()) 184 logger.debug("JGroups reported new view: " 185 + jGroupsBugWorkAroundDoNotRemoveThisStringRepresentationOfTheView); 186 187 Vector members = newView.getMembers(); 188 int size = members.size(); 189 synchronized (groupMemberships) 190 { 191 Group g = getGroup(currentGroup); 192 for (int i = 0; i < size; i++) 193 { 194 Member m = memberFromJGroupsAddress((org.jgroups.stack.IpAddress) members 195 .get(i)); 196 if (!g.hasMember(m)) joinMember(m, currentGroup); 198 } 199 } 200 } 201 202 205 public void suspect(org.jgroups.Address suspectedMbr) 206 { 207 if (logger.isDebugEnabled()) 208 logger.debug("JGroups reports suspected member:" + suspectedMbr); 209 org.jgroups.stack.IpAddress jgroupsAddress = (org.jgroups.stack.IpAddress) suspectedMbr; 210 quitMember(memberFromJGroupsAddress(jgroupsAddress), currentGroup); 211 } 212 213 216 public void block() 217 { 218 if (logger.isDebugEnabled()) 219 logger.debug("JGroups reported block()"); 220 } 221 222 } | Popular Tags |