1 23 package fr.dyade.aaa.agent; 24 25 import java.io.*; 26 import java.util.Hashtable ; 27 import java.util.Enumeration ; 28 import java.util.Vector ; 29 30 import org.objectweb.util.monolog.api.BasicLevel; 31 import org.objectweb.util.monolog.api.Logger; 32 33 import fr.dyade.aaa.util.*; 34 35 39 final class HAEngine extends Engine { 40 41 private Vector qinFromExt; 42 43 44 private JGroups jgroups = null; 45 46 HAEngine() throws Exception { 47 super(); 48 49 qinFromExt = new Vector (); 50 requestor = new Vector (); 51 } 52 53 public void setJGroups(JGroups jgroups) { 54 this.jgroups = jgroups; 55 } 56 57 60 public void save() throws IOException {} 61 62 65 public void restore() throws Exception {} 66 67 72 public synchronized void post(Message msg) throws Exception { 73 if (logmon.isLoggable(BasicLevel.DEBUG)) 74 logmon.log(BasicLevel.DEBUG, getName() + " post(" + msg +")"); 75 76 if (EngineThread.class.isInstance(Thread.currentThread())) { 77 super.post(msg); 79 } else { 80 if (jgroups.coordinator) jgroups.send(msg); 82 83 stamp(msg); 84 msg.save(); 85 86 qinFromExt.addElement(msg); 87 } 88 } 89 90 95 private void postFromExt() { 96 Message msg = null; 97 if (qin.size() == 0) { 98 try { 99 msg = (Message) qinFromExt.elementAt(0); 100 qinFromExt.removeElementAt(0); 101 } catch (ArrayIndexOutOfBoundsException exc) { 102 if (logmon.isLoggable(BasicLevel.DEBUG)) 103 logmon.log(BasicLevel.DEBUG, 104 getName() + ", postFromExt(): qinFromExt empty"); 105 return; 106 } 107 if (logmon.isLoggable(BasicLevel.DEBUG)) 108 logmon.log(BasicLevel.DEBUG, getName() + ", postFromExt() -> " + msg); 109 qin.push(msg); 110 qin.validate(); 111 return; 112 } 113 if (logmon.isLoggable(BasicLevel.DEBUG)) { 114 logmon.log(BasicLevel.DEBUG, getName() + ", postFromExt()"); 115 } 116 } 117 118 121 public void validate() { 122 if (! needToSync) postFromExt(); 123 super.validate(); 124 } 125 126 135 void commit() throws Exception { 136 if (! requestor.isEmpty()) 137 needToSync = true; 138 139 super.commit(); 140 141 if (needToSync && (qin.size() == 0)) { 142 getState(); 144 145 needToSync = false; 147 } 148 postFromExt(); 149 } 150 151 161 void abort(Exception exc) throws Exception { 162 super.abort(exc); 163 postFromExt(); 164 } 165 166 void receiveFromJGroups(Message msg) throws Exception { 167 if (logmon.isLoggable(BasicLevel.DEBUG)) 168 logmon.log(BasicLevel.DEBUG, 169 getName() + " receiveFromJGroups(" + msg + ")"); 170 171 AgentServer.getTransaction().begin(); 172 stamp(msg); 173 msg.save(); 174 175 if (logmon.isLoggable(BasicLevel.DEBUG)) 176 logmon.log(BasicLevel.DEBUG,getName() + 177 " receiveFromJGroups qin.size() = " + qin.size() + 178 ", qinFromExt.size() = " + qinFromExt.size()); 179 180 AgentServer.getTransaction().commit(); 181 qinFromExt.addElement(msg); 182 postFromExt(); 183 AgentServer.getTransaction().release(); 184 } 185 186 volatile boolean needToSync = false; 187 volatile Vector requestor = null; 188 189 static private final byte[] OOS_STREAM_HEADER = { 190 (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF), 191 (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF), 192 (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF), 193 (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF) 194 }; 195 196 201 synchronized void getState() throws Exception { 202 if (logmon.isLoggable(BasicLevel.DEBUG)) 203 logmon.log(BasicLevel.DEBUG, getName() + ", getState()"); 204 205 HAStateReply reply = new HAStateReply(); 206 reply.now = now; 208 reply.stamp = getStamp(); 209 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 211 ObjectOutputStream oos = new ObjectOutputStream(baos); 212 try { 213 oos.writeObject(AgentIdStamp.stamp); 214 for (Enumeration e = agents.elements(); e.hasMoreElements();) { 215 Agent agent = (Agent) e.nextElement(); 216 if (! (agent instanceof AgentFactory)) { 218 oos.writeObject(agent.getId()); 219 oos.writeObject(agent); 220 if (agent instanceof BagSerializer) { 221 ((BagSerializer) agent).writeBag(oos); 222 } 223 } 224 } 225 oos.flush(); 226 reply.agents = baos.toByteArray(); 227 } finally { 228 try { 229 oos.close(); 230 } catch (Exception exc) {} 231 } 232 233 baos.reset(); 235 oos = new ObjectOutputStream(baos); 236 try { 237 oos.writeObject(qinFromExt); 238 reply.messages = baos.toByteArray(); 239 } finally { 240 try { 241 oos.close(); 242 } catch (Exception exc) {} 243 } 244 245 reply.setNetworkStamp(jgroups.network.getStamp()); 247 248 requestor.clear(); 253 jgroups.send(reply); 254 } 255 256 synchronized void setState(HAStateReply reply) throws Exception { 257 if (logmon.isLoggable(BasicLevel.DEBUG)) 258 logmon.log(BasicLevel.DEBUG, getName() + ", setState()"); 259 260 now = reply.now; 261 setStamp(reply.stamp); 262 ByteArrayInputStream bis = new ByteArrayInputStream(reply.agents); 264 ObjectInputStream ois = new ObjectInputStream(bis); 265 try { 266 AgentId id = null; 267 Agent agent = null; 268 AgentIdStamp.stamp = (AgentIdStamp) ois.readObject(); 269 while (true) { 270 id = (AgentId) ois.readObject(); 271 agent = (Agent) ois.readObject(); 272 agent.id = id; 273 agent.deployed = true; 274 if (agent instanceof BagSerializer) { 276 ((BagSerializer) agent).readBag(ois); 277 } else { 278 agent.agentInitialize(false); 279 } 280 createAgent(agent); 281 } 282 } catch (EOFException exc) { 283 } finally { 284 try { 285 ois.close(); 286 } catch (Exception exc) {} 287 } 288 bis = new ByteArrayInputStream(reply.messages); 290 ois = new ObjectInputStream(bis); 291 try { 292 qinFromExt = (Vector ) ois.readObject(); 293 postFromExt(); 294 } finally { 295 try { 296 ois.close(); 297 } catch (Exception exc) {} 298 } 299 } 300 301 Object load(byte[] buf) throws Exception { 302 Object obj = null; 303 ByteArrayInputStream bis = new ByteArrayInputStream(buf); 304 ObjectInputStream ois = new ObjectInputStream(bis); 305 obj = ois.readObject(); 306 try { 307 ois.close(); 308 } catch (IOException exc) {} 309 return obj; 310 } 311 312 } 313 | Popular Tags |