KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > VIEW_SYNC


1 package org.jgroups.protocols;
2
3
4 import org.jgroups.*;
5 import org.jgroups.annotations.GuardedBy;
6 import org.jgroups.stack.Protocol;
7 import org.jgroups.util.Streamable;
8 import org.jgroups.util.TimeScheduler;
9 import org.jgroups.util.Util;
10
11 import java.io.*;
12 import java.util.Properties JavaDoc;
13 import java.util.Vector JavaDoc;
14 import java.util.concurrent.Future JavaDoc;
15 import java.util.concurrent.locks.Lock JavaDoc;
16 import java.util.concurrent.locks.ReentrantLock JavaDoc;
17
18
19 /**
20  * Periodically sends the view to the group. When a view is received which is greater than the current view, we
21  * install it. Otherwise we simply discard it. This is used to solve the problem for unreliable view
22  * dissemination outlined in JGroups/doc/ReliableViewInstallation.txt. This protocol is supposed to be just below GMS.
23  * @author Bela Ban
24  * @version $Id: VIEW_SYNC.java,v 1.21 2007/05/01 10:55:10 belaban Exp $
25  */

26 public class VIEW_SYNC extends Protocol {
27     Address local_addr=null;
28     final Vector JavaDoc mbrs=new Vector JavaDoc();
29     View my_view=null;
30     ViewId my_vid=null;
31
32     /** Sends a VIEW_SYNC message to the group every 20 seconds on average. 0 disables sending of VIEW_SYNC messages */
33     long avg_send_interval=60000;
34
35     private int num_views_sent=0;
36     private int num_views_adjusted=0;
37
38     @GuardedBy("view_task_lock")
39     private Future JavaDoc view_send_task_future=null; // bcasts periodic view sync message (added to timer below)
40

41     private final Lock JavaDoc view_task_lock=new ReentrantLock JavaDoc();
42
43     TimeScheduler timer=null;
44     static final String JavaDoc name="VIEW_SYNC";
45
46
47
48     public String JavaDoc getName() {
49         return name;
50     }
51
52     public long getAverageSendInterval() {
53         return avg_send_interval;
54     }
55
56     public void setAverageSendInterval(long gossip_interval) {
57         avg_send_interval=gossip_interval;
58     }
59
60     public int getNumViewsSent() {
61         return num_views_sent;
62     }
63
64     public int getNumViewsAdjusted() {
65         return num_views_adjusted;
66     }
67
68     public void resetStats() {
69         super.resetStats();
70         num_views_adjusted=num_views_sent=0;
71     }
72
73
74
75     public boolean setProperties(Properties JavaDoc props) {
76         String JavaDoc str;
77
78         super.setProperties(props);
79
80         str=props.getProperty("avg_send_interval");
81         if(str != null) {
82             avg_send_interval=Long.parseLong(str);
83             props.remove("avg_send_interval");
84         }
85
86         if(!props.isEmpty()) {
87             log.error("these properties are not recognized: " + props);
88             return false;
89         }
90         return true;
91     }
92
93
94     public void start() throws Exception JavaDoc {
95         if(stack != null && stack.timer != null)
96             timer=stack.timer;
97         else
98             throw new Exception JavaDoc("timer cannot be retrieved from protocol stack");
99     }
100
101     public void stop() {
102         stopViewSender();
103     }
104
105     /** Sends a VIEW_SYNC_REQ to all members, every member replies with a VIEW multicast */
106     public void sendViewRequest() {
107         Message msg=new Message(null);
108         msg.setFlag(Message.OOB);
109         ViewSyncHeader hdr=new ViewSyncHeader(ViewSyncHeader.VIEW_SYNC_REQ, null);
110         msg.putHeader(name, hdr);
111         down_prot.down(new Event(Event.MSG, msg));
112     }
113
114 // public void sendFakeViewForTestingOnly() {
115
// ViewId fake_vid=new ViewId(local_addr, my_vid.getId() +2);
116
// View fake_view=new View(fake_vid, new Vector(my_view.getMembers()));
117
// System.out.println("sending fake view " + fake_view);
118
// my_view=fake_view;
119
// my_vid=fake_vid;
120
// sendView();
121
// }
122

123
124     public Object JavaDoc up(Event evt) {
125         Message msg;
126         ViewSyncHeader hdr;
127         int type=evt.getType();
128
129         switch(type) {
130
131             case Event.MSG:
132                 msg=(Message)evt.getArg();
133                 hdr=(ViewSyncHeader)msg.getHeader(name);
134                 if(hdr == null)
135                     break;
136                 Address sender=msg.getSrc();
137                 switch(hdr.type) {
138                     case ViewSyncHeader.VIEW_SYNC:
139                         handleView(hdr.view, sender);
140                         break;
141                     case ViewSyncHeader.VIEW_SYNC_REQ:
142                         if(!sender.equals(local_addr))
143                             sendView();
144                         break;
145                     default:
146                         if(log.isErrorEnabled()) log.error("ViewSyncHeader type " + hdr.type + " not known");
147                 }
148                 return null;
149
150             case Event.VIEW_CHANGE:
151                 View view=(View)evt.getArg();
152                 handleViewChange(view);
153                 break;
154
155             case Event.SET_LOCAL_ADDRESS:
156                 local_addr=(Address)evt.getArg();
157                 break;
158         }
159
160         return up_prot.up(evt);
161     }
162
163
164
165     public Object JavaDoc down(Event evt) {
166         switch(evt.getType()) {
167             case Event.VIEW_CHANGE:
168                 View v=(View)evt.getArg();
169                 handleViewChange(v);
170                 break;
171         }
172         return down_prot.down(evt);
173     }
174
175
176
177     /* --------------------------------------- Private Methods ---------------------------------------- */
178
179     private void handleView(View v, Address sender) {
180         Vector JavaDoc members=v.getMembers();
181         if(!members.contains(local_addr)) {
182             if(log.isWarnEnabled())
183                 log.warn("discarding view as I (" + local_addr + ") am not member of view (" + v + ")");
184             return;
185         }
186
187         ViewId vid=v.getVid();
188         int rc=vid.compareTo(my_vid);
189         if(rc > 0) { // foreign view is greater than my own view; update my own view !
190
if(log.isTraceEnabled())
191                 log.trace("view from " + sender + " (" + vid + ") is greater than my own view (" + my_vid + ");" +
192                 " will update my own view");
193
194             Message view_change=new Message(local_addr, local_addr, null);
195             org.jgroups.protocols.pbcast.GMS.GmsHeader hdr;
196             hdr=new org.jgroups.protocols.pbcast.GMS.GmsHeader(org.jgroups.protocols.pbcast.GMS.GmsHeader.VIEW, v);
197             view_change.putHeader(name, hdr);
198             up_prot.up(new Event(Event.MSG, view_change));
199             num_views_adjusted++;
200         }
201     }
202
203     private void handleViewChange(View view) {
204         Vector JavaDoc tmp=view.getMembers();
205         if(tmp != null) {
206             mbrs.clear();
207             mbrs.addAll(tmp);
208         }
209         my_view=(View)view.clone();
210         my_vid=my_view.getVid();
211         if(my_view.size() > 1) {
212             startViewSender();
213         }
214         else {
215             stopViewSender();
216         }
217     }
218
219     private void sendView() {
220         View tmp=(View)(my_view != null? my_view.clone() : null);
221         if(tmp == null) return;
222         Message msg=new Message(null); // send to the group
223
msg.setFlag(Message.OOB);
224         ViewSyncHeader hdr=new ViewSyncHeader(ViewSyncHeader.VIEW_SYNC, tmp);
225         msg.putHeader(name, hdr);
226         down_prot.down(new Event(Event.MSG, msg));
227         num_views_sent++;
228     }
229
230     /** Starts with view_task_lock held, no need to acquire it again */
231     void startViewSender() {
232         try {
233             view_task_lock.lock();
234             if(view_send_task_future == null || view_send_task_future.isDone()) {
235                 ViewSendTask view_send_task=new ViewSendTask();
236                 view_send_task_future=timer.scheduleWithDynamicInterval(view_send_task, true); // fixed-rate scheduling
237
if(log.isTraceEnabled())
238                     log.trace("view send task started");
239             }
240         }
241         finally {
242             view_task_lock.unlock();
243         }
244     }
245
246
247     void stopViewSender() {
248         try {
249             view_task_lock.lock();
250             if(view_send_task_future != null) {
251                 view_send_task_future.cancel(false);
252                 view_send_task_future=null;
253                 if(log.isTraceEnabled())
254                     log.trace("view send task stopped");
255             }
256         }
257         finally {
258             view_task_lock.unlock();
259         }
260     }
261
262
263
264
265
266
267     /* ------------------------------------End of Private Methods ------------------------------------- */
268
269
270
271
272
273
274
275     public static class ViewSyncHeader extends Header implements Streamable {
276         public static final int VIEW_SYNC = 1; // contains a view
277
public static final int VIEW_SYNC_REQ = 2; // request to all members to send their views
278

279         int type=0;
280         View view=null;
281
282         public ViewSyncHeader() {
283         }
284
285
286         public ViewSyncHeader(int type, View view) {
287             this.type=type;
288             this.view=view;
289         }
290
291         public int getType() {
292             return type;
293         }
294
295         public View getView() {
296             return view;
297         }
298
299         static String JavaDoc type2String(int t) {
300             switch(t) {
301                 case VIEW_SYNC:
302                     return "VIEW_SYNC";
303                 case VIEW_SYNC_REQ:
304                     return "VIEW_SYNC_REQ";
305                 default:
306                     return "<unknown>";
307             }
308         }
309
310         public String JavaDoc toString() {
311             StringBuilder JavaDoc sb=new StringBuilder JavaDoc("[").append(type2String(type)).append("]");
312             if(view != null)
313                 sb.append(", view= ").append(view);
314             return sb.toString();
315         }
316
317
318         public void writeExternal(ObjectOutput out) throws IOException {
319             out.writeInt(type);
320             if(view == null) {
321                 out.writeBoolean(false);
322                 return;
323             }
324             out.writeBoolean(true);
325             view.writeExternal(out);
326         }
327
328
329         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
330             type=in.readInt();
331             boolean available=in.readBoolean();
332             if(available) {
333                 view=new View();
334                 view.readExternal(in);
335             }
336         }
337
338         public int size() {
339             int retval=Global.INT_SIZE + Global.BYTE_SIZE + Global.BYTE_SIZE; // type + view type + presence for digest
340
if(view != null)
341                 retval+=view.serializedSize();
342             return retval;
343         }
344
345         public void writeTo(DataOutputStream out) throws IOException {
346             out.writeInt(type);
347             // 0 == null, 1 == View, 2 == MergeView
348
byte b=(byte)(view == null? 0 : (view instanceof MergeView? 2 : 1));
349             out.writeByte(b);
350             Util.writeStreamable(view, out);
351         }
352
353         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
354             type=in.readInt();
355             byte b=in.readByte();
356             Class JavaDoc clazz=b == 2? MergeView.class : View.class;
357             view=(View)Util.readStreamable(clazz, in);
358         }
359
360
361     }
362
363
364
365
366     /**
367      Periodically multicasts a View_SYNC message
368      */

369     private class ViewSendTask implements TimeScheduler.Task {
370
371         public long nextInterval() {
372             long interval=computeSleepTime();
373             if(interval <= 0)
374                 return 10000;
375             else
376                 return interval;
377         }
378
379
380         public void run() {
381             sendView();
382         }
383
384         long computeSleepTime() {
385             int num_mbrs=Math.max(mbrs.size(), 1);
386             return getRandom((num_mbrs * avg_send_interval * 2));
387         }
388
389         long getRandom(long range) {
390             return (long)((Math.random() * range) % range);
391         }
392     }
393
394
395
396 }
397
Popular Tags