KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > SEQUENCER


1
2 package org.jgroups.protocols;
3
4 import org.jgroups.*;
5 import org.jgroups.stack.Protocol;
6 import org.jgroups.util.Streamable;
7 import org.jgroups.util.Util;
8
9 import java.io.*;
10 import java.util.*;
11 import java.util.concurrent.ConcurrentHashMap JavaDoc;
12 import java.util.concurrent.ConcurrentMap JavaDoc;
13
14
15 /**
16  * Implementation of total order protocol using a sequencer. Consult doc/design/SEQUENCER.txt for details
17  * @author Bela Ban
18  * @version $Id: SEQUENCER.java,v 1.19 2007/05/01 10:55:10 belaban Exp $
19  */

20 public class SEQUENCER extends Protocol {
21     private Address local_addr=null, coord=null;
22     static final String JavaDoc name="SEQUENCER";
23     private boolean is_coord=false;
24     private long seqno=0;
25
26     /** Map<seqno, Message>: maintains messages forwarded to the coord which which no ack has been received yet */
27     private final Map JavaDoc<Long JavaDoc,Message> forward_table=new TreeMap<Long JavaDoc,Message>();
28
29     /** Map<Address, seqno>: maintains the highest seqnos seen for a given member */
30     private final ConcurrentMap JavaDoc<Address,Long JavaDoc> received_table=new ConcurrentHashMap JavaDoc<Address,Long JavaDoc>();
31
32     private long forwarded_msgs=0;
33     private long bcast_msgs=0;
34     private long received_forwards=0;
35     private long received_bcasts=0;
36
37     public boolean isCoordinator() {return is_coord;}
38     public Address getCoordinator() {return coord;}
39     public Address getLocalAddress() {return local_addr;}
40     public String JavaDoc getName() {return name;}
41     public long getForwarded() {return forwarded_msgs;}
42     public long getBroadcast() {return bcast_msgs;}
43     public long getReceivedForwards() {return received_forwards;}
44     public long getReceivedBroadcasts() {return received_bcasts;}
45
46     public void resetStats() {
47         forwarded_msgs=bcast_msgs=received_forwards=received_bcasts=0L;
48     }
49
50     public Map JavaDoc<String JavaDoc,Object JavaDoc> dumpStats() {
51         Map JavaDoc<String JavaDoc,Object JavaDoc> m=super.dumpStats();
52         if(m == null)
53             m=new HashMap JavaDoc<String JavaDoc,Object JavaDoc>();
54         m.put("forwarded", new Long JavaDoc(forwarded_msgs));
55         m.put("broadcast", new Long JavaDoc(bcast_msgs));
56         m.put("received_forwards", new Long JavaDoc(received_forwards));
57         m.put("received_bcasts", new Long JavaDoc(received_bcasts));
58         return m;
59     }
60
61     public String JavaDoc printStats() {
62         return dumpStats().toString();
63     }
64
65
66     public boolean setProperties(Properties props) {
67         super.setProperties(props);
68
69         if(!props.isEmpty()) {
70             log.error("the following properties are not recognized: " + props);
71             return false;
72         }
73         return true;
74     }
75
76     private final long nextSeqno() {
77         synchronized(this) {
78             return seqno++;
79         }
80     }
81
82     
83     public Object JavaDoc down(Event evt) {
84         switch(evt.getType()) {
85             case Event.MSG:
86                 Message msg=(Message)evt.getArg();
87                 Address dest=msg.getDest();
88                 if(dest == null || dest.isMulticastAddress()) { // only handle multicasts
89
long next_seqno=nextSeqno();
90                     SequencerHeader hdr=new SequencerHeader(SequencerHeader.FORWARD, local_addr, next_seqno);
91                     msg.putHeader(name, hdr);
92                     if(!is_coord) {
93                         forwardToCoord(msg, next_seqno);
94                     }
95                     else {
96                         broadcast(msg);
97                     }
98                     return null; // don't pass down
99
}
100                 break;
101
102             case Event.VIEW_CHANGE:
103                 handleViewChange((View)evt.getArg());
104                 break;
105         }
106         return down_prot.down(evt);
107     }
108
109
110
111
112     public Object JavaDoc up(Event evt) {
113         Message msg;
114         SequencerHeader hdr;
115
116         switch(evt.getType()) {
117
118             case Event.SET_LOCAL_ADDRESS:
119                 local_addr=(Address)evt.getArg();
120                 break;
121
122             case Event.MSG:
123                 msg=(Message)evt.getArg();
124                 hdr=(SequencerHeader)msg.getHeader(name);
125                 if(hdr == null)
126                     break; // pass up
127

128                 switch(hdr.type) {
129                     case SequencerHeader.FORWARD:
130                         if(!is_coord) {
131                             if(log.isErrorEnabled())
132                                 log.warn("I (" + local_addr + ") am not the coord and don't handle " +
133                                         "FORWARD requests, ignoring request");
134                             return null;
135                         }
136                         broadcast(msg);
137                         received_forwards++;
138                         return null;
139                     case SequencerHeader.BCAST:
140                         deliver(msg, hdr); // deliver a copy and return (discard the original msg)
141
received_bcasts++;
142                         return null;
143                 }
144                 break;
145
146             case Event.VIEW_CHANGE:
147                 handleViewChange((View)evt.getArg());
148                 break;
149         }
150
151         return up_prot.up(evt);
152     }
153
154
155
156     /* --------------------------------- Private Methods ----------------------------------- */
157
158     private void handleViewChange(View v) {
159         Vector members=v.getMembers();
160         if(members.isEmpty()) return;
161
162         Address prev_coord=coord;
163         coord=(Address)members.firstElement();
164         is_coord=local_addr != null && local_addr.equals(coord);
165
166         boolean coord_changed=prev_coord != null && !prev_coord.equals(coord);
167         if(coord_changed) {
168             resendMessagesInForwardTable(); // maybe optimize in the future: broadcast directly if coord
169
}
170         // remove left members from received_table
171
int size=received_table.size();
172         Set<Address> keys=received_table.keySet();
173         keys.retainAll(members);
174         if(keys.size() != size) {
175             if(log.isTraceEnabled())
176                 log.trace("adjusted received_table, keys are " + keys);
177         }
178     }
179
180     /**
181      * Sends all messages currently in forward_table to the new coordinator (changing the dest field).
182      * This needs to be done, so the underlying reliable unicast protocol (e.g. UNICAST) adds these messages
183      * to its retransmission mechanism<br/>
184      * Note that we need to resend the messages in order of their seqnos ! We also need to prevent other message
185      * from being inserted until we're done, that's why there's synchronization.
186      */

187     private void resendMessagesInForwardTable() {
188         Map JavaDoc<Long JavaDoc,Message> copy;
189         synchronized(forward_table) {
190             copy=new TreeMap<Long JavaDoc,Message>(forward_table);
191         }
192         for(Message msg: copy.values()) {
193             msg.setDest(coord);
194             down_prot.down(new Event(Event.MSG, msg));
195         }
196     }
197
198
199     private void forwardToCoord(Message msg, long seqno) {
200         msg.setDest(coord); // we change the message dest from multicast to unicast (to coord)
201
synchronized(forward_table) {
202             forward_table.put(new Long JavaDoc(seqno), msg);
203         }
204         down_prot.down(new Event(Event.MSG, msg));
205         forwarded_msgs++;
206     }
207
208     private void broadcast(Message msg) {
209         SequencerHeader hdr=(SequencerHeader)msg.getHeader(name);
210         hdr.type=SequencerHeader.BCAST; // we change the type of header, but leave the tag intact
211
msg.setDest(null); // mcast
212
msg.setSrc(local_addr); // the coord is sending it - this will be replaced with sender in deliver()
213
down_prot.down(new Event(Event.MSG, msg));
214         bcast_msgs++;
215     }
216
217     /**
218      * We copy the message in order to change the sender's address. If we did this on the original message,
219      * retransmission would likely run into problems, and possibly also stability (STABLE) of messages
220      * @param msg
221      * @param hdr
222      */

223     private void deliver(Message msg, SequencerHeader hdr) {
224         Address original_sender=hdr.getOriginalSender();
225         if(original_sender == null) {
226             if(log.isErrorEnabled())
227                 log.error("original sender is null, cannot swap sender address back to original sender");
228             return;
229         }
230         long msg_seqno=hdr.getSeqno();
231
232         // this is the ack for the message sent by myself
233
if(original_sender.equals(local_addr)) {
234             synchronized(forward_table) {
235                 forward_table.remove(new Long JavaDoc(msg_seqno));
236             }
237         }
238
239         // if msg was already delivered, discard it
240
Long JavaDoc highest_seqno_seen=received_table.get(original_sender);
241         if(highest_seqno_seen != null) {
242             if(highest_seqno_seen.longValue() >= msg_seqno) {
243                 if(log.isWarnEnabled())
244                 log.warn("message seqno (" + original_sender + "::" + msg_seqno + " has already " +
245                         "been received (highest received=" + highest_seqno_seen + "); discarding duplicate message");
246                 return;
247             }
248         }
249         // update the table with the new seqno
250
received_table.put(original_sender, new Long JavaDoc(msg_seqno));
251
252         // pass a copy of the message up the stack
253
Message tmp=msg.copy(true);
254         tmp.setSrc(original_sender);
255         up_prot.up(new Event(Event.MSG, tmp));
256     }
257
258     /* ----------------------------- End of Private Methods -------------------------------- */
259
260
261
262
263
264     public static class SequencerHeader extends Header implements Streamable {
265         static final byte FORWARD = 1;
266         static final byte BCAST = 2;
267
268         byte type=-1;
269         /** the original sender's address and a seqno */
270         ViewId tag=null;
271
272
273         public SequencerHeader() {
274         }
275
276         public SequencerHeader(byte type, Address original_sender, long seqno) {
277             this.type=type;
278             this.tag=new ViewId(original_sender, seqno);
279         }
280
281         public Address getOriginalSender() {
282             return tag != null? tag.getCoordAddress() : null;
283         }
284
285         public long getSeqno() {
286             return tag != null? tag.getId() : -1;
287         }
288
289         public String JavaDoc toString() {
290             StringBuilder JavaDoc sb=new StringBuilder JavaDoc(64);
291             sb.append(printType());
292             if(tag != null)
293                 sb.append(" (tag=").append(tag).append(")");
294             return sb.toString();
295         }
296
297         private final String JavaDoc printType() {
298             switch(type) {
299                 case FORWARD: return "FORWARD";
300                 case BCAST: return "BCAST";
301                 default: return "n/a";
302             }
303         }
304
305         public void writeExternal(ObjectOutput out) throws IOException {
306             out.writeByte(type);
307             out.writeObject(tag);
308         }
309
310         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
311             type=in.readByte();
312             tag=(ViewId)in.readObject();
313         }
314
315         public void writeTo(DataOutputStream out) throws IOException {
316             out.writeByte(type);
317             Util.writeStreamable(tag, out);
318         }
319
320         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
321             type=in.readByte();
322             tag=(ViewId)Util.readStreamable(ViewId.class, in);
323         }
324
325         public int size() {
326             int size=Global.BYTE_SIZE *2; // type + presence byte
327
if(tag != null)
328                 size+=tag.serializedSize();
329             return size;
330         }
331
332     }
333
334
335 }
Popular Tags