KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > fr > dyade > aaa > agent > HAEngine


1 /*
2  * Copyright (C) 2004 - 2005 ScalAgent Distributed Technologies
3  * Copyright (C) 2004 France Telecom R&D
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA.
19  *
20  * Initial developer(s): ScalAgent Distributed Technologies
21  * Contributor(s):
22  */

23 package fr.dyade.aaa.agent;
24
25 import java.io.*;
26 import java.util.Hashtable JavaDoc;
27 import java.util.Enumeration JavaDoc;
28 import java.util.Vector JavaDoc;
29
30 import org.objectweb.util.monolog.api.BasicLevel;
31 import org.objectweb.util.monolog.api.Logger;
32
33 import fr.dyade.aaa.util.*;
34
35 /**
36  * Implementation of Engine that used JGroups in order to improve
37  * fiability.
38  */

39 final class HAEngine extends Engine {
40   /** Queue of messages provide from external agent. */
41   private Vector JavaDoc qinFromExt;
42
43   /** JGroups component */
44   private JGroups jgroups = null;
45
46   HAEngine() throws Exception JavaDoc {
47     super();
48
49     qinFromExt = new Vector JavaDoc();
50     requestor = new Vector JavaDoc();
51   }
52
53   public void setJGroups(JGroups jgroups) {
54     this.jgroups = jgroups;
55   }
56
57   /**
58    * Saves logical clock information to persistent storage.
59    */

60   public void save() throws IOException {}
61
62   /**
63    * Restores logical clock information from persistent storage.
64    */

65   public void restore() throws Exception JavaDoc {}
66
67   /**
68    * Adds a message in "ready to deliver" list. This method allocates a
69    * new time stamp to the message ; be Careful, changing the stamp imply
70    * the filename change too.
71    */

72   public synchronized void post(Message msg) throws Exception JavaDoc {
73     if (logmon.isLoggable(BasicLevel.DEBUG))
74       logmon.log(BasicLevel.DEBUG, getName() + " post(" + msg +")");
75
76     if (EngineThread.class.isInstance(Thread.currentThread())) {
77       // It's an internal message (from an agent reaction).
78
super.post(msg);
79     } else {
80       // send to JGROUPS msg
81
if (jgroups.coordinator) jgroups.send(msg);
82
83       stamp(msg);
84       msg.save();
85
86       qinFromExt.addElement(msg);
87     }
88   }
89
90   /**
91    * If the internal queue is empty, moves the first message from external
92    * queue. Il must always be bracketed by transaction begin and release
93    * methods.
94    */

95   private void postFromExt() {
96     Message msg = null;
97     if (qin.size() == 0) {
98       try {
99         msg = (Message) qinFromExt.elementAt(0);
100         qinFromExt.removeElementAt(0);
101       } catch (ArrayIndexOutOfBoundsException JavaDoc exc) {
102         if (logmon.isLoggable(BasicLevel.DEBUG))
103           logmon.log(BasicLevel.DEBUG,
104                      getName() + ", postFromExt(): qinFromExt empty");
105         return;
106       }
107       if (logmon.isLoggable(BasicLevel.DEBUG))
108         logmon.log(BasicLevel.DEBUG, getName() + ", postFromExt() -> " + msg);
109       qin.push(msg);
110       qin.validate();
111       return;
112     }
113     if (logmon.isLoggable(BasicLevel.DEBUG)) {
114       logmon.log(BasicLevel.DEBUG, getName() + ", postFromExt()");
115     }
116   }
117
118   /**
119    * Validates all messages pushed in queue during transaction session.
120    */

121   public void validate() {
122     if (! needToSync) postFromExt();
123     super.validate();
124   }
125
126   /**
127    * Commit the agent reaction in case of rigth termination:<ul>
128    * <li>suppress the processed notification from message queue,
129    * then deletes it ;
130    * <li>push all new notifications in qin and qout, and saves them ;
131    * <li>saves the agent state ;
132    * <li>then commit the transaction to validate all changes.
133    * </ul>
134    */

135   void commit() throws Exception JavaDoc {
136     if (! requestor.isEmpty())
137       needToSync = true;
138
139     super.commit();
140
141     if (needToSync && (qin.size() == 0)) {
142       // Get state server
143
getState();
144
145       // Now feed qin if possible
146
needToSync = false;
147     }
148     postFromExt();
149   }
150
151   /**
152    * Abort the agent reaction in case of error during execution. In case
153    * of unrecoverable error during the reaction we have to rollback:<ul>
154    * <li>reload the previous state of agent ;
155    * <li>remove the failed notification ;
156    * <li>clean the Channel queue of all pushed notifications ;
157    * <li>send an error notification to the sender ;
158    * <li>then commit the transaction to validate all changes.
159    * </ul>
160    */

161   void abort(Exception JavaDoc exc) throws Exception JavaDoc {
162     super.abort(exc);
163     postFromExt();
164   }
165   
166   void receiveFromJGroups(Message msg) throws Exception JavaDoc {
167     if (logmon.isLoggable(BasicLevel.DEBUG))
168       logmon.log(BasicLevel.DEBUG,
169                  getName() + " receiveFromJGroups(" + msg + ")");
170
171     AgentServer.getTransaction().begin();
172     stamp(msg);
173     msg.save();
174     
175     if (logmon.isLoggable(BasicLevel.DEBUG))
176       logmon.log(BasicLevel.DEBUG,getName() +
177                  " receiveFromJGroups qin.size() = " + qin.size() +
178                  ", qinFromExt.size() = " + qinFromExt.size());
179
180     AgentServer.getTransaction().commit();
181     qinFromExt.addElement(msg);
182     postFromExt();
183     AgentServer.getTransaction().release();
184   }
185
186   volatile boolean needToSync = false;
187   volatile Vector JavaDoc requestor = null;
188
189   static private final byte[] OOS_STREAM_HEADER = {
190     (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF),
191     (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF),
192     (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF),
193     (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF)
194   };
195
196   /**
197    * Get the current state of Engine: agents, messages, etc.
198    * This operation is done in transactoion so Engine can't get more
199    * messages and Network can't post new !!
200    */

201   synchronized void getState() throws Exception JavaDoc {
202     if (logmon.isLoggable(BasicLevel.DEBUG))
203       logmon.log(BasicLevel.DEBUG, getName() + ", getState()");
204
205     HAStateReply reply = new HAStateReply();
206     // Get clock
207
reply.now = now;
208     reply.stamp = getStamp();
209     // gets state of all agents
210
ByteArrayOutputStream baos = new ByteArrayOutputStream();
211     ObjectOutputStream oos = new ObjectOutputStream(baos);
212     try {
213       oos.writeObject(AgentIdStamp.stamp);
214       for (Enumeration JavaDoc e = agents.elements(); e.hasMoreElements();) {
215         Agent agent = (Agent) e.nextElement();
216         // Don't put the agent factory
217
if (! (agent instanceof AgentFactory)) {
218           oos.writeObject(agent.getId());
219           oos.writeObject(agent);
220           if (agent instanceof BagSerializer) {
221             ((BagSerializer) agent).writeBag(oos);
222           }
223         }
224       }
225       oos.flush();
226       reply.agents = baos.toByteArray();
227     } finally {
228       try {
229         oos.close();
230       } catch (Exception JavaDoc exc) {}
231     }
232
233     // gets all pending messages
234
baos.reset();
235     oos = new ObjectOutputStream(baos);
236     try {
237       oos.writeObject(qinFromExt);
238       reply.messages = baos.toByteArray();
239     } finally {
240       try {
241         oos.close();
242       } catch (Exception JavaDoc exc) {}
243     }
244
245     // Get Network clock
246
reply.setNetworkStamp(jgroups.network.getStamp());
247
248 // while (! requestor.isEmpty()) {
249
// jgroups.sendTo((org.jgroups.Address) requestor.firstElement(), reply);
250
// requestor.removeElementAt(0);
251
// }
252
requestor.clear();
253     jgroups.send(reply);
254   }
255
256   synchronized void setState(HAStateReply reply) throws Exception JavaDoc {
257     if (logmon.isLoggable(BasicLevel.DEBUG))
258       logmon.log(BasicLevel.DEBUG, getName() + ", setState()");
259     
260     now = reply.now;
261     setStamp(reply.stamp);
262     // creates all agents
263
ByteArrayInputStream bis = new ByteArrayInputStream(reply.agents);
264     ObjectInputStream ois = new ObjectInputStream(bis);
265     try {
266       AgentId id = null;
267       Agent agent = null;
268       AgentIdStamp.stamp = (AgentIdStamp) ois.readObject();
269       while (true) {
270         id = (AgentId) ois.readObject();
271         agent = (Agent) ois.readObject();
272         agent.id = id;
273         agent.deployed = true;
274         // If there is a bag associated with this agent don't initialize it!
275
if (agent instanceof BagSerializer) {
276           ((BagSerializer) agent).readBag(ois);
277         } else {
278           agent.agentInitialize(false);
279         }
280         createAgent(agent);
281       }
282     } catch (EOFException exc) {
283     } finally {
284       try {
285         ois.close();
286       } catch (Exception JavaDoc exc) {}
287     }
288     // inserts all pending messages
289
bis = new ByteArrayInputStream(reply.messages);
290     ois = new ObjectInputStream(bis);
291     try {
292       qinFromExt = (Vector JavaDoc) ois.readObject();
293       postFromExt();
294     } finally {
295       try {
296         ois.close();
297       } catch (Exception JavaDoc exc) {}
298     }
299   }
300
301   Object JavaDoc load(byte[] buf) throws Exception JavaDoc {
302     Object JavaDoc obj = null;
303     ByteArrayInputStream bis = new ByteArrayInputStream(buf);
304     ObjectInputStream ois = new ObjectInputStream(bis);
305     obj = ois.readObject();
306     try {
307       ois.close();
308     } catch (IOException exc) {}
309     return obj;
310   }
311   
312 }
313
Popular Tags