KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > demos > CausalDemo


1 // $Id: CausalDemo.java,v 1.4 2004/09/23 16:29:35 belaban Exp $
2
package org.jgroups.demos;
3
4 import org.jgroups.*;
5
6 import java.io.Serializable JavaDoc;
7 import java.util.Random JavaDoc;
8 import java.util.Vector JavaDoc;
9
10
11
12 /**
13  * Simple causal demo where each member bcast a consecutive letter from the
14  * alphabet and picks the next member to transmit the next letter. Start a
15  * few instances of CausalDemo and pass a paramter "-start" to a CausalDemo
16  * that initiates transmission of a letter A. All participanting members should
17  * have correct alphabet. DISCARD layer has been added to simulate lost messages,
18  * thus forcing delaying of delivery of a certain alphabet letter until the causally
19  * prior one has been received. Remove CAUSAL from the stack and witness how FIFO
20  * alone doesn't provide this guarantee.
21  *
22  * @author Vladimir Blagojevic
23  */

24 public class CausalDemo implements Runnable JavaDoc
25 {
26    private Channel channel;
27    private Thread JavaDoc mythread;
28    private final Vector JavaDoc alphabet = new Vector JavaDoc();
29    private boolean starter = false;
30    private int doneCount=0;
31
32    private final String JavaDoc props = "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;" +
33            "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
34            "PING(timeout=2000;num_initial_members=5):" +
35            "DISCARD(up=0.05;excludeitself=true):" +
36            "FD_SOCK:" +
37            "VERIFY_SUSPECT(timeout=1500):" +
38            "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800,9600):" +
39            "UNICAST(timeout=5000):" +
40            "pbcast.STABLE(desired_avg_gossip=2000):" +
41            "FRAG(frag_size=4096;down_thread=false;up_thread=false):" +
42            "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
43            "shun=false;print_local_addr=true):CAUSAL";
44
45    public CausalDemo(boolean start)
46    {
47       starter = start;
48    }
49
50    public String JavaDoc getNext(String JavaDoc c)
51    {
52       char letter = c.charAt(0);
53       return new String JavaDoc(new char[]{++letter});
54    }
55
56    public void listAlphabet()
57    {
58       System.out.println(alphabet);
59    }
60
61    public void run()
62    {
63       Object JavaDoc obj;
64       Message msg;
65       Random JavaDoc r = new Random JavaDoc();
66
67       try
68       {
69          channel = new JChannel(props);
70          channel.connect("CausalGroup");
71          System.out.println("View:" + channel.getView());
72          if (starter)
73             channel.send(new Message(null, null, new CausalMessage("A", (Address) channel.getView().getMembers().get(0))));
74
75       }
76       catch (Exception JavaDoc e)
77       {
78          System.out.println("Could not conec to channel");
79       }
80
81       try
82       {
83          Runtime.getRuntime().addShutdownHook(
84                new Thread JavaDoc("Shutdown cleanup thread")
85                {
86                   public void run()
87                   {
88
89                      listAlphabet();
90                      channel.disconnect();
91                      channel.close();
92                   }
93                }
94          );
95       }
96       catch (Exception JavaDoc e)
97       {
98          System.out.println("Exception while shutting down" + e);
99       }
100
101       while (true)
102       {
103          try
104          {
105             CausalMessage cm = null;
106             obj = channel.receive(0); // no timeout
107
if (obj instanceof Message)
108             {
109                msg = (Message) obj;
110                cm = (CausalMessage) msg.getObject();
111                Vector JavaDoc members = channel.getView().getMembers();
112                String JavaDoc receivedLetter = cm.message;
113
114                if("Z".equals(receivedLetter))
115                {
116                   channel.send(new Message(null, null, new CausalMessage("done", null)));
117                }
118                if("done".equals(receivedLetter))
119                {
120                   if(++doneCount >= members.size())
121                   {
122                      System.exit(0);
123                   }
124                   continue;
125                }
126
127                alphabet.add(receivedLetter);
128                listAlphabet();
129
130                //am I chosen to transmit next letter?
131
if (cm.member.equals(channel.getLocalAddress()))
132                {
133                   int nextTarget = r.nextInt(members.size());
134
135                   //chose someone other than yourself
136
while (nextTarget == members.indexOf(channel.getLocalAddress()))
137                   {
138                      nextTarget = r.nextInt(members.size());
139                   }
140                   Address next = (Address) members.get(nextTarget);
141                   String JavaDoc nextChar = getNext(receivedLetter);
142                   if (nextChar.compareTo("Z") < 1)
143                   {
144                      System.out.println("Sending " + nextChar);
145                      channel.send(new Message(null, null, new CausalMessage(nextChar, next)));
146                   }
147                }
148             }
149          }
150          catch (ChannelNotConnectedException conn)
151          {
152             break;
153          }
154          catch (Exception JavaDoc e)
155          {
156             System.err.println(e);
157          }
158       }
159
160    }
161
162
163    public static void main(String JavaDoc args[])
164    {
165       CausalDemo test = null;
166       boolean start=false;
167
168       for(int i=0; i < args.length; i++) {
169       if("-help".equals(args[i])) {
170           System.out.println("CausalDemo [-help] [-start]");
171           return;
172       }
173       if("-start".equals(args[i])) {
174           start=true;
175           continue;
176       }
177       }
178
179       //if parameter start is passed , start the demo
180
test = new CausalDemo(start);
181       try
182       {
183          new Thread JavaDoc(test).start();
184       }
185       catch (Exception JavaDoc e)
186       {
187          System.err.println(e);
188       }
189
190    }
191
192 }
193
194 class CausalMessage implements Serializable JavaDoc
195 {
196    public final String JavaDoc message;
197    public final Address member;
198
199    public CausalMessage(String JavaDoc message, Address member)
200    {
201       this.message = message;
202       this.member = member;
203    }
204
205    public String JavaDoc toString()
206    {
207       return "CausalMessage[" + message + '=' + message + "member=" + member + ']';
208    }
209
210 }
211
Popular Tags