KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > fr > dyade > aaa > agent > Channel


/*
 * Copyright (C) 2001 - 2005 ScalAgent Distributed Technologies
 * Copyright (C) 1996 - 2000 BULL
 * Copyright (C) 1996 - 2000 INRIA
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 */

21 package fr.dyade.aaa.agent;
22
23 import java.io.*;
24 import java.util.*;
25
26 import org.objectweb.util.monolog.api.BasicLevel;
27 import org.objectweb.util.monolog.api.Logger;
28
29 import fr.dyade.aaa.util.*;
30
31 /**
32  * Class <code>Channel</code> realizes the interface for sending messages.
33  * It defines function member SendTo to send a notification to an identified
34  * agent.<p>
35  * Notifications are then routed to a message queue where they are
36  * stored in chronological order. The Channel object is responsible for
37  * localizing the target agent.
38  */

39 public class Channel {
40   static Channel channel = null;
41
42   /**
43    * Creates a new instance of channel (result depends of server type).
44    *
45    * @return the corresponding <code>Channel</code>'s instance.
46    */

47   static Channel newInstance() throws Exception JavaDoc {
48     String JavaDoc cname = System.getProperty("Channel",
49                                       "fr.dyade.aaa.agent.Channel");
50     Class JavaDoc cclass = Class.forName(cname);
51     channel = (Channel) cclass.newInstance();
52     return channel;
53   }
54
55   protected Logger logmon = null;
56
57   /**
58    * Constructs a new <code>Channel</code> object (can only be used by
59    * subclasses).
60    */

61   protected Channel() {
62     consumers = new Vector();
63
64     // Get the logging monitor from current server MonologLoggerFactory
65
logmon = Debug.getLogger(Debug.A3Engine +
66                              ".#" + AgentServer.getServerId());
67     logmon.log(BasicLevel.DEBUG, toString() + " created.");
68   }
69
70   static Vector consumers = null;
71
72   /**
73    * Sends a notification to an agent. It may be used anywhere,
74    * from any object and any thread. However it is best practice to call
75    * the <a HREF="Agent.html#sendTo(AgentId, Notification)"><code>sendTo
76    * </code></a> function defined in class <code>Agent</code> from an agent
77    * code executed during a reaction.<p>
78    * The destination agent receives the notification with a declared null
79    * source agent id, which may be recognized using the <code>isNullId</code>
80    * function of class <code>AgentId</code>.
81    * This is not true when this call is performed during an standard agent
82    * reaction. In that case the current reacting agent, known by the engine,
83    * is provided as source agent.<p>
84    * The notification is immediately validated, that is made persistent,
85    * if it is not sent from an agent reaction.
86    * Be careful, does not use this method in the engine thread, sometime
87    * engine.agent is null and it throws a NullPointerException.
88    *
89    * @param to destination agent.
90    * @param not notification.
91    *
92    * @exception IOException
93    * error when accessing the local persistent storage
94    */

95   public final static void sendTo(AgentId to,
96                   Notification not) {
97 // try {
98
// EngineThread thread = (EngineThread) Thread.currentThread();
99
// // Use the engine's sendTo method that push message in temporary queue
100
// // until the end of current reaction.
101
// thread.engine.push(AgentServer.engine.agent.getId(), to, not);
102
// } catch (ClassCastException exc) {
103
// // Be careful, the destination node use the from.to field to
104
// // get the from node id.
105
// channel.directSendTo(AgentId.localId, to, not);
106
// }
107

108 // if (Class.EngineThread.isAssignable(Thread.currentThread())) {
109
if (Thread.currentThread() == AgentServer.engine.thread) {
110       AgentServer.engine.push(AgentServer.engine.agent.getId(), to, not);
111     } else {
112       channel.directSendTo(AgentId.localId, to, not);
113     }
114   }
115
116   /**
117    * Adds a message in "ready to deliver" list of right consumer. This method
118    * set the logical date of message, push it in the corresponding queue, and
119    * save it.
120    *
121    * @param msg The message to deliver.
122    */

123   static final void post(Message msg) throws Exception JavaDoc {
124     try {
125       MessageConsumer cons = AgentServer.getConsumer(msg.to.getTo());
126       if (! consumers.contains(cons)) {
127         consumers.add(cons);
128       }
129       cons.post(msg);
130     } catch (UnknownServerException exc) {
131       channel.logmon.log(BasicLevel.WARN,
132                          channel.toString() + ", can't post message: " + msg,
133                          exc);
134       // TODO: Post an ErrorNotification
135
}
136   }
137
138   /**
139    * Save state of all modified consumer.
140    */

141   static final void save() throws IOException {
142     for (int i=0; i<consumers.size(); i++) {
143       ((MessageConsumer) consumers.elementAt(i)).save();
144     }
145   }
146
147   /**
148    * Validates all messages previously dispatched. There is two separate
149    * methods for dispatch and validate messages because of use of transactions
150    * in <code>Engine</code>. The messages are only validated in queues after
151    * the commit of transaction.
152    * <p><hr>
153    * Be careful, this method must only be used during a transaction in
154    * order to ensure the mutual exclusion.
155    *
156    * @see Engine#commit()
157    */

158   static final void validate() {
159     for (int i=0; i<consumers.size(); i++) {
160       ((MessageConsumer) consumers.elementAt(i)).validate();
161     }
162     consumers.clear();
163   }
164
165   /**
166    * Sends an immediately validated notification to an agent. Post and
167    * directly dispatches the notification to the <code>MessageConsumer</code>.
168    * Does not queue the notification in the local message queue.<p>
169    * Normally used uniquely in <a HREF="#sendTo(AgentId, Notification)"><code>
170    * sendTo</code></a> method.
171    *
172    * This function is designed to be indirectly used by secondary threads,
173    * such as <code>Driver</code>s.
174    *
175    * @param from source agent.
176    * @param to destination agent.
177    * @param not notification.
178    *
179    * @exception IOException
180    * error when accessing the local persistent storage
181    */

182   void directSendTo(AgentId from,
183             AgentId to,
184             Notification not) {
185     MessageConsumer consumer = null;
186     Message msg = null;
187
188     if (logmon.isLoggable(BasicLevel.DEBUG))
189       logmon.log(BasicLevel.DEBUG,
190                  toString() + ".directSendTo(" + from + ", " + to + ", " + not + ")");
191
192     if ((to == null) || to.isNullId())
193       return;
194
195     msg = Message.alloc(from, to, not);
196     try {
197       consumer = AgentServer.getConsumer(to.to);
198     } catch (UnknownServerException exc) {
199       channel.logmon.log(BasicLevel.ERROR,
200                          toString() + ", can't post message: " + msg,
201                          exc);
202       // TODO: Post an ErrorNotification ?
203
return;
204     }
205
206     try {
207       AgentServer.getTransaction().begin();
208       consumer.post(msg);
209       consumer.save();
210       AgentServer.getTransaction().commit();
211       // then commit and validate the message.
212
consumer.validate();
213       AgentServer.getTransaction().release();
214     } catch (Exception JavaDoc exc) {
215       // Should never happened (IOException or ClassNotFoundException).
216
logmon.log(BasicLevel.FATAL,
217                  toString() + ", Transaction problem.", exc);
218       throw new TransactionError(toString() + ", " + exc.getMessage());
219     }
220   }
221
222   /**
223    * Returns a string representation of this <code>Channel</code> object.
224    *
225    * @return A string representation of this object.
226    */

227   public final String JavaDoc toString() {
228     StringBuffer JavaDoc strbuf = new StringBuffer JavaDoc();
229     strbuf.append("Channel#").append(AgentServer.getServerId());
230     return strbuf.toString();
231   }
232 }
233
Popular Tags