1 package com.tc.net.groups; 2 3 import org.apache.catalina.tribes.Channel; 4 import org.apache.catalina.tribes.ChannelException; 5 import org.apache.catalina.tribes.ChannelListener; 6 import org.apache.catalina.tribes.Member; 7 import org.apache.catalina.tribes.MembershipListener; 8 import org.apache.catalina.tribes.ChannelException.FaultyMember; 9 import org.apache.catalina.tribes.group.GroupChannel; 10 11 import com.tc.async.api.EventContext; 12 import com.tc.async.api.Sink; 13 import com.tc.logging.TCLogger; 14 import com.tc.logging.TCLogging; 15 import com.tc.util.Assert; 16 17 import java.io.Serializable ; 18 import java.lang.reflect.Constructor ; 19 import java.lang.reflect.Modifier ; 20 import java.util.ArrayList ; 21 import java.util.Arrays ; 22 import java.util.Collections ; 23 import java.util.HashMap ; 24 import java.util.HashSet ; 25 import java.util.Hashtable ; 26 import java.util.Iterator ; 27 import java.util.List ; 28 import java.util.Map ; 29 import java.util.concurrent.ConcurrentHashMap ; 30 import java.util.concurrent.CopyOnWriteArrayList ; 31 32 public class TribesGroupManager implements GroupManager, ChannelListener, MembershipListener { 33 private static final TCLogger logger = TCLogging 34 .getLogger(TribesGroupManager.class); 35 36 private final GroupChannel group; 37 38 private Member thisMember; 39 private NodeID thisNodeID; 40 41 private final CopyOnWriteArrayList <GroupEventsListener> groupListeners = new CopyOnWriteArrayList <GroupEventsListener>(); 42 private final Map <NodeID, Member> nodes = Collections 43 .synchronizedMap(new HashMap <NodeID, Member>()); 44 private final Map <String , GroupMessageListener> messageListeners = new ConcurrentHashMap <String , GroupMessageListener>(); 45 private final Map <MessageID, GroupResponse> pendingRequests = new Hashtable <MessageID, GroupResponse>(); 46 47 private boolean debug = false; 48 49 public TribesGroupManager() { 50 group = new GroupChannel(); 51 group.addChannelListener(this); 52 group.addMembershipListener(this); 53 } 54 55 public NodeID join() throws GroupException { 56 try { 57 group.start(Channel.DEFAULT); 58 this.thisMember = group.getLocalMember(false); 59 this.thisNodeID = new NodeID(this.thisMember.getName(), this.thisMember.getUniqueId()); 60 return this.thisNodeID; 61 } catch (ChannelException e) { 62 logger.error(e); 63 throw new GroupException(e); 64 } 65 } 66 67 public NodeID getLocalNodeID() throws GroupException { 68 if(this.thisNodeID == null) { 69 throw new GroupException("Node hasnt joined the group yet !"); 70 } 71 return this.thisNodeID; 72 } 73 74 private static void validateExternalizableClass(Class clazz) { 75 String name = clazz.getName(); 76 try { 77 Constructor cons = clazz.getDeclaredConstructor(new Class [0]); 78 if ((cons.getModifiers() & Modifier.PUBLIC) == 0) { 79 throw new AssertionError (name + " : public no arg constructor not found"); 81 } 82 } catch (NoSuchMethodException ex) { 83 throw new AssertionError (name + " : public no arg constructor not found"); 84 } 85 } 86 87 private static void validateEventClass(Class clazz) { 88 if (!EventContext.class.isAssignableFrom(clazz)) { throw new AssertionError (clazz 89 + " does not implement interface " 90 + EventContext.class.getName()); } 91 } 92 93 public void registerForMessages(Class msgClass, GroupMessageListener listener) { 94 validateExternalizableClass(msgClass); 95 GroupMessageListener prev = messageListeners.put(msgClass.getName(), listener); 96 if (prev != null) { 97 logger.warn("Previous listener removed : " + prev); 98 } 99 } 100 101 public void routeMessages(Class msgClass, Sink sink) { 102 validateEventClass(msgClass); 103 registerForMessages(msgClass, new RouteGroupMessagesToSink(msgClass.getName(), sink)); 104 } 105 106 public boolean accept(Serializable msg, Member sender) { 107 if (msg instanceof GroupMessage) { return true; } 108 logger.warn("Rejecting unknown message : " + msg + " from " + sender.getName()); 109 return false; 110 } 111 112 public void messageReceived(Serializable msg, Member sender) { 113 GroupMessage gmsg = (GroupMessage) msg; 114 if (debug) { 115 logger.info(this.thisNodeID + " recd msg " + gmsg.getMessageID() + " From " + sender.getName()); 116 } 117 MessageID requestID = gmsg.inResponseTo(); 118 NodeID from = new NodeID(sender.getName(), sender.getUniqueId()); 119 gmsg.setMessageOrginator(from); 120 if (requestID.isNull() || !notifyPendingRequests(requestID, gmsg, sender)) { 121 fireMessageReceivedEvent(from, gmsg); 122 } 123 } 124 125 private boolean notifyPendingRequests(MessageID requestID, GroupMessage gmsg, Member sender) { 126 GroupResponseImpl response = (GroupResponseImpl) pendingRequests.get(requestID); 127 if (response != null) { 128 response.addResponseFrom(sender, gmsg); 129 return true; 130 } 131 return false; 132 } 133 134 private void fireMessageReceivedEvent(NodeID from, GroupMessage msg) { 135 GroupMessageListener listener = messageListeners.get(msg.getClass().getName()); 136 if (listener != null) { 137 listener.messageReceived(from, msg); 138 } else { 139 String errorMsg = "No Route for " + msg + " from " + from; 140 logger.error(errorMsg); 141 throw new AssertionError (errorMsg); 142 } 143 144 } 145 146 public void registerForGroupEvents(GroupEventsListener listener) { 147 groupListeners.add(listener); 148 } 149 150 public void memberAdded(Member member) { 151 NodeID newNode = new NodeID(member.getName(), member.getUniqueId()); 152 Member old; 153 if ((old = nodes.put(newNode, member)) == null) { 154 fireNodeEvent(newNode, true); 155 } else { 156 logger.warn("Member Added Event called for : " + newNode + " while it is still present in the list of nodes : " 157 + old + " : " + nodes); 158 } 159 } 160 161 private void fireNodeEvent(NodeID newNode, boolean joined) { 162 Iterator i = groupListeners.iterator(); 163 while (i.hasNext()) { 164 GroupEventsListener listener = (GroupEventsListener) i.next(); 165 if (joined) { 166 listener.nodeJoined(newNode); 167 } else { 168 listener.nodeLeft(newNode); 169 } 170 } 171 } 172 173 public void memberDisappeared(Member member) { 174 NodeID node = new NodeID(member.getName(), member.getUniqueId()); 175 if ((nodes.remove(node)) != null) { 176 fireNodeEvent(node, false); 177 } else { 178 logger.warn("Member Disappered Event called for : " + node + " while it is not present in the list of nodes : " 179 + nodes); 180 } 181 notifyAnyPendingRequests(member); 182 } 183 184 private void notifyAnyPendingRequests(Member member) { 185 synchronized (pendingRequests) { 186 for (Iterator i = pendingRequests.values().iterator(); i.hasNext();) { 187 GroupResponseImpl response = (GroupResponseImpl) i.next(); 188 response.notifyMemberDead(member); 189 } 190 } 191 } 192 193 public void sendAll(GroupMessage msg) throws GroupException { 194 if (debug) { 195 logger.info(this.thisNodeID + " : Sending to ALL : " + msg.getMessageID()); 196 } 197 try { 199 Member m[] = group.getMembers(); 200 if (m.length > 0) { 201 group.send(m, msg, Channel.SEND_OPTIONS_DEFAULT); 202 } 203 } catch (ChannelException e) { 204 throw new GroupException(e); 205 } 206 } 207 208 public GroupResponse sendAllAndWaitForResponse(GroupMessage msg) throws GroupException { 209 if (debug) { 210 logger.info(this.thisNodeID + " : Sending to ALL and Waiting for Response : " + msg.getMessageID()); 211 } 212 GroupResponseImpl groupResponse = new GroupResponseImpl(); 213 MessageID msgID = msg.getMessageID(); 214 GroupResponse old = pendingRequests.put(msgID, groupResponse); 215 Assert.assertNull(old); 216 groupResponse.sendAll(group, msg); 217 groupResponse.waitForAllResponses(); 218 pendingRequests.remove(msgID); 219 return groupResponse; 220 } 221 222 public void sendTo(NodeID node, GroupMessage msg) throws GroupException { 223 if (debug) { 224 logger.info(this.thisNodeID + " : Sending to : " + node + " msg " + msg.getMessageID()); 225 } 226 try { 227 Member to[] = new Member[1]; 228 to[0] = nodes.get(node); 229 if (to[0] != null) { 230 group.send(to, msg, Channel.SEND_OPTIONS_DEFAULT); 232 } else { 233 logger.warn("Node " + node + " not present in the group. Ignoring Message : " + msg); 235 } 236 } catch (ChannelException e) { 237 throw new GroupException(e); 238 } 239 } 240 241 public GroupMessage sendToAndWaitForResponse(NodeID nodeID, GroupMessage msg) throws GroupException { 242 if (debug) { 243 logger.info(this.thisNodeID + " : Sending to " + nodeID + " and Waiting for Response : " + msg.getMessageID()); 244 } 245 GroupResponseImpl groupResponse = new GroupResponseImpl(); 246 MessageID msgID = msg.getMessageID(); 247 Member to[] = new Member[1]; 248 to[0] = nodes.get(nodeID); 249 if (to[0] != null) { 250 GroupResponse old = pendingRequests.put(msgID, groupResponse); 251 Assert.assertNull(old); 252 groupResponse.sendTo(group, msg, to); 253 groupResponse.waitForAllResponses(); 254 pendingRequests.remove(msgID); 255 } else { 256 String errorMsg = "Node " + nodeID + " not present in the group. Ignoring Message : " + msg; 257 logger.error(errorMsg); 258 throw new GroupException(errorMsg); 259 } 260 return groupResponse.getResponse(nodeID); 261 } 262 263 public void zapNode(NodeID nodeID) { 264 logger.warn("TODO::Zapping node : " + nodeID); 265 } 267 268 private static class GroupResponseImpl implements GroupResponse { 269 270 HashSet <Member> waitFor = new HashSet <Member>(); 271 List <GroupMessage> responses = new ArrayList <GroupMessage>(); 272 273 public synchronized List getResponses() { 274 Assert.assertTrue(waitFor.isEmpty()); 275 return responses; 276 } 277 278 public GroupMessage getResponse(NodeID nodeID) { 279 Assert.assertTrue(waitFor.isEmpty()); 280 for (Iterator <GroupMessage> i = responses.iterator(); i.hasNext();) { 281 GroupMessage msg = i.next(); 282 if(nodeID.equals(msg.messageFrom())) return msg; 283 } 284 return null; 285 } 286 287 public synchronized void sendTo(GroupChannel group, GroupMessage msg, Member[] m) { 288 waitFor.addAll(Arrays.asList(m)); 289 try { 290 if (m.length > 0) { 291 group.send(m, msg, Channel.SEND_OPTIONS_DEFAULT); 292 } 293 } catch (ChannelException e) { 294 logger.error("Error sending msg : " + msg, e); 295 reconsileWaitFor(e); 296 } 297 } 298 299 public synchronized void addResponseFrom(Member sender, GroupMessage gmsg) { 300 Assert.assertNotNull(waitFor.remove(sender)); 301 responses.add(gmsg); 302 notifyAll(); 303 } 304 305 public synchronized void notifyMemberDead(Member member) { 306 waitFor.remove(member); 307 notifyAll(); 308 } 309 310 public synchronized void waitForAllResponses() throws GroupException { 311 while (!waitFor.isEmpty()) { 312 try { 313 this.wait(); 314 } catch (InterruptedException e) { 315 throw new GroupException(e); 316 } 317 } 318 } 319 320 public synchronized void sendAll(GroupChannel group, GroupMessage msg) { 321 Member m[] = group.getMembers(); 322 sendTo(group, msg, m); 323 } 324 325 private void reconsileWaitFor(ChannelException e) { 326 FaultyMember fm[] = e.getFaultyMembers(); 327 for (int i = 0; i < fm.length; i++) { 328 waitFor.remove(fm[i].getMember()); 329 } 330 } 331 } 332 333 } 334 | Popular Tags |