1 21 package fr.dyade.aaa.agent; 22 23 import java.io.IOException ; 24 import java.util.Hashtable ; 25 import java.util.Enumeration ; 26 import java.util.Vector ; 27 28 import org.objectweb.util.monolog.api.BasicLevel; 29 import org.objectweb.util.monolog.api.Logger; 30 31 import fr.dyade.aaa.util.*; 32 33 class EngineThread extends Thread { 34 Engine engine = null; 35 36 EngineThread(Engine engine) { 37 super(AgentServer.getThreadGroup(), engine, engine.getName()); 38 this.engine = engine; 39 } 40 } 41 42 93 class Engine implements Runnable , MessageConsumer, EngineMBean { 94 97 protected MessageQueue qin; 98 99 103 protected volatile boolean isRunning; 104 105 112 protected volatile boolean canStop; 113 114 115 private int stamp; 116 117 118 private byte[] stampBuf = null; 119 120 121 private boolean modified = false; 122 123 126 Hashtable agents; 127 128 long now = 0; 129 130 int NbMaxAgents = 100; 131 132 137 public long getNbReactions() { 138 return now; 139 } 140 141 146 public int getNbMaxAgents() { 147 return NbMaxAgents; 148 } 149 150 156 public void setNbMaxAgents(int NbMaxAgents) { 157 this.NbMaxAgents = NbMaxAgents; 158 } 159 160 165 public int getNbAgents() { 166 return agents.size(); 167 } 168 169 174 public int getNbMessages() { 175 return stamp; 176 } 177 178 183 public int getNbWaitingMessages() { 184 return qin.size(); 185 } 186 187 188 Vector fixedAgentIdList = null; 189 190 195 public int getNbFixedAgents() { 196 return fixedAgentIdList.size(); 197 } 198 199 202 Agent agent = null; 203 204 207 Message msg = null; 208 209 212 EngineThread thread = null; 213 214 219 static final int RP_EXC_NOT = 0; 220 224 static final int RP_EXIT = 1; 225 229 static final String [] rpStrings = { 230 "notification", 231 "exit" 232 }; 233 237 int recoveryPolicy = RP_EXC_NOT; 238 239 private String name; 240 241 246 public final String getName() { 247 return name; 248 } 249 250 255 public final String getDomainName() { 256 return "engine"; 257 } 258 259 264 static Engine newInstance() throws Exception { 265 String cname = "fr.dyade.aaa.agent.Engine"; 266 cname = AgentServer.getProperty("Engine", cname); 267 268 Class eclass = Class.forName(cname); 269 return (Engine) eclass.newInstance(); 270 } 271 272 protected Queue mq; 273 274 279 final void push(AgentId from, 280 AgentId to, 281 Notification not) { 282 if (logmon.isLoggable(BasicLevel.DEBUG)) 283 logmon.log(BasicLevel.DEBUG, 284 getName() + ", push(" + from + ", " + to + ", " + not + ")"); 285 if ((to == null) || to.isNullId()) 286 return; 287 288 mq.push(Message.alloc(from, to, not)); 289 } 290 291 304 final void dispatch() throws Exception { 305 Message msg = null; 306 307 while (! mq.isEmpty()) { 308 try { 309 msg = (Message) mq.get(); 310 } catch (InterruptedException exc) { 311 continue; 312 } 313 314 if (msg.from == null) msg.from = AgentId.localId; 315 Channel.post(msg); 316 mq.pop(); 317 } 318 Channel.save(); 319 } 320 321 327 final void clean() { 328 mq.removeAllElements(); 329 } 330 331 protected Logger logmon = null; 332 333 337 protected Engine() throws Exception { 338 name = "Engine#" + AgentServer.getServerId(); 339 340 logmon = Debug.getLogger(Debug.A3Engine + 342 ".#" + AgentServer.getServerId()); 343 logmon.log(BasicLevel.DEBUG, 344 getName() + " created [" + getClass().getName() + "]."); 345 346 NbMaxAgents = Integer.getInteger("NbMaxAgents", NbMaxAgents).intValue(); 347 qin = new MessageVector(name, AgentServer.getTransaction().isPersistent()); 348 if (! AgentServer.getTransaction().isPersistent()) { 349 NbMaxAgents = Integer.MAX_VALUE; 350 } 351 mq = new Queue(); 352 353 isRunning = false; 354 canStop = false; 355 thread = null; 356 357 needToBeCommited = false; 358 359 restore(); 360 if (modified) save(); 361 } 362 363 void init() throws Exception { 364 agents = new Hashtable (); 367 try { 368 fixedAgentIdList = (Vector ) AgentServer.getTransaction().load(getName() + ".fixed"); 371 if (fixedAgentIdList == null) { 372 fixedAgentIdList = new Vector (); 375 AgentFactory factory = new AgentFactory(AgentId.factoryId); 377 createAgent(AgentId.factoryId, factory); 378 factory.save(); 379 logmon.log(BasicLevel.WARN, getName() + ", factory created"); 380 } 381 382 for (int i=0; i<fixedAgentIdList.size(); ) { 384 try { 385 if (logmon.isLoggable(BasicLevel.DEBUG)) 386 logmon.log(BasicLevel.DEBUG, 387 getName() + ", loads fixed agent" + 388 fixedAgentIdList.elementAt(i)); 389 Agent ag = load((AgentId) fixedAgentIdList.elementAt(i)); 390 i += 1; 391 } catch (Exception exc) { 392 logmon.log(BasicLevel.ERROR, 393 getName() + ", can't restore fixed agent" + 394 fixedAgentIdList.elementAt(i), exc); 395 fixedAgentIdList.removeElementAt(i); 396 } 397 } 398 } catch (IOException exc) { 399 logmon.log(BasicLevel.ERROR, getName() + ", can't initialize"); 400 throw exc; 401 } 402 logmon.log(BasicLevel.DEBUG, getName() + ", initialized"); 403 } 404 405 void terminate() { 406 logmon.log(BasicLevel.DEBUG, getName() + ", ends"); 407 Agent[] ag = new Agent[agents.size()]; 408 int i = 0; 409 for (Enumeration e = agents.elements() ; e.hasMoreElements() ;) { 410 ag[i++] = (Agent) e.nextElement(); 411 } 412 for (i--; i>=0; i--) { 413 if (logmon.isLoggable(BasicLevel.DEBUG)) 414 logmon.log(BasicLevel.DEBUG, 415 "Agent" + ag[i].id + " [" + ag[i].name + "] garbaged"); 416 agents.remove(ag[i].id); 417 ag[i].agentFinalize(false); 418 ag[i] = null; 419 } 420 } 421 422 430 final void createAgent(AgentId id, Agent agent) throws Exception { 431 agent.id = id; 432 agent.deployed = true; 433 agent.agentInitialize(true); 434 createAgent(agent); 435 } 436 437 445 final void createAgent(Agent agent) throws Exception { 446 if (logmon.isLoggable(BasicLevel.DEBUG)) 447 logmon.log(BasicLevel.DEBUG, getName() + ", creates: " + agent); 448 449 if (agent.isFixed()) { 450 addFixedAgentId(agent.getId()); 452 } 453 if (agent.logmon == null) 454 agent.logmon = Debug.getLogger(fr.dyade.aaa.agent.Debug.A3Agent + 455 ".#" + AgentServer.getServerId()); 456 agent.save(); 457 458 now += 1; 460 garbage(); 461 462 agents.put(agent.getId(), agent); 463 } 464 465 470 void deleteAgent(AgentId from) throws Exception { 471 Agent ag; 472 try { 473 ag = load(from); 474 if (logmon.isLoggable(BasicLevel.DEBUG)) 475 logmon.log(BasicLevel.DEBUG, 476 getName() + ", delete Agent" + ag.id + " [" + ag.name + "]"); 477 AgentServer.getTransaction().delete(ag.id.toString()); 478 } catch (UnknownAgentException exc) { 479 logmon.log(BasicLevel.ERROR, 480 getName() + 481 ", can't delete unknown Agent" + from); 482 throw new Exception ("Can't delete unknown Agent" + from); 483 } catch (Exception exc) { 484 logmon.log(BasicLevel.ERROR, 485 getName() + ", can't delete Agent" + from, exc); 486 throw new Exception ("Can't delete Agent" + from); 487 } 488 if (ag.isFixed()) 489 removeFixedAgentId(ag.id); 490 agents.remove(ag.getId()); 491 ag.agentFinalize(true); 492 } 493 494 498 void garbage() { 499 if (agents.size() < (NbMaxAgents + fixedAgentIdList.size())) 500 return; 501 502 if (logmon.isLoggable(BasicLevel.INFO)) 503 logmon.log(BasicLevel.INFO, 504 getName() + ", garbage: " + agents.size() + 505 '/' + NbMaxAgents + '+' + fixedAgentIdList.size() + 506 ' ' + now); 507 long deadline = now - NbMaxAgents; 508 Agent[] ag = new Agent[agents.size()]; 509 int i = 0; 510 for (Enumeration e = agents.elements() ; e.hasMoreElements() ;) { 511 ag[i++] = (Agent) e.nextElement(); 512 } 513 for (i--; i>=0; i--) { 514 if ((ag[i].last <= deadline) && (!ag[i].fixed)) { 515 if (logmon.isLoggable(BasicLevel.DEBUG)) 516 logmon.log(BasicLevel.DEBUG, 517 "Agent" + ag[i].id + " [" + ag[i].name + "] garbaged"); 518 agents.remove(ag[i].id); 519 ag[i].agentFinalize(false); 520 ag[i] = null; 521 } 522 } 523 524 logmon.log(BasicLevel.INFO, 525 getName() + ", garbage: " + agents.size()); 526 } 527 528 534 void removeFixedAgentId(AgentId id) throws IOException { 535 fixedAgentIdList.removeElement(id); 536 AgentServer.getTransaction().save(fixedAgentIdList, getName() + ".fixed"); 537 } 538 539 545 void addFixedAgentId(AgentId id) throws IOException { 546 fixedAgentIdList.addElement(id); 547 AgentServer.getTransaction().save(fixedAgentIdList, getName() + ".fixed"); 548 } 549 550 554 AgentId[] getLoadedAgentIdlist() { 555 AgentId list[] = new AgentId[agents.size()]; 556 int i = 0; 557 for (Enumeration e = agents.elements(); e.hasMoreElements() ;) 558 list[i++] = ((Agent) e.nextElement()).id; 559 return list; 560 } 561 562 569 public String dumpAgent(String id) throws Exception { 570 return dumpAgent(AgentId.fromString(id)); 571 } 572 573 581 public String dumpAgent(AgentId id) 582 throws IOException , ClassNotFoundException , Exception { 583 Agent ag = (Agent) agents.get(id); 584 if (ag == null) { 585 ag = Agent.load(id); 586 if (ag == null) { 587 return id.toString() + " unknown"; 588 } 589 } 590 return ag.toString(); 591 } 592 593 614 final Agent load(AgentId id) 615 throws IOException , ClassNotFoundException , Exception { 616 now += 1; 617 618 Agent ag = (Agent) agents.get(id); 619 if (ag == null) { 620 ag = reload(id); 621 garbage(); 622 } 623 ag.last = now; 624 625 return ag; 626 } 627 628 642 final Agent reload(AgentId id) 643 throws IOException , ClassNotFoundException , Exception { 644 Agent ag = null; 645 if ((ag = Agent.load(id)) != null) { 646 try { 647 agent = ag; 651 ag.agentInitialize(false); 652 } catch (Throwable exc) { 653 agent = null; 654 logmon.log(BasicLevel.ERROR, 657 getName() + "Can't initialize Agent" + ag.id + 658 " [" + ag.name + "]", 659 exc); 660 throw new Exception (getName() + "Can't initialize Agent" + ag.id); 661 } 662 if (ag.logmon == null) 663 ag.logmon = Debug.getLogger(fr.dyade.aaa.agent.Debug.A3Agent + 664 ".#" + AgentServer.getServerId()); 665 agents.put(ag.id, ag); 666 if (logmon.isLoggable(BasicLevel.DEBUG)) 667 logmon.log(BasicLevel.DEBUG, 668 getName() + "Agent" + ag.id + " [" + ag.name + "] loaded"); 669 } else { 670 throw new UnknownAgentException(); 671 } 672 673 return ag; 674 } 675 676 683 public void insert(Message msg) { 684 qin.insert(msg); 685 } 686 687 690 public void validate() { 691 qin.validate(); 692 } 693 694 699 public void start() { 700 if (isRunning) return; 701 702 thread = new EngineThread(this); 703 thread.setDaemon(false); 704 705 logmon.log(BasicLevel.DEBUG, getName() + " starting."); 706 707 String rp = AgentServer.getProperty("Engine.recoveryPolicy"); 708 if (rp != null) { 709 for (int i = rpStrings.length; i-- > 0;) { 710 if (rp.equals(rpStrings[i])) { 711 recoveryPolicy = i; 712 break; 713 } 714 } 715 } 716 isRunning = true; 717 canStop = true; 718 thread.start(); 719 720 logmon.log(BasicLevel.DEBUG, getName() + " started."); 721 } 722 723 728 public void stop() { 729 logmon.log(BasicLevel.DEBUG, getName() + ", stops."); 730 isRunning = false; 731 732 if (thread != null) { 733 while (thread.isAlive()) { 734 if (canStop) { 735 736 if (thread.isAlive()) 737 thread.interrupt(); 738 } 739 try { 740 thread.join(1000L); 741 } catch (InterruptedException exc) { 742 continue; 743 } 744 } 745 thread = null; 746 } 747 } 748 749 754 public MessageQueue getQueue() { 755 return qin; 756 } 757 758 764 public boolean isRunning() { 765 return isRunning; 766 } 767 768 771 public void save() throws IOException { 772 if (modified) { 773 stampBuf[0] = (byte)((stamp >>> 24) & 0xFF); 774 stampBuf[1] = (byte)((stamp >>> 16) & 0xFF); 775 stampBuf[2] = (byte)((stamp >>> 8) & 0xFF); 776 stampBuf[3] = (byte)(stamp & 0xFF); 777 AgentServer.getTransaction().saveByteArray(stampBuf, getName()); 778 modified = false; 779 } 780 } 781 782 785 public void restore() throws Exception { 786 stampBuf = AgentServer.getTransaction().loadByteArray(getName()); 787 if (stampBuf == null) { 788 stamp = 0; 789 stampBuf = new byte[4]; 790 modified = true; 791 } else { 792 stamp = ((stampBuf[0] & 0xFF) << 24) + 793 ((stampBuf[1] & 0xFF) << 16) + 794 ((stampBuf[2] & 0xFF) << 8) + 795 (stampBuf[3] & 0xFF); 796 modified = false; 797 } 798 } 799 800 803 public void delete() throws IllegalStateException { 804 throw new IllegalStateException (); 805 } 806 807 protected final int getStamp() { 808 return stamp; 809 } 810 811 protected final void setStamp(int stamp) { 812 modified = true; 813 this.stamp = stamp; 814 } 815 816 protected final void stamp(Message msg) { 817 modified = true; 818 msg.source = AgentServer.getServerId(); 819 msg.dest = AgentServer.getServerId(); 820 msg.stamp = ++stamp; 821 } 822 823 828 public void post(Message msg) throws Exception { 829 if ((msg.not.expiration > 0) && 830 (msg.not.expiration < System.currentTimeMillis())) { 831 if (logmon.isLoggable(BasicLevel.DEBUG)) 832 logmon.log(BasicLevel.DEBUG, 833 getName() + ": removes expired notification " + 834 msg.from + ", " + msg.not); 835 return; 836 } 837 838 if (msg.isPersistent()) { 839 stamp(msg); 840 msg.save(); 841 } 842 843 qin.push(msg); 844 } 845 846 protected boolean needToBeCommited = false; 847 protected long timeout = Long.MAX_VALUE; 848 849 protected void onTimeOut() {} 850 851 854 public void run() { 855 try { 856 main_loop: 857 while (isRunning) { 858 agent = null; 859 canStop = true; 860 861 try { 863 msg = (Message) qin.get(timeout); 864 if (msg == null) { 865 onTimeOut(); 866 continue; 867 } 868 } catch (InterruptedException exc) { 869 continue; 870 } 871 872 canStop = false; 873 if (! isRunning) break; 874 875 if ((msg.not.expiration <= 0) || 876 (msg.not.expiration >= System.currentTimeMillis())) { 877 try { 879 agent = load(msg.to); 880 } catch (UnknownAgentException exc) { 881 logmon.log(BasicLevel.ERROR, 884 getName() + ": Unknown agent, " + msg.to + ".react(" + 885 msg.from + ", " + msg.not + ")"); 886 agent = null; 887 push(AgentId.localId, 888 msg.from, 889 new UnknownAgent(msg.to, msg.not)); 890 } catch (Exception exc) { 891 logmon.log(BasicLevel.ERROR, 894 getName() + ": Can't load agent, " + msg.to + ".react(" + 895 msg.from + ", " + msg.not + ")", 896 exc); 897 agent = null; 898 AgentServer.stop(false); 900 break main_loop; 901 } 902 } else { 903 if (logmon.isLoggable(BasicLevel.DEBUG)) 904 logmon.log(BasicLevel.DEBUG, 905 getName() + ": removes expired notification " + 906 msg.from + ", " + msg.not); 907 } 908 909 if (agent != null) { 910 if (logmon.isLoggable(BasicLevel.DEBUG)) 911 logmon.log(BasicLevel.DEBUG, 912 getName() + ": " + agent + ".react(" + 913 msg.from + ", " + msg.not + ")"); 914 try { 915 agent.react(msg.from, msg.not); 916 } catch (Exception exc) { 917 logmon.log(BasicLevel.ERROR, 918 getName() + ": Uncaught exception during react, " + 919 agent + ".react(" + msg.from + ", " + msg.not + ")", 920 exc); 921 switch (recoveryPolicy) { 922 case RP_EXC_NOT: 923 default: 924 abort(exc); 927 continue; 929 case RP_EXIT: 930 AgentServer.stop(false); 932 break main_loop; 933 } 934 } 935 } 936 937 commit(); 939 } 940 } catch (Throwable exc) { 941 logmon.log(BasicLevel.FATAL, 944 getName() + ": Fatal error", 945 exc); 946 canStop = false; 947 AgentServer.stop(false); 949 } finally { 950 terminate(); 951 logmon.log(BasicLevel.DEBUG, getName() + " stopped."); 952 } 953 } 954 955 964 void commit() throws Exception { 965 AgentServer.getTransaction().begin(); 966 qin.pop(); 968 msg.delete(); 970 msg.free(); 972 dispatch(); 975 if (agent != null) agent.save(); 977 AgentServer.getTransaction().commit(); 978 Channel.validate(); 980 AgentServer.getTransaction().release(); 981 } 982 983 993 void abort(Exception exc) throws Exception { 994 AgentServer.getTransaction().begin(); 995 try { 997 agent = reload(msg.to); 998 } catch (Exception exc2) { 999 logmon.log(BasicLevel.ERROR, 1000 getName() + ", can't reload Agent" + msg.to, exc2); 1001 throw new Exception ("Can't reload Agent" + msg.to); 1002 } 1003 1004 qin.pop(); 1006 msg.delete(); 1008 msg.free(); 1010 clean(); 1012 push(AgentId.localId, 1014 msg.from, 1015 new ExceptionNotification(msg.to, msg.not, exc)); 1016 dispatch(); 1017 AgentServer.getTransaction().commit(); 1018 Channel.validate(); 1020 AgentServer.getTransaction().release(); 1021 } 1022 1023 1028 public String toString() { 1029 StringBuffer strbuf = new StringBuffer (); 1030 1031 strbuf.append('(').append(super.toString()); 1032 strbuf.append(",name=").append(getName()); 1033 strbuf.append(",running=").append(isRunning()); 1034 strbuf.append(",agent=").append(agent).append(')'); 1035 1036 return strbuf.toString(); 1037 } 1038} 1039 | Popular Tags |