package fr.dyade.aaa.agent;

import java.util.*;
import java.io.*;

import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

import org.jgroups.MembershipListener;
import org.jgroups.MessageListener;
import org.jgroups.Message;
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.Address;
import org.jgroups.View;
import org.jgroups.blocks.*;
import org.jgroups.util.Util;
import org.jgroups.ChannelException;
import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelNotConnectedException;

/**
 * Glue between the agent server and a JGroups channel.
 * <p>
 * The object opens a channel named <code>"HAJGroups." + name</code>,
 * registers itself as both message and membership listener, serializes
 * outgoing objects onto the channel and dispatches incoming ones to the
 * {@link HAEngine} and, when one is set, to the {@link SimpleNetwork}.
 * The first member of each accepted view is recorded as the coordinator.
 */
final class JGroups
  implements MembershipListener, MessageListener {

  static Logger logmon = null;

  /**
   * Minimum view size required before the first member of the view
   * promotes itself to coordinator. The default (2) may be overridden by
   * the <code>nbClusterExpected</code> server property.
   */
  private int nbClusterExpected = 2;
  /** True once this member has promoted itself in {@link #viewAccepted}. */
  boolean coordinator = false;
  private Channel channel;
  /** Local channel address, set by {@link #init}. */
  private Address myAddr = null;
  /** First member of the last accepted view. */
  private Address coordinatorAddr = null;
  private String channelName = null;
  HAEngine engine = null;
  SimpleNetwork network = null;
  /** Monitor on which {@link #send} waits for the echo of its own message. */
  Object lock;

  /** Current life-cycle state: one of the constants below. */
  int state = NONE;
  final static int NONE = -11;
  final static int STARTING = 1;
  final static int INITIALIZING = 2;
  final static int RUNNING = 3;

  /**
   * Creates the object, fetches the logger and reads the
   * <code>nbClusterExpected</code> property. No channel is opened here;
   * see {@link #init}.
   */
  JGroups() throws Exception {
    logmon = Debug.getLogger(Debug.JGroups);
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, "JGroups created.");

    nbClusterExpected =
      AgentServer.getInteger("nbClusterExpected", nbClusterExpected).intValue();
  }

  /**
   * Builds the protocol stack, connects the channel and starts listening.
   * <p>
   * The stack description is taken from the <code>JGroupsProps</code>
   * system property; the built-in default is a UDP multicast stack whose
   * address and port come from <code>JGroups.MCastAddr</code> and
   * <code>JGroups.MCastPort</code>.
   *
   * @param name suffix appended to <code>"HAJGroups."</code> to form the
   *             channel name.
   * @throws Exception if the channel cannot be created or connected.
   */
  void init(String name) throws Exception {
    channelName = "HAJGroups." + name;

    lock = new Object();

    state = STARTING;

    String addr = System.getProperty("JGroups.MCastAddr", "224.0.0.35");
    String port = System.getProperty("JGroups.MCastPort", "25566");

    String props = System.getProperty(
      "JGroupsProps",
      "UDP(mcast_addr=" + addr +
      ";mcast_port=" + port + ";ip_ttl=32;" +
      "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
      "PING(timeout=2000;num_initial_members=3):" +
      "MERGE2(min_interval=5000;max_interval=10000):" +
      "FD_SOCK:" +
      "VERIFY_SUSPECT(timeout=1500):" +
      "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):" +
      "UNICAST(timeout=5000):" +
      "pbcast.STABLE(desired_avg_gossip=20000):" +
      "FRAG(frag_size=4096;down_thread=false;up_thread=false):" +
      "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
      "shun=false;print_local_addr=true)");

    channel = new JChannel(props);
    channel.connect(channelName);

    // The local address must be known BEFORE the adapter is installed:
    // receive() and viewAccepted() both compare against myAddr, and a
    // callback that arrived first would otherwise see it as null.
    myAddr = channel.getLocalAddress();

    new PullPushAdapter(channel,
                        (MessageListener) this,
                        (MembershipListener) this);
  }

  /** Disconnects the channel from its group. */
  void disconnect() {
    channel.disconnect();
  }

  /**
   * Reconnects the channel to the name given to {@link #init} when it is
   * not currently connected; does nothing otherwise.
   */
  void connect() throws ChannelException, ChannelClosedException {
    if (!channel.isConnected()) channel.connect(channelName);
  }

  /**
   * Starts the services, then the consumers, in a daemon thread.
   * A failure of either step is logged at WARN level and does not prevent
   * the other step from being attempted.
   */
  void startConsAndServ() {
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,"start service and comsumer");

    Thread t = new Thread () {
      public void run() {
        try {
          ServiceManager.start();
        } catch (Exception exc) {
          logmon.log(BasicLevel.WARN, "services start failed.", exc);
        }
        try {
          AgentServer.startConsumers();
        } catch (Throwable exc) {
          logmon.log(BasicLevel.WARN, "consumer start failed.", exc);
        }
      }
    };
    t.setDaemon(true);
    t.start();
  }

  /**
   * Serializes <code>message</code>, sends it with a null destination and
   * blocks until {@link #receive} notifies {@link #lock}.
   *
   * @param message object to send.
   * @throws Exception on serialization failure (logged, then rethrown),
   *                   on channel failure, or if the wait is interrupted.
   */
  void send(Serializable message) throws Exception {
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,"JGroups send(" + message + ")");

    byte[] buf;
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream(256);
      ObjectOutputStream oos = new ObjectOutputStream(bos);
      oos.writeObject(message);
      // Flush first: the snapshot must be taken only once every byte
      // buffered by the object stream has reached the byte array.
      oos.flush();
      buf = bos.toByteArray();
    } catch (Exception e) {
      logmon.log(BasicLevel.ERROR,"JGroups send message",e);
      throw e;
    }

    Message msg = new Message(null, null, buf);
    synchronized (lock) {
      // Sending under the monitor guarantees the notify issued by
      // receive() cannot happen before we are waiting.
      channel.send(msg);
      // NOTE(review): bare wait() without a guarding condition or a
      // timeout -- a spurious wakeup releases the caller early, and the
      // caller blocks forever if the echo never comes back. Left as is
      // because a correct predicate needs per-message tracking.
      lock.wait();
    }
  }

  /**
   * Sends <code>obj</code> to a single member, using the local address as
   * source. Does not wait for any acknowledgement.
   *
   * @param dst destination address.
   * @param obj object to send.
   */
  void sendTo(Address dst, Serializable obj) throws Exception {
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,"JGroups sendTo(" + dst + "," + obj + ")");
    channel.send(dst,myAddr,obj);
  }

  /** Returns the first member of the last accepted view, or null. */
  Address getCoordinatorAddr() {
    return coordinatorAddr;
  }

  void setEngine(HAEngine engine) {
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, "setEngine");
    this.engine = engine;
  }

  void setNetWork(SimpleNetwork network) {
    this.network = network;
  }

  boolean isCoordinator() {
    return coordinator;
  }

  /**
   * MessageListener callback: deserializes the payload and dispatches it.
   * <ul>
   * <li>A message whose source is this member only serves to wake up
   *     {@link #send} (for agent messages, acks and state replies).</li>
   * <li><code>HAStateRequest</code>: handled by the coordinator only, the
   *     requester address is queued on the engine.</li>
   * <li><code>HAStateReply</code>: handled in INITIALIZING state only;
   *     installs the state and switches to RUNNING.</li>
   * <li>Agent messages and <code>JGroupsAckMsg</code>: handled in RUNNING
   *     state only.</li>
   * </ul>
   * Any exception is logged at ERROR level and swallowed.
   */
  public void receive(Message msg) {
    try {
      Object obj = Util.objectFromByteBuffer(msg.getBuffer());
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG," receive obj = " + obj +
                   "\nmsg.getSrc =" + msg.getSrc() +
                   "\nmsg.getDest =" + msg.getDest() +
                   "\nmyAddr = " + myAddr +
                   "\ncoordinator = " + coordinator +
                   "\nstate=" + state);

      if (myAddr.equals(msg.getSrc())) {
        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG,"jgroups, I am the sender.");
        // Our own message came back: release the thread blocked in send().
        if ((obj instanceof fr.dyade.aaa.agent.Message) ||
            (obj instanceof JGroupsAckMsg) ||
            (obj instanceof HAStateReply)) {
          synchronized (lock) {
            lock.notify();
          }
        }
        return;
      }

      if (obj instanceof HAStateRequest && coordinator) {
        HAStateRequest req = (HAStateRequest) obj;
        engine.requestor.add(req.getAddress());
      } else if (obj instanceof HAStateReply) {
        if (state != INITIALIZING) return;

        HAStateReply reply = (HAStateReply) obj;
        // Flag every known service as initialized before the state is set.
        ServiceDesc services[] = ServiceManager.getServices();
        if (services != null) {
          for (int i = 0; i < services.length; i++)
            services[i].initialized = true;
        }
        if (network != null)
          network.setStamp(reply.getNetworkStamp());
        engine.setState(reply);
        state = RUNNING;
      } else if (obj instanceof fr.dyade.aaa.agent.Message) {
        if (state != RUNNING) return;

        fr.dyade.aaa.agent.Message m = (fr.dyade.aaa.agent.Message) obj;
        if ((network != null) &&
            (m.from.getTo() != AgentServer.getServerId())) {
          network.deliver(m);
        } else {
          engine.receiveFromJGroups(m);
        }
      } else if (obj instanceof JGroupsAckMsg && network != null) {
        if (state != RUNNING) return;

        network.ackMsg((JGroupsAckMsg) obj);
      }
    } catch(Exception exc) {
      logmon.log(BasicLevel.ERROR,
                 "JGroups part receive msg = " + msg, exc);
    }
  }

  /** MessageListener callback: no group state is provided, returns null. */
  public byte[] getState() {
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,"=== MessageListener getState");
    return null;
  }

  /** MessageListener callback: the supplied state is ignored. */
  public void setState(byte[] state) {
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,"=== MessageListener setState");
  }

  /**
   * MembershipListener callback invoked with each new view.
   * <p>
   * The first member of the view becomes the known coordinator. Then:
   * <ul>
   * <li>if this member is already coordinator, the view must still list
   *     it first, otherwise a RuntimeException is thrown;</li>
   * <li>if this member is neither RUNNING nor first, it asks the
   *     coordinator for its state and switches to INITIALIZING;</li>
   * <li>if this member is first and the view holds at least
   *     <code>nbClusterExpected</code> members, it becomes coordinator,
   *     starts services and consumers and switches to RUNNING.</li>
   * </ul>
   */
  public void viewAccepted(View view) {
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,"==== viewAccepted: " + view);

    Vector mbrs = view.getMembers();
    // Guard against a view without members rather than failing on
    // elementAt(0) inside the listener thread.
    if ((mbrs == null) || mbrs.isEmpty()) {
      logmon.log(BasicLevel.WARN, "JGroups viewAccepted: empty view ignored");
      return;
    }
    coordinatorAddr = (Address) mbrs.elementAt(0);

    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,
                 "JGroups setView: " + coordinator + ", " + state);

    if (coordinator) {
      if (! coordinatorAddr.equals(myAddr)) {
        logmon.log(BasicLevel.FATAL, "Bad view for coordinator");
        throw new RuntimeException ("Bad view for coordinator");
      }
      return;
    }

    if ((state != RUNNING) && (! coordinatorAddr.equals(myAddr))) {
      try {
        sendTo(coordinatorAddr,new HAStateRequest(myAddr));
        state = INITIALIZING;
      } catch (Exception exc) {
        logmon.log(BasicLevel.ERROR,"JGroups sendTo()",exc);
      }
    }

    if ((mbrs.size() >= nbClusterExpected) &&
        coordinatorAddr.equals(myAddr)) {
      coordinator = true;
      startConsAndServ();
      state = RUNNING;
    }

    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,
                 "JGroups setView: " + coordinator + ", " + state);
  }

  /** MembershipListener callback: only traces the suspected member. */
  public void suspect(Address suspected_mbr) {
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,"==== suspect(): " + suspected_mbr);
  }

  /** MembershipListener callback: only traces the call. */
  public void block() {
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,"==== block()");
  }

}