KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > fr > dyade > aaa > agent > JGroups


1 /*
2  * Copyright (C) 2004 - 2005 ScalAgent Distributed Technologies
3  * Copyright (C) 2004 France Telecom R&D
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA.
19  *
20  * Initial developer(s): ScalAgent Distributed Technologies
21  * Contributor(s):
22  */

23 package fr.dyade.aaa.agent;
24
25 import java.util.*;
26 import java.io.*;
27
28 import org.objectweb.util.monolog.api.BasicLevel;
29 import org.objectweb.util.monolog.api.Logger;
30
31 import org.jgroups.MembershipListener;
32 import org.jgroups.MessageListener;
33 import org.jgroups.Message;
34 import org.jgroups.Channel;
35 import org.jgroups.JChannel;
36 import org.jgroups.Address;
37 import org.jgroups.View;
38 import org.jgroups.blocks.*;
39 import org.jgroups.util.Util;
40 import org.jgroups.ChannelException;
41 import org.jgroups.ChannelClosedException;
42 import org.jgroups.ChannelNotConnectedException;
43
44 /**
45  * Implementation of JGroups in order to improve HA.
46  */

47 final class JGroups
48   implements MembershipListener, MessageListener {
49   
50   static Logger logmon = null;
51
52   private int nbClusterExpected = 2;
53   boolean coordinator = false;
54   private Channel channel;
55   private Address myAddr = null;
56   private Address coordinatorAddr = null;
57   private String JavaDoc channelName = null;
58   HAEngine engine = null;
59   SimpleNetwork network = null; // AF: to replace with HANetwork
60
Object JavaDoc lock;
61
62   JGroups() throws Exception JavaDoc {
63     // Get the logging monitor from current server MonologLoggerFactory
64
logmon = Debug.getLogger(Debug.JGroups);
65     logmon.log(BasicLevel.DEBUG, "JGroups created.");
66
67     nbClusterExpected = AgentServer.getInteger("nbClusterExpected", nbClusterExpected).intValue();
68   }
69
70   void init(String JavaDoc name) throws Exception JavaDoc {
71     channelName = "HAJGroups." + name;
72
73     lock = new Object JavaDoc();
74
75     state = STARTING;
76
77     String JavaDoc addr = System.getProperty("JGroups.MCastAddr", "224.0.0.35");
78     String JavaDoc port = System.getProperty("JGroups.MCastPort", "25566");
79       
80     String JavaDoc props = System.getProperty(
81       "JGroupsProps",
82       "UDP(mcast_addr=" + addr +
83       ";mcast_port=" + port + ";ip_ttl=32;" +
84       "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
85       "PING(timeout=2000;num_initial_members=3):" +
86       "MERGE2(min_interval=5000;max_interval=10000):" +
87       "FD_SOCK:" +
88       "VERIFY_SUSPECT(timeout=1500):" +
89       "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):" +
90       "UNICAST(timeout=5000):" +
91       "pbcast.STABLE(desired_avg_gossip=20000):" +
92       "FRAG(frag_size=4096;down_thread=false;up_thread=false):" +
93       "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
94       "shun=false;print_local_addr=true)");
95
96     channel = new JChannel(props);
97     channel.connect(channelName);
98     
99     new PullPushAdapter(channel,
100                         (MessageListener) this,
101                         (MembershipListener) this);
102     myAddr = channel.getLocalAddress();
103   }
104
105   void disconnect() {
106     channel.disconnect();
107   }
108   
109   void connect() throws ChannelException, ChannelClosedException {
110     if (!channel.isConnected()) channel.connect(channelName);
111   }
112
113   void startConsAndServ() {
114     if (logmon.isLoggable(BasicLevel.DEBUG))
115       logmon.log(BasicLevel.DEBUG,"start service and comsumer");
116
117     // Use another thread to start services and network in order
118
// to avoid dead-lock during send.
119
Thread JavaDoc t = new Thread JavaDoc() {
120       public void run() {
121         try {
122           ServiceManager.start();
123         } catch (Exception JavaDoc exc) {
124           logmon.log(BasicLevel.WARN, "services start failed.", exc);
125         }
126         try {
127           AgentServer.startConsumers();
128         } catch (Throwable JavaDoc exc) {
129           logmon.log(BasicLevel.WARN, "consumer start failed.", exc);
130         }
131       }
132     };
133     t.setDaemon(true);
134     t.start();
135   }
136
137   void send(Serializable message) throws Exception JavaDoc {
138     if (logmon.isLoggable(BasicLevel.DEBUG))
139       logmon.log(BasicLevel.DEBUG,"JGroups send(" + message + ")");
140
141     byte[] buf = null;
142     try {
143       ByteArrayOutputStream bos = new ByteArrayOutputStream(256);
144       ObjectOutputStream oos = new ObjectOutputStream(bos);
145       oos.writeObject(message);
146       buf = bos.toByteArray();
147       oos.flush();
148     } catch(Exception JavaDoc e) {
149       logmon.log(BasicLevel.ERROR,"JGroups send message",e);
150       throw e;
151     }
152     if (buf == null) return;
153     Message msg = new Message(null, null, buf);
154     synchronized (lock) {
155       channel.send(msg);
156       lock.wait();
157     }
158   }
159   
160   void sendTo(Address dst, Serializable obj) throws Exception JavaDoc {
161     if (logmon.isLoggable(BasicLevel.DEBUG))
162       logmon.log(BasicLevel.DEBUG,"JGroups sendTo(" + dst + "," + obj + ")");
163     channel.send(dst,myAddr,obj);
164   }
165
166   Address getCoordinatorAddr() {
167     return coordinatorAddr;
168   }
169
170   void setEngine(HAEngine engine) {
171     logmon.log(BasicLevel.DEBUG, "setEngine");
172     this.engine = engine;
173   }
174
175   void setNetWork(SimpleNetwork network) {
176     this.network = network;
177   }
178
179   boolean isCoordinator() {
180     return coordinator;
181   }
182
183   int state = NONE;
184   final static int NONE = -11;
185   final static int STARTING = 1;
186   final static int INITIALIZING = 2;
187   final static int RUNNING = 3;
188
189   /* ----- MessageListener interface ----- */
190   public void receive(Message msg) {
191     try {
192       Object JavaDoc obj = Util.objectFromByteBuffer(msg.getBuffer());
193       if (logmon.isLoggable(BasicLevel.DEBUG))
194         logmon.log(BasicLevel.DEBUG," receive obj = " + obj +
195                    "\nmsg.getSrc =" + msg.getSrc() +
196                    "\nmsg.getDest =" + msg.getDest() +
197                    "\nmyAddr = " + myAddr +
198                    "\ncoordinator = " + coordinator +
199                    "\nstate=" + state);
200       
201       if (myAddr.equals(msg.getSrc())) {
202         if (logmon.isLoggable(BasicLevel.DEBUG))
203           logmon.log(BasicLevel.DEBUG,"jgroups, I am the sender.");
204         if ((obj instanceof fr.dyade.aaa.agent.Message) ||
205             (obj instanceof JGroupsAckMsg) ||
206             (obj instanceof HAStateReply)) {
207           synchronized (lock) {
208             lock.notify();
209           }
210         }
211         return;
212       }
213
214       if (obj instanceof HAStateRequest && coordinator) {
215         HAStateRequest req = (HAStateRequest) obj;
216         engine.requestor.add(req.getAddress());
217       } else if (obj instanceof HAStateReply) {
218         if (state != INITIALIZING) return;
219
220         HAStateReply reply = (HAStateReply) obj;
221         // Services are already first initialized on master server, we
222
// have just to start them in startConsAndServ.
223
ServiceDesc services[] = ServiceManager.getServices();
224         if (services != null) {
225           for (int i = 0; i < services.length; i++)
226             services[i].initialized = true;
227         }
228         // Synchronizes network's logical clock
229
if (network != null)
230           network.setStamp(reply.getNetworkStamp());
231         // Sets engine's state (
232
engine.setState(reply);
233         state = RUNNING;
234       } else if (obj instanceof fr.dyade.aaa.agent.Message) {
235         if (state != RUNNING) return;
236
237         fr.dyade.aaa.agent.Message m = (fr.dyade.aaa.agent.Message) obj;
238         if ((network != null) &&
239             (m.from.getTo() != AgentServer.getServerId())) {
240           network.deliver(m);
241         } else {
242           engine.receiveFromJGroups(m);
243         }
244       } else if (obj instanceof JGroupsAckMsg && network != null) {
245         if (state != RUNNING) return;
246
247         network.ackMsg((JGroupsAckMsg) obj);
248       }
249     } catch(Exception JavaDoc exc) {
250       logmon.log(BasicLevel.ERROR,
251                  "JGroups part receive msg = " + msg, exc);
252     }
253   }
254
255   public byte[] getState() {
256     if (logmon.isLoggable(BasicLevel.DEBUG))
257       logmon.log(BasicLevel.DEBUG,"=== MessageListener getState");
258     return null;
259   }
260   
261   public void setState(byte[] state) {
262     if (logmon.isLoggable(BasicLevel.DEBUG))
263       logmon.log(BasicLevel.DEBUG,"=== MessageListener setState");
264   }
265   /* ----------------- End of Interface MessageListener --------------- */
266
267   /* ------------ Interface MembershipListener ------------- */
268
269   public void viewAccepted(View view) {
270     if (logmon.isLoggable(BasicLevel.DEBUG))
271       logmon.log(BasicLevel.DEBUG,"==== viewAccepted: " + view);
272
273     // Eventually get new coordinator address
274
Vector mbrs = view.getMembers();
275     coordinatorAddr = (Address) mbrs.elementAt(0);
276
277     if (logmon.isLoggable(BasicLevel.DEBUG))
278       logmon.log(BasicLevel.DEBUG,
279                  "JGroups setView: " + coordinator + ", " + state);
280
281     if (coordinator) {
282       // Test that the server is always master.
283
if (! coordinatorAddr.equals(myAddr)) {
284         logmon.log(BasicLevel.FATAL, "Bad view for coordinator");
285         throw new RuntimeException JavaDoc("Bad view for coordinator");
286       }
287       return;
288     }
289
290     if ((state != RUNNING) && (! coordinatorAddr.equals(myAddr))) {
291       // Ask current state to the new coordinator.
292
try {
293         sendTo(coordinatorAddr,new HAStateRequest(myAddr));
294         state = INITIALIZING;
295       } catch (Exception JavaDoc exc) {
296         logmon.log(BasicLevel.ERROR,"JGroups sendTo()",exc);
297       }
298     }
299
300     if ((mbrs.size() >= nbClusterExpected) &&
301         coordinatorAddr.equals(myAddr)) {
302       // This server is the new master !
303
coordinator = true;
304       // Starts the service
305
startConsAndServ();
306       // If not already done set state to RUNNING (this can be
307
// happen for the 1st master).
308
state = RUNNING;
309     }
310
311     if (logmon.isLoggable(BasicLevel.DEBUG))
312       logmon.log(BasicLevel.DEBUG,
313                  "JGroups setView: " + coordinator + ", " + state);
314   }
315   
316   public void suspect(Address suspected_mbr) {
317     if (logmon.isLoggable(BasicLevel.DEBUG))
318       logmon.log(BasicLevel.DEBUG,"==== suspect(): " + suspected_mbr);
319   }
320   
321   public void block() {
322     if (logmon.isLoggable(BasicLevel.DEBUG))
323       logmon.log(BasicLevel.DEBUG,"==== block()");
324   }
325   /* -------------------- End of Interface MembershipListener ----------------- */
326 }
327
Popular Tags