KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > demos > Gossip


1 // $Id: Gossip.java,v 1.6 2004/09/23 16:29:35 belaban Exp $
2

3 package org.jgroups.demos;
4
5
6 import org.jgroups.*;
7 import org.jgroups.util.Util;
8
9 import javax.swing.*;
10 import java.awt.*;
11 import java.awt.event.*;
12 import java.io.ByteArrayOutputStream JavaDoc;
13 import java.io.ObjectOutputStream JavaDoc;
14 import java.io.Serializable JavaDoc;
15 import java.util.Random JavaDoc;
16 import java.util.Vector JavaDoc;
17
18
19
20
21 /**
22  * Demos that tries to graphically illustrating the gossip (or pbcast) protocol: every sender periodically sends
23  * a DRAW command to a random subset of the group members. Each member checks whether it already received the
24  * message and applies it if not yet received. Otherwise it discards it. If not yet received, the message will
25  * be forwarded to 10% of the group members. This demo is probably only interesting when we have a larger
26  * number of members: a gossip will gradually reach all members, coloring their whiteboards.
27  */

28 public class Gossip implements Runnable JavaDoc, WindowListener, ActionListener, ChannelListener {
29     private Graphics graphics=null;
30     private Frame mainFrame=null;
31     private JPanel panel=null, sub_panel=null;
32     private final ByteArrayOutputStream JavaDoc out=new ByteArrayOutputStream JavaDoc();
33     private final Random JavaDoc random=new Random JavaDoc(System.currentTimeMillis());
34     private Button gossip_button, clear_button, leave_button;
35     private final Font default_font=new Font("Helvetica", Font.PLAIN, 12);
36     private final String JavaDoc groupname="GossipGroupDemo";
37     private Channel channel=null;
38     private Thread JavaDoc receiver=null;
39     private int member_size=1;
40     private final Vector JavaDoc members=new Vector JavaDoc();
41     private int red=0, green=0, blue=0;
42     private Color default_color=null;
43     boolean first=true;
44     final double subset=0.1;
45     Address local_addr=null;
46     TrafficGenerator gen=null;
47     long traffic_interval=0;
48
49
50     public Gossip(String JavaDoc props, long traffic) throws Exception JavaDoc {
51
52         channel=new JChannel(props);
53         channel.setChannelListener(this);
54         channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
55         traffic_interval=traffic;
56         if(traffic_interval > 0) {
57             gen=new TrafficGenerator();
58             gen.start();
59         }
60     }
61
62
63     public static void main(String JavaDoc[] args) {
64         Gossip gossip=null;
65         String JavaDoc props=null;
66         long traffic=0;
67
68
69         for(int i=0; i < args.length; i++) {
70             if("-help".equals(args[i])) {
71                 System.out.println("Gossip [-traffic_interval <interval in msecs>] [-help]");
72                 return;
73             }
74             if("-traffic_interval".equals(args[i])) {
75                 traffic=Long.parseLong(args[++i]);
76                 continue;
77             }
78         }
79
80
81         // props="UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:PERF(trace=;details=true)";
82

83
84
85         /**
86          props="TCP(start_port=8000):" +
87          "TCPPING(num_initial_members=1;timeout=3000;port_range=2;"+
88          "initial_hosts=daddy[8000],terrapin[8000],sindhu[8000]):" +
89          "FD:" +
90          "pbcast.PBCAST(gossip_interval=5000;gc_lag=50):" +
91          "UNICAST:" +
92          "FRAG:" +
93          "pbcast.GMS";
94          // "PERF(trace=true;details=true)";
95          **/

96
97
98
99
100
101         props="UDP(mcast_addr=224.10.10.100;mcast_port=5678;ip_ttl=32):" +
102                 "PING:" +
103                 // "FD(shun=true;timeout=5000):" +
104
"pbcast.FD(timeout=3000):" +
105                 "VERIFY_SUSPECT(timeout=2000;num_msgs=2):" +
106                 "pbcast.PBCAST(desired_avg_gossip=8000;mcast_gossip=true;gc_lag=30;max_queue=20):" +
107                 "UNICAST:" +
108                 "FRAG:" +
109                 "pbcast.GMS"; // :" + // ;join_timeout=20):" +
110
// "PERF(trace=true;details=true)";
111

112
113
114         try {
115             gossip=new Gossip(props, traffic);
116             gossip.go();
117         }
118         catch(Exception JavaDoc e) {
119             System.err.println(e);
120             System.exit(0);
121         }
122     }
123
124
125     private void selectColor() {
126         red=(Math.abs(random.nextInt()) % 255);
127         green=(Math.abs(random.nextInt()) % 255);
128         blue=(Math.abs(random.nextInt()) % 255);
129         default_color=new Color(red, green, blue);
130     }
131
132
133     public void go() {
134         try {
135             channel.connect(groupname);
136             local_addr=channel.getLocalAddress();
137             startThread();
138             mainFrame=new Frame();
139             panel=new MyPanel();
140             sub_panel=new JPanel();
141             mainFrame.setSize(250, 250);
142             mainFrame.add("Center", panel);
143             clear_button=new Button("Clear");
144             clear_button.setFont(default_font);
145             clear_button.addActionListener(this);
146             gossip_button=new Button("Gossip");
147             gossip_button.setFont(default_font);
148             gossip_button.addActionListener(this);
149             leave_button=new Button("Leave & Exit");
150             leave_button.setFont(default_font);
151             leave_button.addActionListener(this);
152             sub_panel.add("South", gossip_button);
153             sub_panel.add("South", clear_button);
154             sub_panel.add("South", leave_button);
155             mainFrame.add("South", sub_panel);
156             mainFrame.addWindowListener(this);
157             mainFrame.setVisible(true);
158             setTitle();
159             graphics=panel.getGraphics();
160             graphics.setColor(default_color);
161             mainFrame.setBackground(Color.white);
162             mainFrame.pack();
163             gossip_button.setForeground(Color.blue);
164             clear_button.setForeground(Color.blue);
165             leave_button.setForeground(Color.blue);
166         }
167         catch(Exception JavaDoc e) {
168             System.err.println(e);
169             return;
170         }
171     }
172
173
174     void startThread() {
175         receiver=new Thread JavaDoc(this, "GossipThread");
176         receiver.setPriority(Thread.MAX_PRIORITY);
177         receiver.start();
178     }
179
180
181     void setTitle() {
182         String JavaDoc title="";
183         if(local_addr != null)
184             title+=local_addr;
185         title+=" (" + member_size + ") mbrs";
186         mainFrame.setTitle(title);
187     }
188
189
190     public void run() {
191         Object JavaDoc tmp;
192         Message msg=null;
193         Command comm;
194         boolean fl=true;
195         Vector JavaDoc mbrs;
196         ObjectOutputStream JavaDoc os;
197
198         while(fl) {
199             try {
200                 tmp=channel.receive(0);
201                 // System.out.println("Gossip.run(): received " + tmp);
202

203                 if(tmp == null) continue;
204
205                 if(tmp instanceof View) {
206                     View v=(View)tmp;
207                     member_size=v.size();
208                     mbrs=v.getMembers();
209                     members.removeAllElements();
210                     for(int i=0; i < mbrs.size(); i++)
211                         members.addElement(mbrs.elementAt(i));
212                     if(mainFrame != null)
213                         setTitle();
214                     continue;
215                 }
216
217                 if(tmp instanceof ExitEvent) {
218                     // System.out.println("-- Gossip.main(): received EXIT, waiting for ChannelReconnected callback");
219
break;
220                 }
221
222                 if(!(tmp instanceof Message))
223                     continue;
224
225                 msg=(Message)tmp;
226                 comm=null;
227
228                 Object JavaDoc obj=msg.getObject();
229
230                 // System.out.println("obj is " + obj);
231

232                 if(obj instanceof Command)
233                     comm=(Command)obj;
234                 else
235                     if(obj instanceof Message) {
236                         System.out.println("*** Message is " + Util.printMessage((Message)obj));
237                         Util.dumpStack(true);
238                     }
239                     else {
240                         if(obj != null)
241                             System.out.println("obj is " + obj.getClass() + ", hdrs are" + msg.printObjectHeaders());
242                         else
243                             System.out.println("hdrs are" + msg.printObjectHeaders());
244                         Util.dumpStack(true);
245                     }
246
247                 switch(comm.mode) {
248                     case Command.GOSSIP:
249                         if(graphics != null) {
250                             colorPanel(comm.r, comm.g, comm.b);
251                             comm.not_seen.removeElement(local_addr);
252                             if(comm.not_seen.size() > 0) { // forward gossip
253
Vector JavaDoc v=Util.pickSubset(comm.not_seen, subset);
254                                 out.reset();
255                                 os=new ObjectOutputStream JavaDoc(out);
256                                 os.writeObject(comm);
257                                 os.flush();
258                                 for(int i=0; i < v.size(); i++) {
259                                     channel.send(new Message((Address)v.elementAt(i), null, out.toByteArray()));
260                                 }
261                             }
262                         }
263                         break;
264                     case Command.CLEAR:
265                         clearPanel();
266                         continue;
267                     default:
268                         System.err.println("***** Gossip.run(): received invalid draw command " + comm.mode);
269                         break;
270                 }
271
272             }
273             catch(ChannelNotConnectedException not) {
274                 System.err.println("Gossip: " + not);
275                 break;
276             }
277             catch(ChannelClosedException closed) {
278                 System.err.println("Gossip: channel was closed");
279                 break;
280             }
281             catch(Exception JavaDoc e) {
282                 System.err.println(e);
283                 continue; // break;
284
}
285         }
286     }
287
288
289     /* --------------- Callbacks --------------- */
290
291
292     public void mouseMoved(MouseEvent e) {
293     }
294
295
296     public void clearPanel() {
297         Rectangle bounds=null;
298         if(panel == null || graphics == null)
299             return;
300
301         bounds=panel.getBounds();
302         graphics.clearRect(0, 0, bounds.width, bounds.height);
303     }
304
305
306     public void colorPanel(int r, int g, int b) {
307         if(graphics != null) {
308             red=r;
309             green=g;
310             blue=b;
311             graphics.setColor(new Color(red, green, blue));
312             Rectangle bounds=panel.getBounds();
313             graphics.fillRect(0, 0, bounds.width, bounds.height);
314             graphics.setColor(default_color);
315         }
316     }
317
318
319     void sendGossip() {
320         int tmp[]=new int[1];
321         tmp[0]=0;
322         Command comm;
323         ObjectOutputStream JavaDoc os;
324         Vector JavaDoc dests=(Vector JavaDoc)members.clone();
325
326         try {
327             selectColor(); // set a new randomly chosen color
328
dests.removeElement(local_addr);
329             dests=Util.pickSubset(dests, subset);
330             if(dests == null || dests.size() == 0) { // only apply new color locally
331
// System.out.println("-- local");
332
colorPanel(red, green, blue);
333                 return;
334             }
335
336             colorPanel(red, green, blue);
337             comm=new Command(Command.GOSSIP, red, green, blue);
338             comm.not_seen=(Vector JavaDoc)members.clone();
339             comm.not_seen.removeElement(local_addr);
340             out.reset();
341             os=new ObjectOutputStream JavaDoc(out);
342             os.writeObject(comm);
343             os.flush();
344             for(int i=0; i < dests.size(); i++) {
345                 channel.send(new Message((Address)dests.elementAt(i), null, out.toByteArray()));
346             }
347         }
348         catch(Exception JavaDoc ex) {
349             System.err.println(ex);
350         }
351     }
352
353
354     public void sendClearPanelMsg() {
355         int tmp[]=new int[1];
356         tmp[0]=0;
357         Command comm=new Command(Command.CLEAR);
358         ObjectOutputStream JavaDoc os;
359
360         try {
361             out.reset();
362             os=new ObjectOutputStream JavaDoc(out);
363             os.writeObject(comm);
364             os.flush();
365             channel.send(new Message(null, null, out.toByteArray()));
366         }
367         catch(Exception JavaDoc ex) {
368             System.err.println(ex);
369         }
370     }
371
372
373     public void windowActivated(WindowEvent e) {
374     }
375
376     public void windowClosed(WindowEvent e) {
377     }
378
379     public void windowClosing(WindowEvent e) {
380         System.exit(0); // exit the dirty way ...
381
}
382
383     public void windowDeactivated(WindowEvent e) {
384     }
385
386     public void windowDeiconified(WindowEvent e) {
387     }
388
389     public void windowIconified(WindowEvent e) {
390     }
391
392     public void windowOpened(WindowEvent e) {
393     }
394
395
396     public void actionPerformed(ActionEvent e) {
397         String JavaDoc command=e.getActionCommand();
398         if("Gossip".equals(command)) {
399             sendGossip();
400         }
401         else
402             if("Clear".equals(command))
403                 sendClearPanelMsg();
404             else
405                 if("Leave & Exit".equals(command)) {
406                     try {
407                         channel.disconnect();
408                         channel.close();
409                     }
410                     catch(Exception JavaDoc ex) {
411                         System.err.println(ex);
412                     }
413                     mainFrame.setVisible(false);
414                     System.exit(0);
415                 }
416                 else
417                     System.out.println("Unknown action");
418     }
419
420
421     public void channelConnected(Channel channel) {
422         if(first)
423             first=false;
424         else
425             startThread();
426     }
427
428     public void channelDisconnected(Channel channel) {
429         // System.out.println("----> channelDisconnected()");
430
}
431
432     public void channelClosed(Channel channel) {
433         // System.out.println("----> channelClosed()");
434
}
435
436     public void channelShunned() {
437         System.out.println("----> channelShunned()");
438     }
439
440     public void channelReconnected(Address new_addr) {
441         System.out.println("----> channelReconnected(" + new_addr + ')');
442         local_addr=new_addr;
443     }
444
445
446     private static class Command implements Serializable JavaDoc {
447         static final int GOSSIP=1;
448         static final int CLEAR=2;
449         final int mode;
450         int r=0;
451         int g=0;
452         int b=0;
453         Vector JavaDoc not_seen=new Vector JavaDoc();
454
455         Command(int mode) {
456             this.mode=mode;
457         }
458
459         Command(int mode, int r, int g, int b) {
460             this.mode=mode;
461             this.r=r;
462             this.g=g;
463             this.b=b;
464         }
465
466
467         public String JavaDoc toString() {
468             StringBuffer JavaDoc ret=new StringBuffer JavaDoc();
469             switch(mode) {
470                 case GOSSIP:
471                     ret.append("GOSSIP(" + r + '|' + g + '|' + b);
472                     break;
473                 case CLEAR:
474                     ret.append("CLEAR");
475                     break;
476                 default:
477                     return "<undefined>";
478             }
479             ret.append(", not_seen=" + not_seen);
480             return ret.toString();
481         }
482     }
483
484
485     private class TrafficGenerator implements Runnable JavaDoc {
486         Thread JavaDoc generator=null;
487
488         public void start() {
489             if(generator == null) {
490                 generator=new Thread JavaDoc(this, "TrafficGeneratorThread");
491                 generator.start();
492             }
493         }
494
495         public void stop() {
496             if(generator != null)
497                 generator=null;
498             generator=null;
499         }
500
501         public void run() {
502             while(generator != null) {
503                 Util.sleep(traffic_interval);
504                 if(generator != null)
505                     sendGossip();
506             }
507         }
508     }
509
510
511     private class MyPanel extends JPanel {
512         final Dimension preferred_size=new Dimension(200, 200);
513
514         public Dimension getPreferredSize() {
515             return preferred_size;
516         }
517
518     }
519
520
521 }
522
523
524
525
526
527
528
Popular Tags