KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > fr > dyade > aaa > agent > Engine


1 /*
2  * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies
3  * Copyright (C) 1996 - 2000 BULL
4  * Copyright (C) 1996 - 2000 INRIA
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  */

21 package fr.dyade.aaa.agent;
22
23 import java.io.IOException JavaDoc;
24 import java.util.Hashtable JavaDoc;
25 import java.util.Enumeration JavaDoc;
26 import java.util.Vector JavaDoc;
27
28 import org.objectweb.util.monolog.api.BasicLevel;
29 import org.objectweb.util.monolog.api.Logger;
30
31 import fr.dyade.aaa.util.*;
32
33 class EngineThread extends Thread JavaDoc {
34   Engine engine = null;
35
36   EngineThread(Engine engine) {
37     super(AgentServer.getThreadGroup(), engine, engine.getName());
38     this.engine = engine;
39   }
40 }
41
42 /**
43  * The <code>Engine</code> class provides multiprogramming of agents. It
44  * realizes the program loop which successively gets the notifications from
45  * the message queue and calls the relevant reaction function member of the
46  * target agent. The engine's basic behaviour is:
47  * <p><blockquote><pre>
48  * While (true) {
49  * // get next message in channel
50  * Message msg = qin.get();
51  * // get the agent to process event
52  * Agent agent = load(msg.to);
53  * // execute relevant reaction, all notification sent during this
54  * // reaction is inserted into persistent queue in order to be processed
55  * // by the channel.
56  * agent.react(msg.from, msg.not);
57  * // save changes, then commit.
58  * &lt;BEGIN TRANSACTION&gt;
59  * qin.pop();
60  * channel.dispatch();
61  * agent.save();
62  * &lt;COMMIT TRANSACTION&gt;
63  * }
64  * </pre></blockquote>
65  * <p>
66  * The <code>Engine</code> class ensures the atomic handling of an agent
67  * reacting to a notification:
68  * <ul>
69  * <li>if the reaction completes, a COMMIT ensures all changes related to
70  * the reaction are committed (state change of the agent, notifications
71  * signaled during the reaction, deletion of the handled notification);
72  * <li>if anything goes wrong during the reaction, a ROLLBACK undoes the
73  * changes; depending on the error kind it may be necessary to execute
74  * additional operations to resynchronize the database and the memory
75  * objects, and to allow the main program to continue.
76  * </ul>
77  * <p><hr>
78  * <b>Handling errors.</b><p>
79  * Two types of errors may occur: errors of first type are detected in the
80  * source code and signaled by an <code>Exception</code>; serious errors lead
81  * to an <code>Error</code> being raised then the engine exits. In the first
82  * case the exception may be handled at any level, even partially. Most of
83  * them are signaled up to the engine loop. Two cases are then distinguished
84  * depending on the recovery policy:<ul>
85  * <li>if <code>recoveryPolicy</code> is set to <code>RP_EXC_NOT</code>
86  * (default value) then the agent state and the message queue are restored
87  * (ROLLBACK); an <code>ExceptionNotification</code> notification is sent
88  * to the sender and the engine may then proceed with next notification;
89  * <li>if <code>recoveryPolicy</code> is set to <code>RP_EXIT</code> the engine
90  * stops the agent server.
91  * </ul>
92  */

93 class Engine implements Runnable JavaDoc, MessageConsumer, EngineMBean {
94   /**
95    * Queue of messages to be delivered to local agents.
96    */

97   protected MessageQueue qin;
98
99   /**
100    * Boolean variable used to stop the engine properly. The engine tests
101    * this variable between each reaction, and stops if it is false.
102    */

103   protected volatile boolean isRunning;
104
105   /**
106    * Boolean variable used to stop the engine properly. If this variable
107    * is true then the engine is waiting and it can be interrupted, else it
108    * handles a notification and it will exit after (the engine tests the
109    * <code><a HREF="#isRunning">isRunning</a></code> variable between
110    * each reaction)
111    */

112   protected volatile boolean canStop;
113
114   /** Logical timestamp information for messages in "local" domain. */
115   private int stamp;
116
117   /** Buffer used to optimise */
118   private byte[] stampBuf = null;
119
120   /** True if the timestamp is modified since last save. */
121   private boolean modified = false;
122
123   /** This table is used to maintain a list of agents already in memory
124    * using the AgentId as primary key.
125    */

126   Hashtable JavaDoc agents;
127   /** Virtual time counter use in FIFO swap-in/swap-out mechanisms. */
128   long now = 0;
129   /** Maximum number of memory loaded agents. */
130   int NbMaxAgents = 100;
131
132   /**
133    * Returns the number of agent's reaction since last boot.
134    *
135    * @return the number of agent's reaction since last boot
136    */

137   public long getNbReactions() {
138     return now;
139   }
140
141   /**
142    * Returns the maximum number of agents loaded in memory.
143    *
144    * @return the maximum number of agents loaded in memory
145    */

146   public int getNbMaxAgents() {
147     return NbMaxAgents;
148   }
149
150   /**
151    * Sets the maximum number of agents that can be loaded simultaneously
152    * in memory.
153    *
154    * @param NbMaxAgents the maximum number of agents
155    */

156   public void setNbMaxAgents(int NbMaxAgents) {
157     this.NbMaxAgents = NbMaxAgents;
158   }
159
160   /**
161    * Returns the number of agents actually loaded in memory.
162    *
163    * @return the number of agents actually loaded in memory
164    */

165   public int getNbAgents() {
166     return agents.size();
167   }
168
169   /**
170    * Gets the number of messages posted to this engine since creation.
171    *
172    * @return the number of messages.
173    */

174   public int getNbMessages() {
175     return stamp;
176   }
177
178   /**
179    * Gets the number of waiting messages in this engine.
180    *
181    * @return the number of waiting messages.
182    */

183   public int getNbWaitingMessages() {
184     return qin.size();
185   }
186
187   /** Vector containing id's of all fixed agents. */
188   Vector JavaDoc fixedAgentIdList = null;
189
190   /**
191    * Returns the number of fixed agents.
192    *
193    * @return the number of fixed agents
194    */

195   public int getNbFixedAgents() {
196     return fixedAgentIdList.size();
197   }
198
199   /**
200    * The current agent running.
201    */

202   Agent agent = null;
203
204   /**
205    * The message in progress.
206    */

207   Message msg = null;
208
209   /**
210    * The active component of this engine.
211    */

212   EngineThread thread = null;
213
214   /**
215    * Send <code>ExceptionNotification</code> notification in case of exception
216    * in agent specific code.
217    * Constant value for the <code>recoveryPolicy</code> variable.
218    */

219   static final int RP_EXC_NOT = 0;
220   /**
221    * Stop agent server in case of exception in agent specific code.
222    * Constant value for the <code>recoveryPolicy</code> variable.
223    */

224   static final int RP_EXIT = 1;
225   /**
226    * String representations of <code>RP_*</code> constant values
227    * for the <code>recoveryPolicy</code> variable.
228    */

229   static final String JavaDoc[] rpStrings = {
230     "notification",
231     "exit"
232   };
233   /**
234    * recovery policy in case of exception in agent specific code.
235    * Default value is <code>RP_EXC_NOT</code>.
236    */

237   int recoveryPolicy = RP_EXC_NOT;
238
239   private String JavaDoc name;
240
241   /**
242    * Returns this <code>Engine</code>'s name.
243    *
244    * @return this <code>Engine</code>'s name.
245    */

246   public final String JavaDoc getName() {
247     return name;
248   }
249
250   /**
251    * Returns the corresponding domain's name.
252    *
253    * @return this domain's name.
254    */

255   public final String JavaDoc getDomainName() {
256     return "engine";
257   }
258
259   /**
260    * Creates a new instance of Engine (real class depends of server type).
261    *
262    * @return the corresponding <code>engine</code>'s instance.
263    */

264   static Engine newInstance() throws Exception JavaDoc {
265     String JavaDoc cname = "fr.dyade.aaa.agent.Engine";
266     cname = AgentServer.getProperty("Engine", cname);
267
268     Class JavaDoc eclass = Class.forName(cname);
269     return (Engine) eclass.newInstance();
270   }
271
272   protected Queue mq;
273
274   /**
275    * Push a new message in temporary queue until the end of current reaction.
276    * As this method is only called by engine's thread it does not need to be
277    * synchronized.
278    */

279   final void push(AgentId from,
280                   AgentId to,
281                   Notification not) {
282     if (logmon.isLoggable(BasicLevel.DEBUG))
283       logmon.log(BasicLevel.DEBUG,
284                  getName() + ", push(" + from + ", " + to + ", " + not + ")");
285     if ((to == null) || to.isNullId())
286       return;
287     
288     mq.push(Message.alloc(from, to, not));
289   }
290
291   /**
292    * Dispatch messages between the <a HREF="MessageConsumer.html">
293    * <code>MessageConsumer</code></a>: <a HREF="Engine.html">
294    * <code>Engine</code></a> component and <a HREF="Network.html">
295    * <code>Network</code></a> components.<p>
296    * Handle persistent information in respect with engine transaction.
297    * <p><hr>
298    * Be careful, this method must only be used during a transaction in
299    * order to ensure the mutual exclusion.
300    *
301    * @exception Exception error when accessing the local persistent
302    * storage.
303    */

304   final void dispatch() throws Exception JavaDoc {
305     Message msg = null;
306
307     while (! mq.isEmpty()) {
308       try {
309     msg = (Message) mq.get();
310       } catch (InterruptedException JavaDoc exc) {
311     continue;
312       }
313
314       if (msg.from == null) msg.from = AgentId.localId;
315       Channel.post(msg);
316       mq.pop();
317     }
318     Channel.save();
319   }
320
321   /**
322    * Cleans the Channel queue of all pushed notifications.
323    * <p><hr>
324    * Be careful, this method must only be used during a transaction in
325    * order to ensure the mutual exclusion.
326    */

327   final void clean() {
328     mq.removeAllElements();
329   }
330
331   protected Logger logmon = null;
332
333   /**
334    * Initializes a new <code>Engine</code> object (can only be used by
335    * subclasses).
336    */

337   protected Engine() throws Exception JavaDoc {
338     name = "Engine#" + AgentServer.getServerId();
339
340     // Get the logging monitor from current server MonologLoggerFactory
341
logmon = Debug.getLogger(Debug.A3Engine +
342                              ".#" + AgentServer.getServerId());
343     logmon.log(BasicLevel.DEBUG,
344                getName() + " created [" + getClass().getName() + "].");
345
346     NbMaxAgents = Integer.getInteger("NbMaxAgents", NbMaxAgents).intValue();
347     qin = new MessageVector(name, AgentServer.getTransaction().isPersistent());
348     if (! AgentServer.getTransaction().isPersistent()) {
349       NbMaxAgents = Integer.MAX_VALUE;
350     }
351     mq = new Queue();
352  
353     isRunning = false;
354     canStop = false;
355     thread = null;
356
357     needToBeCommited = false;
358
359     restore();
360     if (modified) save();
361   }
362
363   void init() throws Exception JavaDoc {
364     // Before any agent may be used, the environment, including the hash table,
365
// must be initialized.
366
agents = new Hashtable JavaDoc();
367     try {
368       // Creates or initializes AgentFactory, then loads and initializes
369
// all fixed agents.
370
fixedAgentIdList = (Vector JavaDoc) AgentServer.getTransaction().load(getName() + ".fixed");
371       if (fixedAgentIdList == null) {
372         // It's the first launching of this engine, in other case theres is
373
// at least the factory in fixedAgentIdList.
374
fixedAgentIdList = new Vector JavaDoc();
375         // Creates factory
376
AgentFactory factory = new AgentFactory(AgentId.factoryId);
377         createAgent(AgentId.factoryId, factory);
378         factory.save();
379         logmon.log(BasicLevel.WARN, getName() + ", factory created");
380       }
381
382       // loads all fixed agents
383
for (int i=0; i<fixedAgentIdList.size(); ) {
384     try {
385           if (logmon.isLoggable(BasicLevel.DEBUG))
386             logmon.log(BasicLevel.DEBUG,
387                        getName() + ", loads fixed agent" +
388                        fixedAgentIdList.elementAt(i));
389       Agent ag = load((AgentId) fixedAgentIdList.elementAt(i));
390           i += 1;
391     } catch (Exception JavaDoc exc) {
392           logmon.log(BasicLevel.ERROR,
393                      getName() + ", can't restore fixed agent" +
394                      fixedAgentIdList.elementAt(i), exc);
395           fixedAgentIdList.removeElementAt(i);
396     }
397       }
398     } catch (IOException JavaDoc exc) {
399       logmon.log(BasicLevel.ERROR, getName() + ", can't initialize");
400       throw exc;
401     }
402     logmon.log(BasicLevel.DEBUG, getName() + ", initialized");
403   }
404
405   void terminate() {
406     logmon.log(BasicLevel.DEBUG, getName() + ", ends");
407     Agent[] ag = new Agent[agents.size()];
408     int i = 0;
409     for (Enumeration JavaDoc e = agents.elements() ; e.hasMoreElements() ;) {
410       ag[i++] = (Agent) e.nextElement();
411     }
412     for (i--; i>=0; i--) {
413       if (logmon.isLoggable(BasicLevel.DEBUG))
414         logmon.log(BasicLevel.DEBUG,
415                    "Agent" + ag[i].id + " [" + ag[i].name + "] garbaged");
416       agents.remove(ag[i].id);
417       ag[i].agentFinalize(false);
418       ag[i] = null;
419     }
420   }
421
422   /**
423    * Creates and initializes an agent.
424    *
425    * @param agent agent object to create
426    *
427    * @exception Exception
428    * unspecialized exception
429    */

430   final void createAgent(AgentId id, Agent agent) throws Exception JavaDoc {
431     agent.id = id;
432     agent.deployed = true;
433     agent.agentInitialize(true);
434     createAgent(agent);
435   }
436
437   /**
438    * Creates and initializes an agent.
439    *
440    * @param agent agent object to create
441    *
442    * @exception Exception
443    * unspecialized exception
444    */

445   final void createAgent(Agent agent) throws Exception JavaDoc {
446     if (logmon.isLoggable(BasicLevel.DEBUG))
447       logmon.log(BasicLevel.DEBUG, getName() + ", creates: " + agent);
448
449     if (agent.isFixed()) {
450       // Subscribe the agent in pre-loading list.
451
addFixedAgentId(agent.getId());
452     }
453     if (agent.logmon == null)
454       agent.logmon = Debug.getLogger(fr.dyade.aaa.agent.Debug.A3Agent +
455                                      ".#" + AgentServer.getServerId());
456     agent.save();
457
458     // Memorize the agent creation and ...
459
now += 1;
460     garbage();
461     
462     agents.put(agent.getId(), agent);
463   }
464
465   /**
466    * Deletes an agent.
467    *
468    * @param from id of the agent to delete
469    */

470   void deleteAgent(AgentId from) throws Exception JavaDoc {
471     Agent ag;
472     try {
473       ag = load(from);
474       if (logmon.isLoggable(BasicLevel.DEBUG))
475         logmon.log(BasicLevel.DEBUG,
476                    getName() + ", delete Agent" + ag.id + " [" + ag.name + "]");
477       AgentServer.getTransaction().delete(ag.id.toString());
478     } catch (UnknownAgentException exc) {
479       logmon.log(BasicLevel.ERROR,
480                  getName() +
481                  ", can't delete unknown Agent" + from);
482       throw new Exception JavaDoc("Can't delete unknown Agent" + from);
483     } catch (Exception JavaDoc exc) {
484       logmon.log(BasicLevel.ERROR,
485                  getName() + ", can't delete Agent" + from, exc);
486       throw new Exception JavaDoc("Can't delete Agent" + from);
487     }
488     if (ag.isFixed())
489       removeFixedAgentId(ag.id);
490     agents.remove(ag.getId());
491     ag.agentFinalize(true);
492   }
493
494   /**
495    * The <code>garbage</code> method should be called regularly, to swap out
496    * from memory all the agents which have not been accessed for a time.
497    */

498   void garbage() {
499     if (agents.size() < (NbMaxAgents + fixedAgentIdList.size()))
500       return;
501
502     if (logmon.isLoggable(BasicLevel.INFO))
503       logmon.log(BasicLevel.INFO,
504                  getName() + ", garbage: " + agents.size() +
505                  '/' + NbMaxAgents + '+' + fixedAgentIdList.size() +
506                  ' ' + now);
507     long deadline = now - NbMaxAgents;
508     Agent[] ag = new Agent[agents.size()];
509     int i = 0;
510     for (Enumeration JavaDoc e = agents.elements() ; e.hasMoreElements() ;) {
511       ag[i++] = (Agent) e.nextElement();
512     }
513     for (i--; i>=0; i--) {
514       if ((ag[i].last <= deadline) && (!ag[i].fixed)) {
515         if (logmon.isLoggable(BasicLevel.DEBUG))
516       logmon.log(BasicLevel.DEBUG,
517                      "Agent" + ag[i].id + " [" + ag[i].name + "] garbaged");
518     agents.remove(ag[i].id);
519         ag[i].agentFinalize(false);
520         ag[i] = null;
521       }
522     }
523
524     logmon.log(BasicLevel.INFO,
525                getName() + ", garbage: " + agents.size());
526   }
527
528   /**
529    * Removes an <code>AgentId</code> in the <code>fixedAgentIdList</code>
530    * <code>Vector</code>.
531    *
532    * @param id the <code>AgentId</code> of no more used fixed agent.
533    */

534   void removeFixedAgentId(AgentId id) throws IOException JavaDoc {
535     fixedAgentIdList.removeElement(id);
536     AgentServer.getTransaction().save(fixedAgentIdList, getName() + ".fixed");
537   }
538
539   /**
540    * Adds an <code>AgentId</code> in the <code>fixedAgentIdList</code>
541    * <code>Vector</code>.
542    *
543    * @param id the <code>AgentId</code> of new fixed agent.
544    */

545   void addFixedAgentId(AgentId id) throws IOException JavaDoc {
546     fixedAgentIdList.addElement(id);
547     AgentServer.getTransaction().save(fixedAgentIdList, getName() + ".fixed");
548   }
549
550   /**
551    * Method used for debug and monitoring. It returns an array of the
552    * identifiers of all agents loaded in memory.
553    */

554   AgentId[] getLoadedAgentIdlist() {
555     AgentId list[] = new AgentId[agents.size()];
556     int i = 0;
557     for (Enumeration JavaDoc e = agents.elements(); e.hasMoreElements() ;)
558       list[i++] = ((Agent) e.nextElement()).id;
559     return list;
560   }
561
562   /**
563    * Returns a string representation of the specified agent.
564    *
565    * @param id The string representation of the agent's unique identification.
566    * @return A string representation of the specified agent.
567    * @see Engine#dumpAgent(AgentId)
568    */

569   public String JavaDoc dumpAgent(String JavaDoc id) throws Exception JavaDoc {
570     return dumpAgent(AgentId.fromString(id));
571   }
572
573   /**
574    * Returns a string representation of the specified agent. If the agent
575    * is not present it is loaded in memory, be careful it is not initialized
576    * (agentInitialize) nor cached in agents vector.
577    *
578    * @param id The agent's unique identification.
579    * @return A string representation of specified agent.
580    */

581   public String JavaDoc dumpAgent(AgentId id)
582     throws IOException JavaDoc, ClassNotFoundException JavaDoc, Exception JavaDoc {
583     Agent ag = (Agent) agents.get(id);
584     if (ag == null) {
585       ag = Agent.load(id);
586       if (ag == null) {
587         return id.toString() + " unknown";
588       }
589     }
590     return ag.toString();
591   }
592
593   /**
594    * The <code>load</code> method return the <code>Agent</code> object
595    * designed by the <code>AgentId</code> parameter. If the <code>Agent</code>
596    * object is not already present in the server memory, it is loaded from
597    * the storage.
598    *
599    * Be careful: although the save method can be overloaded to optimize the
600    * save process, the load procedure used by the engine is always load.
601    *
602    * @param id The agent identification.
603    * @return The corresponding agent.
604    *
605    * @exception IOException
606    * If an I/O error occurs.
607    * @exception ClassNotFoundException
608    * Should never happen (the agent has already been loaded in deploy).
609    * @exception UnknownAgentException
610    * There is no corresponding agent on secondary storage.
611    * @exception Exception
612    * when executing class specific initialization
613    */

614   final Agent load(AgentId id)
615     throws IOException JavaDoc, ClassNotFoundException JavaDoc, Exception JavaDoc {
616     now += 1;
617
618     Agent ag = (Agent) agents.get(id);
619     if (ag == null) {
620       ag = reload(id);
621       garbage();
622     }
623     ag.last = now;
624
625     return ag;
626   }
627
628   /**
629    * The <code>reload</code> method return the <code>Agent</code> object
630    * loaded from the storage.
631    *
632    * @param id The agent identification.
633    * @return The corresponding agent.
634    *
635    * @exception IOException
636    * when accessing the stored image
637    * @exception ClassNotFoundException
638    * if the stored image class may not be found
639    * @exception Exception
640    * unspecialized exception
641    */

642   final Agent reload(AgentId id)
643     throws IOException JavaDoc, ClassNotFoundException JavaDoc, Exception JavaDoc {
644     Agent ag = null;
645     if ((ag = Agent.load(id)) != null) {
646       try {
647         // Set current agent running in order to allow from field fixed
648
// for sendTo during agentInitialize (We assume that only Engine
649
// use this method).
650
agent = ag;
651         ag.agentInitialize(false);
652       } catch (Throwable JavaDoc exc) {
653         agent = null;
654         // AF: May be we have to delete the agent or not to allow
655
// reaction on it.
656
logmon.log(BasicLevel.ERROR,
657                    getName() + "Can't initialize Agent" + ag.id +
658                    " [" + ag.name + "]",
659                    exc);
660         throw new Exception JavaDoc(getName() + "Can't initialize Agent" + ag.id);
661       }
662       if (ag.logmon == null)
663         ag.logmon = Debug.getLogger(fr.dyade.aaa.agent.Debug.A3Agent +
664                                     ".#" + AgentServer.getServerId());
665       agents.put(ag.id, ag);
666       if (logmon.isLoggable(BasicLevel.DEBUG))
667         logmon.log(BasicLevel.DEBUG,
668                    getName() + "Agent" + ag.id + " [" + ag.name + "] loaded");
669     } else {
670       throw new UnknownAgentException();
671     }
672
673     return ag;
674   }
675
676   /**
677    * Insert a message in the <code>MessageQueue</code>.
678    * This method is used during initialisation to restore the component
679    * state from persistent storage.
680    *
681    * @param msg the message
682    */

683   public void insert(Message msg) {
684     qin.insert(msg);
685   }
686
687   /**
688    * Validates all messages pushed in queue during transaction session.
689    */

690   public void validate() {
691     qin.validate();
692   }
693
694   /**
695    * Causes this engine to begin execution.
696    *
697    * @see #stop()
698    */

699   public void start() {
700     if (isRunning) return;
701
702     thread = new EngineThread(this);
703     thread.setDaemon(false);
704
705     logmon.log(BasicLevel.DEBUG, getName() + " starting.");
706
707     String JavaDoc rp = AgentServer.getProperty("Engine.recoveryPolicy");
708     if (rp != null) {
709       for (int i = rpStrings.length; i-- > 0;) {
710     if (rp.equals(rpStrings[i])) {
711       recoveryPolicy = i;
712       break;
713     }
714       }
715     }
716     isRunning = true;
717     canStop = true;
718     thread.start();
719
720     logmon.log(BasicLevel.DEBUG, getName() + " started.");
721   }
722   
723   /**
724    * Forces the engine to stop executing.
725    *
726    * @see #start()
727    */

728   public void stop() {
729     logmon.log(BasicLevel.DEBUG, getName() + ", stops.");
730     isRunning = false;
731
732     if (thread != null) {
733       while (thread.isAlive()) {
734         if (canStop) {
735
736           if (thread.isAlive())
737             thread.interrupt();
738         }
739         try {
740           thread.join(1000L);
741         } catch (InterruptedException JavaDoc exc) {
742           continue;
743         }
744       }
745       thread = null;
746     }
747   }
748
749   /**
750    * Get this engine's <code>MessageQueue</code> qin.
751    *
752    * @return this <code>Engine</code>'s queue.
753    */

754   public MessageQueue getQueue() {
755     return qin;
756   }
757
758   /**
759    * Tests if the engine is alive.
760    *
761    * @return true if this <code>MessageConsumer</code> is alive; false
762    * otherwise.
763    */

764   public boolean isRunning() {
765     return isRunning;
766   }
767
768   /**
769    * Saves logical clock information to persistent storage.
770    */

771   public void save() throws IOException JavaDoc {
772     if (modified) {
773       stampBuf[0] = (byte)((stamp >>> 24) & 0xFF);
774       stampBuf[1] = (byte)((stamp >>> 16) & 0xFF);
775       stampBuf[2] = (byte)((stamp >>> 8) & 0xFF);
776       stampBuf[3] = (byte)(stamp & 0xFF);
777       AgentServer.getTransaction().saveByteArray(stampBuf, getName());
778       modified = false;
779     }
780   }
781
782   /**
783    * Restores logical clock information from persistent storage.
784    */

785   public void restore() throws Exception JavaDoc {
786     stampBuf = AgentServer.getTransaction().loadByteArray(getName());
787     if (stampBuf == null) {
788       stamp = 0;
789       stampBuf = new byte[4];
790       modified = true;
791     } else {
792       stamp = ((stampBuf[0] & 0xFF) << 24) +
793         ((stampBuf[1] & 0xFF) << 16) +
794         ((stampBuf[2] & 0xFF) << 8) +
795         (stampBuf[3] & 0xFF);
796       modified = false;
797     }
798   }
799
800   /**
801    * This operation always throws an IllegalStateException.
802    */

803   public void delete() throws IllegalStateException JavaDoc {
804     throw new IllegalStateException JavaDoc();
805   }
806
807   protected final int getStamp() {
808     return stamp;
809   }
810
811   protected final void setStamp(int stamp) {
812     modified = true;
813     this.stamp = stamp;
814   }
815
816   protected final void stamp(Message msg) {
817     modified = true;
818     msg.source = AgentServer.getServerId();
819     msg.dest = AgentServer.getServerId();
820     msg.stamp = ++stamp;
821   }
822
823   /**
824    * Adds a message in "ready to deliver" list. This method allocates a
825    * new time stamp to the message ; be Careful, changing the stamp imply
826    * the filename change too.
827    */

828   public void post(Message msg) throws Exception JavaDoc {
829     if ((msg.not.expiration > 0) &&
830         (msg.not.expiration < System.currentTimeMillis())) {
831       if (logmon.isLoggable(BasicLevel.DEBUG))
832         logmon.log(BasicLevel.DEBUG,
833                    getName() + ": removes expired notification " +
834                    msg.from + ", " + msg.not);
835       return;
836     }
837       
838     if (msg.isPersistent()) {
839       stamp(msg);
840       msg.save();
841     }
842
843     qin.push(msg);
844   }
845
846   protected boolean needToBeCommited = false;
847   protected long timeout = Long.MAX_VALUE;
848
849   protected void onTimeOut() {}
850
851   /**
852    * Main loop of agent server <code>Engine</code>.
853    */

854   public void run() {
    // Loop while isRunning: wait for a message, load the target agent, run
    // its reaction, then commit (or abort when the reaction fails). Any
    // Throwable escaping the loop stops the agent server; terminate() is
    // always executed when the loop ends.
855     try {
856       main_loop:
857       while (isRunning) {
858     agent = null;
    // While waiting for a message the engine may be interrupted by stop().
859     canStop = true;
860
861     // Get a notification, then execute the right reaction.
862
try {
863       msg = (Message) qin.get(timeout);
864           if (msg == null) {
            // No message arrived within the timeout.
865             onTimeOut();
866             continue;
867           }
868     } catch (InterruptedException JavaDoc exc) {
      // Interrupted while waiting: go back and test isRunning again.
869       continue;
870     }
871     
    // From here a message is being handled: stop() must not interrupt.
872     canStop = false;
873     if (! isRunning) break;
874
875         if ((msg.not.expiration <= 0) ||
876             (msg.not.expiration >= System.currentTimeMillis())) {
877           // The message is valid, try to load the destination agent
878
try {
879             agent = load(msg.to);
880           } catch (UnknownAgentException exc) {
881             // The destination agent doesn't exist, send an error
882
// notification to sending agent.
883
logmon.log(BasicLevel.ERROR,
884                        getName() + ": Unknown agent, " + msg.to + ".react(" +
885                        msg.from + ", " + msg.not + ")");
886             agent = null;
887             push(AgentId.localId,
888                  msg.from,
889                  new UnknownAgent(msg.to, msg.not));
890           } catch (Exception JavaDoc exc) {
891             // The agent can't be loaded for another reason: no notification
892
// is sent, the whole agent server is stopped.
893
logmon.log(BasicLevel.ERROR,
894                        getName() + ": Can't load agent, " + msg.to + ".react(" +
895                        msg.from + ", " + msg.not + ")",
896                        exc);
897             agent = null;
898             // Stop the AgentServer
899
AgentServer.stop(false);
900             break main_loop;
901           }
902         } else {
          // Expired message: no agent is loaded, it is just removed by the
          // commit() below.
903           if (logmon.isLoggable(BasicLevel.DEBUG))
904             logmon.log(BasicLevel.DEBUG,
905                        getName() + ": removes expired notification " +
906                        msg.from + ", " + msg.not);
907         }
908
909     if (agent != null) {
910           if (logmon.isLoggable(BasicLevel.DEBUG))
911             logmon.log(BasicLevel.DEBUG,
912                        getName() + ": " + agent + ".react(" +
913                        msg.from + ", " + msg.not + ")");
914       try {
915             agent.react(msg.from, msg.not);
916           } catch (Exception JavaDoc exc) {
917             logmon.log(BasicLevel.ERROR,
918                        getName() + ": Uncaught exception during react, " +
919                        agent + ".react(" + msg.from + ", " + msg.not + ")",
920                        exc);
        // The recovery policy decides between rollback and server stop.
921         switch (recoveryPolicy) {
922         case RP_EXC_NOT:
923         default:
924           // In case of unrecoverable error during the reaction we have
925
// to rollback.
926
abort(exc);
927           // then continue.
928
continue;
929         case RP_EXIT:
930               // Stop the AgentServer
931
AgentServer.stop(false);
932           break main_loop;
933         }
934       }
935     }
936
937     // Commit all changes then continue.
938
commit();
939       }
940     } catch (Throwable JavaDoc exc) {
941       // There is an unrecoverable exception during the transaction
942
// we must exit from server.
943
logmon.log(BasicLevel.FATAL,
944                  getName() + ": Fatal error",
945                  exc);
946       canStop = false;
947       // Stop the AgentServer
948
AgentServer.stop(false);
949     } finally {
      // Whatever the exit path, release all agents loaded in memory.
950       terminate();
951       logmon.log(BasicLevel.DEBUG, getName() + " stopped.");
952     }
953   }
954
955   /**
956    * Commit the agent reaction in case of normal termination:<ul>
957    * <li>suppress the processed notification from message queue,
958    * then deletes it ;
959    * <li>push all new notifications in qin and qout, and saves them ;
960    * <li>saves the agent state ;
961    * <li>then commit the transaction to validate all changes.
962    * </ul>
963    */

964   void commit() throws Exception JavaDoc {
    // Everything between begin() and commit() belongs to one transaction.
965     AgentServer.getTransaction().begin();
966     // Suppress the processed notification from message queue ..
967
qin.pop();
968     // .. then deletes it ..
969
msg.delete();
970     // .. and frees it.
971
msg.free();
972     // Post all notifications temporarily kept in mq to the right consumers,
973
// then saves changes.
974
dispatch();
975     // Saves the agent state then commit the transaction. The agent is null
    // when the message had expired or its target agent was unknown.
976
if (agent != null) agent.save();
977     AgentServer.getTransaction().commit();
978     // The transaction has committed, then validate all messages.
979
Channel.validate();
980     AgentServer.getTransaction().release();
981   }
982
983   /**
984    * Abort the agent reaction in case of error during execution. In case
985    * of unrecoverable error during the reaction we have to rollback:<ul>
986    * <li>reload the previous state of agent ;
987    * <li>remove the failed notification ;
988    * <li>clean the Channel queue of all pushed notifications ;
989    * <li>send an error notification to the sender ;
990    * <li>then commit the transaction to validate all changes.
991    * </ul>
992    */

993   void abort(Exception JavaDoc exc) throws Exception JavaDoc {
994     AgentServer.getTransaction().begin();
995     // Reload the state of agent.
996
try {
997       agent = reload(msg.to);
998     } catch (Exception JavaDoc exc2) {
999       logmon.log(BasicLevel.ERROR,
1000                 getName() + ", can't reload Agent" + msg.to, exc2);
1001      throw new Exception JavaDoc("Can't reload Agent" + msg.to);
1002    }
1003
1004    // Remove the failed notification ..
1005
qin.pop();
1006    // .. then deletes it ..
1007
msg.delete();
1008    // .. and frees it.
1009
msg.free();
1010    // Clean the Channel queue of all pushed notifications.
1011
clean();
1012    // Send an error notification to client agent.
1013
push(AgentId.localId,
1014         msg.from,
1015         new ExceptionNotification(msg.to, msg.not, exc));
1016    dispatch();
1017    AgentServer.getTransaction().commit();
1018    // The transaction has commited, then validate all messages.
1019
Channel.validate();
1020    AgentServer.getTransaction().release();
1021  }
1022
1023  /**
1024   * Returns a string representation of this engine.
1025   *
1026   * @return A string representation of this engine.
1027   */

1028  public String JavaDoc toString() {
1029    StringBuffer JavaDoc strbuf = new StringBuffer JavaDoc();
1030
1031    strbuf.append('(').append(super.toString());
1032    strbuf.append(",name=").append(getName());
1033    strbuf.append(",running=").append(isRunning());
1034    strbuf.append(",agent=").append(agent).append(')');
1035
1036    return strbuf.toString();
1037  }
1038}
1039
Popular Tags