KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > net > groups > TribesGroupManager


1 package com.tc.net.groups;
2
3 import org.apache.catalina.tribes.Channel;
4 import org.apache.catalina.tribes.ChannelException;
5 import org.apache.catalina.tribes.ChannelListener;
6 import org.apache.catalina.tribes.Member;
7 import org.apache.catalina.tribes.MembershipListener;
8 import org.apache.catalina.tribes.ChannelException.FaultyMember;
9 import org.apache.catalina.tribes.group.GroupChannel;
10
11 import com.tc.async.api.EventContext;
12 import com.tc.async.api.Sink;
13 import com.tc.logging.TCLogger;
14 import com.tc.logging.TCLogging;
15 import com.tc.util.Assert;
16
17 import java.io.Serializable JavaDoc;
18 import java.lang.reflect.Constructor JavaDoc;
19 import java.lang.reflect.Modifier JavaDoc;
20 import java.util.ArrayList JavaDoc;
21 import java.util.Arrays JavaDoc;
22 import java.util.Collections JavaDoc;
23 import java.util.HashMap JavaDoc;
24 import java.util.HashSet JavaDoc;
25 import java.util.Hashtable JavaDoc;
26 import java.util.Iterator JavaDoc;
27 import java.util.List JavaDoc;
28 import java.util.Map JavaDoc;
29 import java.util.concurrent.ConcurrentHashMap JavaDoc;
30 import java.util.concurrent.CopyOnWriteArrayList JavaDoc;
31
32 public class TribesGroupManager implements GroupManager, ChannelListener, MembershipListener {
33   private static final TCLogger logger = TCLogging
34                                                                                .getLogger(TribesGroupManager.class);
35
36   private final GroupChannel group;
37
38   private Member thisMember;
39   private NodeID thisNodeID;
40
41   private final CopyOnWriteArrayList JavaDoc<GroupEventsListener> groupListeners = new CopyOnWriteArrayList JavaDoc<GroupEventsListener>();
42   private final Map JavaDoc<NodeID, Member> nodes = Collections
43                                                                                .synchronizedMap(new HashMap JavaDoc<NodeID, Member>());
44   private final Map JavaDoc<String JavaDoc, GroupMessageListener> messageListeners = new ConcurrentHashMap JavaDoc<String JavaDoc, GroupMessageListener>();
45   private final Map JavaDoc<MessageID, GroupResponse> pendingRequests = new Hashtable JavaDoc<MessageID, GroupResponse>();
46
47   private boolean debug = false;
48
49   public TribesGroupManager() {
50     group = new GroupChannel();
51     group.addChannelListener(this);
52     group.addMembershipListener(this);
53   }
54
55   public NodeID join() throws GroupException {
56     try {
57       group.start(Channel.DEFAULT);
58       this.thisMember = group.getLocalMember(false);
59       this.thisNodeID = new NodeID(this.thisMember.getName(), this.thisMember.getUniqueId());
60       return this.thisNodeID;
61     } catch (ChannelException e) {
62       logger.error(e);
63       throw new GroupException(e);
64     }
65   }
66   
67   public NodeID getLocalNodeID() throws GroupException {
68     if(this.thisNodeID == null) {
69       throw new GroupException("Node hasnt joined the group yet !");
70     }
71     return this.thisNodeID;
72   }
73
74   private static void validateExternalizableClass(Class JavaDoc clazz) {
75     String JavaDoc name = clazz.getName();
76     try {
77       Constructor JavaDoc cons = clazz.getDeclaredConstructor(new Class JavaDoc[0]);
78       if ((cons.getModifiers() & Modifier.PUBLIC) == 0) {
79         //
80
throw new AssertionError JavaDoc(name + " : public no arg constructor not found");
81       }
82     } catch (NoSuchMethodException JavaDoc ex) {
83       throw new AssertionError JavaDoc(name + " : public no arg constructor not found");
84     }
85   }
86
87   private static void validateEventClass(Class JavaDoc clazz) {
88     if (!EventContext.class.isAssignableFrom(clazz)) { throw new AssertionError JavaDoc(clazz
89                                                                                 + " does not implement interface "
90                                                                                 + EventContext.class.getName()); }
91   }
92
93   public void registerForMessages(Class JavaDoc msgClass, GroupMessageListener listener) {
94     validateExternalizableClass(msgClass);
95     GroupMessageListener prev = messageListeners.put(msgClass.getName(), listener);
96     if (prev != null) {
97       logger.warn("Previous listener removed : " + prev);
98     }
99   }
100
101   public void routeMessages(Class JavaDoc msgClass, Sink sink) {
102     validateEventClass(msgClass);
103     registerForMessages(msgClass, new RouteGroupMessagesToSink(msgClass.getName(), sink));
104   }
105
106   public boolean accept(Serializable JavaDoc msg, Member sender) {
107     if (msg instanceof GroupMessage) { return true; }
108     logger.warn("Rejecting unknown message : " + msg + " from " + sender.getName());
109     return false;
110   }
111
112   public void messageReceived(Serializable JavaDoc msg, Member sender) {
113     GroupMessage gmsg = (GroupMessage) msg;
114     if (debug) {
115       logger.info(this.thisNodeID + " recd msg " + gmsg.getMessageID() + " From " + sender.getName());
116     }
117     MessageID requestID = gmsg.inResponseTo();
118     NodeID from = new NodeID(sender.getName(), sender.getUniqueId());
119     gmsg.setMessageOrginator(from);
120     if (requestID.isNull() || !notifyPendingRequests(requestID, gmsg, sender)) {
121       fireMessageReceivedEvent(from, gmsg);
122     }
123   }
124
125   private boolean notifyPendingRequests(MessageID requestID, GroupMessage gmsg, Member sender) {
126     GroupResponseImpl response = (GroupResponseImpl) pendingRequests.get(requestID);
127     if (response != null) {
128       response.addResponseFrom(sender, gmsg);
129       return true;
130     }
131     return false;
132   }
133
134   private void fireMessageReceivedEvent(NodeID from, GroupMessage msg) {
135     GroupMessageListener listener = messageListeners.get(msg.getClass().getName());
136     if (listener != null) {
137       listener.messageReceived(from, msg);
138     } else {
139       String JavaDoc errorMsg = "No Route for " + msg + " from " + from;
140       logger.error(errorMsg);
141       throw new AssertionError JavaDoc(errorMsg);
142     }
143
144   }
145
146   public void registerForGroupEvents(GroupEventsListener listener) {
147     groupListeners.add(listener);
148   }
149
150   public void memberAdded(Member member) {
151     NodeID newNode = new NodeID(member.getName(), member.getUniqueId());
152     Member old;
153     if ((old = nodes.put(newNode, member)) == null) {
154       fireNodeEvent(newNode, true);
155     } else {
156       logger.warn("Member Added Event called for : " + newNode + " while it is still present in the list of nodes : "
157                   + old + " : " + nodes);
158     }
159   }
160
161   private void fireNodeEvent(NodeID newNode, boolean joined) {
162     Iterator JavaDoc i = groupListeners.iterator();
163     while (i.hasNext()) {
164       GroupEventsListener listener = (GroupEventsListener) i.next();
165       if (joined) {
166         listener.nodeJoined(newNode);
167       } else {
168         listener.nodeLeft(newNode);
169       }
170     }
171   }
172
173   public void memberDisappeared(Member member) {
174     NodeID node = new NodeID(member.getName(), member.getUniqueId());
175     if ((nodes.remove(node)) != null) {
176       fireNodeEvent(node, false);
177     } else {
178       logger.warn("Member Disappered Event called for : " + node + " while it is not present in the list of nodes : "
179                   + nodes);
180     }
181     notifyAnyPendingRequests(member);
182   }
183
184   private void notifyAnyPendingRequests(Member member) {
185     synchronized (pendingRequests) {
186       for (Iterator JavaDoc i = pendingRequests.values().iterator(); i.hasNext();) {
187         GroupResponseImpl response = (GroupResponseImpl) i.next();
188         response.notifyMemberDead(member);
189       }
190     }
191   }
192
193   public void sendAll(GroupMessage msg) throws GroupException {
194     if (debug) {
195       logger.info(this.thisNodeID + " : Sending to ALL : " + msg.getMessageID());
196     }
197     // TODO :: Validate the options flag
198
try {
199       Member m[] = group.getMembers();
200       if (m.length > 0) {
201         group.send(m, msg, Channel.SEND_OPTIONS_DEFAULT);
202       }
203     } catch (ChannelException e) {
204       throw new GroupException(e);
205     }
206   }
207
208   public GroupResponse sendAllAndWaitForResponse(GroupMessage msg) throws GroupException {
209     if (debug) {
210       logger.info(this.thisNodeID + " : Sending to ALL and Waiting for Response : " + msg.getMessageID());
211     }
212     GroupResponseImpl groupResponse = new GroupResponseImpl();
213     MessageID msgID = msg.getMessageID();
214     GroupResponse old = pendingRequests.put(msgID, groupResponse);
215     Assert.assertNull(old);
216     groupResponse.sendAll(group, msg);
217     groupResponse.waitForAllResponses();
218     pendingRequests.remove(msgID);
219     return groupResponse;
220   }
221
222   public void sendTo(NodeID node, GroupMessage msg) throws GroupException {
223     if (debug) {
224       logger.info(this.thisNodeID + " : Sending to : " + node + " msg " + msg.getMessageID());
225     }
226     try {
227       Member to[] = new Member[1];
228       to[0] = nodes.get(node);
229       if (to[0] != null) {
230         // TODO :: Validate the options flag
231
group.send(to, msg, Channel.SEND_OPTIONS_DEFAULT);
232       } else {
233         // TODO:: These could be exceptions
234
logger.warn("Node " + node + " not present in the group. Ignoring Message : " + msg);
235       }
236     } catch (ChannelException e) {
237       throw new GroupException(e);
238     }
239   }
240
241   public GroupMessage sendToAndWaitForResponse(NodeID nodeID, GroupMessage msg) throws GroupException {
242     if (debug) {
243       logger.info(this.thisNodeID + " : Sending to " + nodeID + " and Waiting for Response : " + msg.getMessageID());
244     }
245     GroupResponseImpl groupResponse = new GroupResponseImpl();
246     MessageID msgID = msg.getMessageID();
247     Member to[] = new Member[1];
248     to[0] = nodes.get(nodeID);
249     if (to[0] != null) {
250       GroupResponse old = pendingRequests.put(msgID, groupResponse);
251       Assert.assertNull(old);
252       groupResponse.sendTo(group, msg, to);
253       groupResponse.waitForAllResponses();
254       pendingRequests.remove(msgID);
255     } else {
256       String JavaDoc errorMsg = "Node " + nodeID + " not present in the group. Ignoring Message : " + msg;
257       logger.error(errorMsg);
258       throw new GroupException(errorMsg);
259     }
260     return groupResponse.getResponse(nodeID);
261   }
262
263   public void zapNode(NodeID nodeID) {
264     logger.warn("TODO::Zapping node : " + nodeID);
265     // TODO:: Implement this
266
}
267
268   private static class GroupResponseImpl implements GroupResponse {
269
270     HashSet JavaDoc<Member> waitFor = new HashSet JavaDoc<Member>();
271     List JavaDoc<GroupMessage> responses = new ArrayList JavaDoc<GroupMessage>();
272
273     public synchronized List JavaDoc getResponses() {
274       Assert.assertTrue(waitFor.isEmpty());
275       return responses;
276     }
277
278     public GroupMessage getResponse(NodeID nodeID) {
279       Assert.assertTrue(waitFor.isEmpty());
280       for (Iterator JavaDoc<GroupMessage> i = responses.iterator(); i.hasNext();) {
281         GroupMessage msg = i.next();
282         if(nodeID.equals(msg.messageFrom())) return msg;
283       }
284       return null;
285     }
286
287     public synchronized void sendTo(GroupChannel group, GroupMessage msg, Member[] m) {
288       waitFor.addAll(Arrays.asList(m));
289       try {
290         if (m.length > 0) {
291           group.send(m, msg, Channel.SEND_OPTIONS_DEFAULT);
292         }
293       } catch (ChannelException e) {
294         logger.error("Error sending msg : " + msg, e);
295         reconsileWaitFor(e);
296       }
297     }
298
299     public synchronized void addResponseFrom(Member sender, GroupMessage gmsg) {
300       Assert.assertNotNull(waitFor.remove(sender));
301       responses.add(gmsg);
302       notifyAll();
303     }
304
305     public synchronized void notifyMemberDead(Member member) {
306       waitFor.remove(member);
307       notifyAll();
308     }
309
310     public synchronized void waitForAllResponses() throws GroupException {
311       while (!waitFor.isEmpty()) {
312         try {
313           this.wait();
314         } catch (InterruptedException JavaDoc e) {
315           throw new GroupException(e);
316         }
317       }
318     }
319
320     public synchronized void sendAll(GroupChannel group, GroupMessage msg) {
321       Member m[] = group.getMembers();
322       sendTo(group, msg, m);
323     }
324
325     private void reconsileWaitFor(ChannelException e) {
326       FaultyMember fm[] = e.getFaultyMembers();
327       for (int i = 0; i < fm.length; i++) {
328         waitFor.remove(fm[i].getMember());
329       }
330     }
331   }
332
333 }
334
Popular Tags