KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > proxies > UserAgent


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2004 - 2006 ScalAgent Distributed Technologies
4  * Copyright (C) 2004 France Telecom R&D
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): ScalAgent Distributed Technologies
22  */

23 package org.objectweb.joram.mom.proxies;
24
25 import java.io.IOException JavaDoc;
26 import java.io.ObjectInputStream JavaDoc;
27 import java.io.ObjectOutputStream JavaDoc;
28 import java.util.Collection JavaDoc;
29 import java.util.Enumeration JavaDoc;
30 import java.util.Hashtable JavaDoc;
31 import java.util.Iterator JavaDoc;
32
33 import org.objectweb.joram.shared.client.AbstractJmsReply;
34 import org.objectweb.joram.shared.client.AbstractJmsRequest;
35 import org.objectweb.joram.shared.client.CnxCloseRequest;
36 import org.objectweb.joram.shared.client.JmsRequestGroup;
37 import org.objectweb.joram.shared.client.ProducerMessages;
38 import org.objectweb.joram.shared.client.ServerReply;
39 import org.objectweb.joram.shared.client.MomExceptionReply;
40 import org.objectweb.joram.shared.excepts.MomException;
41
42 import org.objectweb.joram.mom.proxies.ProxyImpl;
43 import org.objectweb.joram.mom.proxies.SendReplyNot;
44 import org.objectweb.joram.mom.proxies.ProxyAgentItf;
45 import org.objectweb.joram.mom.notifications.WakeUpNot;
46
47
48 import fr.dyade.aaa.agent.Agent;
49 import fr.dyade.aaa.agent.AgentId;
50 import fr.dyade.aaa.agent.BagSerializer;
51 import fr.dyade.aaa.agent.Notification;
52 import fr.dyade.aaa.agent.UnknownNotificationException;
53
54 import fr.dyade.aaa.util.Timer;
55 import fr.dyade.aaa.util.TimerTask;
56
57 import fr.dyade.aaa.util.Queue;
58 import fr.dyade.aaa.util.management.MXWrapper;
59 import org.objectweb.joram.shared.JoramTracing;
60 import org.objectweb.util.monolog.api.BasicLevel;
61
62 /**
63  * Class of a user proxy agent.
64  */

65 public class UserAgent extends Agent implements BagSerializer, ProxyAgentItf {
66   /**
67    * All the user requests are delegated to the proxy
68    */

69   private ProxyImpl proxyImpl;
70
71   /**
72    * Table that contains the user connections:
73    * - key = <code>Integer</code> (connection key)
74    * - value = <code></code>
75    */

76   private transient Hashtable JavaDoc connections;
77
78   private transient Hashtable JavaDoc heartBeatTasks;
79
80   /**
81    * Counter of the connection keys
82    */

83   private int keyCounter;
84
85   /**
86    * Creates a new user proxy.
87    *
88    * @see ConnectionManager
89    */

90   public UserAgent() {
91     super(true);
92     init();
93   }
94
95   /**
96    * Creates a new user proxy.
97    *
98    * @see ConnectionManager
99    */

100   public UserAgent(int stamp) {
101     super("JoramAdminProxy", true, stamp);
102     init();
103   }
104
105   private void init() {
106     proxyImpl = new ProxyImpl(this);
107     keyCounter = 0;
108   }
109
110   private transient CleaningTask cleaningTask;
111
112   /** (Re)initializes the agent when (re)loading. */
113   public void agentInitialize(boolean firstTime) throws Exception JavaDoc {
114     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
115       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
116                                 "UserAgent.agentInitialize(" + firstTime + ')');
117
118     super.agentInitialize(firstTime);
119     proxyImpl.initialize(firstTime);
120     cleaningTask = new CleaningTask();
121     cleaningTask.schedule();
122     MXWrapper.registerMBean(proxyImpl, "Joram", getMBeanName());
123   }
124
125   /** Finalizes the agent before it is garbaged. */
126   public void agentFinalize(boolean lastTime) {
127     try {
128       MXWrapper.unregisterMBean("Joram", getMBeanName());
129     } catch (Exception JavaDoc exc) {
130       if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
131         JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "", exc);
132     }
133     super.agentFinalize(lastTime);
134   }
135
136   private String JavaDoc getMBeanName() {
137     return new StringBuffer JavaDoc().append("type=User").append(",name=").append(
138         (name == nullName) ? getId().toString() : name).toString();
139   }
140
141   /**
142    * Overrides the <code>Agent</code> class <code>react</code> method for
143    * providing the JMS client proxy with its specific behaviour.
144    * <p>
145    * A JMS proxy specifically reacts to the following notifications:
146    * <ul>
147    * <li><code>OpenConnectionNot</code></li>
148    * </ul>
149    */

150   public void react(AgentId from, Notification not) throws Exception JavaDoc {
151     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
152       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
153                                 "UserAgent.react(" + from + ',' + not + ')');
154
155     // set agent no save:
156
// the default behavior is transient
157
setNoSave();
158
159     if (not instanceof OpenConnectionNot) {
160       doReact((OpenConnectionNot) not);
161     } else if (not instanceof GetConnectionNot) {
162       doReact((GetConnectionNot) not);
163     } else if (not instanceof CloseConnectionNot) {
164       doReact((CloseConnectionNot) not);
165     } else if (not instanceof ResetCollocatedConnectionsNot) {
166       doReact((ResetCollocatedConnectionsNot) not);
167     } else if (not instanceof SendReplyNot) {
168       doReact((SendReplyNot) not);
169     } else if (not instanceof RequestNot) {
170       doReact((RequestNot) not);
171     } else if (not instanceof ReturnConnectionNot) {
172       doReact((ReturnConnectionNot) not);
173     } else if (not instanceof SendRepliesNot) {
174       doReact((SendRepliesNot) not);
175     } else if (not instanceof ProxyRequestGroupNot) {
176       doReact((ProxyRequestGroupNot) not);
177     } else if (not instanceof WakeUpNot) {
178       try {
179         proxyImpl.cleanPendingMessages(System.currentTimeMillis());
180       } catch (Exception JavaDoc exc) {
181         if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
182           JoramTracing.dbgDestination.log(BasicLevel.ERROR,
183                                           "--- " + this + " Proxy(...)", exc);
184       }
185       if (cleaningTask == null)
186         cleaningTask = new CleaningTask();
187       cleaningTask.schedule();
188     } else {
189       try {
190         proxyImpl.react(from, not);
191       } catch (UnknownNotificationException exc) {
192         super.react(from, not);
193       }
194     }
195   }
196
197   /**
198    * Registers and starts the <code>UserConnection</code>.
199    */

200   private void doReact(OpenConnectionNot not) {
201     // state change, so save.
202
setSave();
203
204     if (connections == null) {
205       connections = new Hashtable JavaDoc();
206       heartBeatTasks = new Hashtable JavaDoc();
207     }
208
209     Integer JavaDoc objKey = new Integer JavaDoc(keyCounter);
210     ConnectionContext ctx;
211     if (not.getReliable()) {
212       ctx = new ReliableConnectionContext(
213           proxyImpl, keyCounter,
214           not.getHeartBeat());
215       connections.put(objKey, ctx);
216     } else {
217       ctx = new StandardConnectionContext(
218           proxyImpl, keyCounter);
219       connections.put(objKey, ctx);
220     }
221
222     if (not.getHeartBeat() > 0) {
223       HeartBeatTask heartBeatTask = new HeartBeatTask(2 * not.getHeartBeat(),
224           objKey);
225       heartBeatTasks.put(objKey, heartBeatTask);
226       heartBeatTask.start();
227     }
228
229     // Differs the reply because the connection key counter
230
// must be saved before the OpenConnectionNot returns.
231
sendTo(getId(), new ReturnConnectionNot(not, ctx));
232     keyCounter++;
233   }
234
235   /**
236    * Differs the reply because the connection key counter
237    * must be saved before the OpenConnectionNot returns.
238    */

239   private void doReact(ReturnConnectionNot not) {
240     not.Return();
241   }
242
243   private void doReact(GetConnectionNot not) {
244     int key = not.getKey();
245     if (connections == null) {
246       not.Throw(new Exception JavaDoc("Connection " + key + " not found"));
247     } else {
248       Integer JavaDoc objKey = new Integer JavaDoc(key);
249       ReliableConnectionContext ctx = (ReliableConnectionContext) connections
250           .get(objKey);
251       if (ctx == null) {
252         not.Throw(new Exception JavaDoc("Connection " + key + " not found"));
253       } else {
254         not.Return(ctx);
255       }
256     }
257   }
258
259   private void doReact(RequestNot not) {
260     Integer JavaDoc key = new Integer JavaDoc(not.getConnectionKey());
261     if (connections != null) {
262       ConnectionContext ctx = (ConnectionContext) connections.get(key);
263       if (ctx != null) {
264         HeartBeatTask heartBeatTask = (HeartBeatTask) heartBeatTasks.get(key);
265         if (heartBeatTask != null) {
266           heartBeatTask.touch();
267         }
268         
269         AbstractJmsRequest request = ctx.getRequest(not.getMessage());
270         proxyImpl.reactToClientRequest(key.intValue(), request);
271         
272         if (ctx.isClosed()) {
273           //CnxCloseRequest request = (CnxCloseRequest) not.getMessage();
274
connections.remove(key);
275           HeartBeatTask hbt = (HeartBeatTask) heartBeatTasks.remove(key);
276           if (hbt != null) {
277             hbt.cancel();
278           }
279         }
280       }
281     }
282     // else should not happen because:
283
// - RequestNot is transient
284
// - RequestNot always follows an OpenConnection or
285
// a GetConnection
286
}
287   
288   private void doReact(ProxyRequestGroupNot not) {
289     RequestNot[] requests = not.getRequests();
290     RequestBuffer rm = new RequestBuffer(this);
291     for (int i = 0; i < requests.length; i++) {
292       RequestNot req = requests[i];
293       Integer JavaDoc key = new Integer JavaDoc(req.getConnectionKey());
294       HeartBeatTask heartBeatTask = (HeartBeatTask) heartBeatTasks.get(key);
295       if (heartBeatTask != null) {
296         heartBeatTask.touch();
297       }
298       ConnectionContext ctx = (ConnectionContext) connections.get(key);
299       if (ctx != null) {
300         AbstractJmsRequest request = ctx.getRequest(req.getMessage());
301         if (request instanceof ProducerMessages) {
302           ProducerMessages pm = (ProducerMessages) request;
303           rm.put(req.getConnectionKey(), pm);
304         } else if (request instanceof JmsRequestGroup) {
305           JmsRequestGroup jrg = (JmsRequestGroup)request;
306           AbstractJmsRequest[] groupedRequests = jrg.getRequests();
307           for (int j = 0; j < groupedRequests.length; j++) {
308             if (groupedRequests[i] instanceof ProducerMessages) {
309               ProducerMessages pm = (ProducerMessages) groupedRequests[i];
310               rm.put(req.getConnectionKey(), pm);
311             } else {
312               proxyImpl.reactToClientRequest(key.intValue(), groupedRequests[i]);
313             }
314           }
315         } else {
316           proxyImpl.reactToClientRequest(key.intValue(), request);
317         }
318       }
319     }
320     rm.flush();
321   }
322
323   private void doReact(CloseConnectionNot not) {
324     if (connections != null) {
325       Integer JavaDoc key = new Integer JavaDoc(not.getKey());
326       // The connection may have already been
327
// explicitely closed by a CnxCloseRequest.
328
if (connections.remove(key) != null) {
329         proxyImpl.reactToClientRequest(not.getKey(), new CnxCloseRequest());
330         heartBeatTasks.remove(key);
331       }
332     }
333     // else should not happen:
334
// 1- CloseConnectionNot is transient
335
// 2- CloseConnectionNot follows an OpenConnectionNot
336
// or a GetConnectionNot
337
}
338
339   private void doReact(ResetCollocatedConnectionsNot not) {
340     if (connections != null) {
341       Collection JavaDoc values = connections.values();
342       Iterator JavaDoc iterator = values.iterator();
343       while (iterator.hasNext()) {
344         Object JavaDoc obj = iterator.next();
345         // Standard connections must be dropped.
346
// Only reliable connections can be recovered.
347
if (obj instanceof StandardConnectionContext) {
348           ConnectionContext cc = (ConnectionContext) obj;
349           proxyImpl.reactToClientRequest(
350               cc.getKey(), new CnxCloseRequest());
351           iterator.remove();
352         }
353       }
354     }
355   }
356
357   private void doReact(SendRepliesNot not) {
358     Enumeration JavaDoc en = not.getReplies();
359     while (en.hasMoreElements()) {
360       SendReplyNot sr = (SendReplyNot) en.nextElement();
361       doReact(sr);
362     }
363   }
364
365   /**
366    * Notification sent by local agents (destinations)
367    * indicating that the proxy can reply to a client.
368    * @param not
369    */

370   private void doReact(SendReplyNot not) {
371     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
372       JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "UserAgent.doReact(" + not + ')');
373     ClientContext cc = proxyImpl.getClientContext(not.getKey());
374     if (cc != null) {
375       if (cc.setReply(not.getRequestId()) == 0) {
376         sendToClient(not.getKey(), new ServerReply(not.getRequestId()));
377       }
378     } else if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) {
379       // Can happen if the connection is closed before the SendReplyNot
380
// arrives.
381
JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
382                                 "UserAgent: unknown client context for " + not);
383     }
384   }
385
386   /**
387    * Sends a notification to the specified agent.
388    *
389    * @param to the identifier of the recipient agent
390    * @param not the notification to send
391    */

392   public void sendNot(AgentId to, Notification not) {
393     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
394       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
395                                 "UserAgent.sendNot(" + to + ',' + not + ')');
396     sendTo(to, not);
397   }
398
399   /**
400    * Sends a reply to the client connected through
401    * the specified connection.
402    *
403    * @param key the key of the connection the client
404    * is connected through.
405    * @param reply the reply to send to the client.
406    */

407   public void sendToClient(int key, AbstractJmsReply reply) {
408     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
409       JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
410                                 "UserAgent.sendToClient(" + key + ',' + reply + ')');
411     Integer JavaDoc objKey = new Integer JavaDoc(key);
412     if (connections != null) {
413       ConnectionContext ctx = (ConnectionContext)connections.get(objKey);
414       if (ctx != null) {
415         ctx.pushReply(reply);
416       }
417     }
418     // else may happen. Drop the reply.
419
}
420
421   /**
422    * Timer task responsible for closing the connection if
423    * it has not sent any requests for the duration 'timeout'.
424    */

425   class HeartBeatTask extends fr.dyade.aaa.util.TimerTask implements
426       java.io.Serializable JavaDoc {
427     private int timeout;
428
429     private Integer JavaDoc key;
430
431     private long lastRequestDate;
432
433     HeartBeatTask(int timeout, Integer JavaDoc key) {
434       this.timeout = timeout;
435       this.key = key;
436     }
437
438     public void run() {
439       long date = System.currentTimeMillis();
440       if ((date - lastRequestDate) > timeout) {
441         if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
442           JoramTracing.dbgProxy.log(BasicLevel.DEBUG,
443                                     "HeartBeatTask: close connection");
444         ConnectionContext ctx = (ConnectionContext) connections.remove(key);
445         heartBeatTasks.remove(key);
446         proxyImpl.reactToClientRequest(key.intValue(), new CnxCloseRequest());
447         MomException exc = new MomException(MomExceptionReply.HBCloseConnection,
448                                             "Connection " + getId() + ':' + key + " closed");
449         ctx.pushError(exc);
450       } else {
451         start();
452       }
453     }
454
455     public void start() {
456       try {
457         ConnectionManager.getTimer().schedule(this, timeout);
458       } catch (Exception JavaDoc exc) {
459         throw new Error JavaDoc(exc.toString());
460       }
461     }
462
463     public void touch() {
464       lastRequestDate = System.currentTimeMillis();
465     }
466   }
467
468   class CleaningTask extends TimerTask {
469     CleaningTask() {
470     }
471     
472     /** Method called when the timer expires. */
473     public void run() {
474       sendTo(getId(), new WakeUpNot());
475     }
476  
477     public void schedule() {
478       long period = proxyImpl.getPeriod();
479
480       if (period != -1) {
481         try {
482           Timer timer = ConnectionManager.getTimer();
483           timer.schedule(this, period);
484         } catch (Exception JavaDoc exc) {
485           if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
486             JoramTracing.dbgDestination.log(BasicLevel.ERROR,
487                                             "--- " + this + " Proxy(...)", exc);
488         }
489       }
490     }
491   }
492
493   public void setNoSave() {
494     if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
495       JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "setNoSave()");
496     
497     super.setNoSave();
498   }
499
500   public void setSave() {
501    if (JoramTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
502       JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "UserAgent.setSave()");
503     
504     super.setSave();
505   }
506
507   public void readBag(ObjectInputStream JavaDoc in) throws IOException JavaDoc,
508       ClassNotFoundException JavaDoc {
509     connections = (Hashtable JavaDoc) in.readObject();
510     heartBeatTasks = (Hashtable JavaDoc) in.readObject();
511
512     if (heartBeatTasks != null) {
513       // Start the tasks
514
Enumeration JavaDoc tasks = heartBeatTasks.elements();
515       while (tasks.hasMoreElements()) {
516         HeartBeatTask task = (HeartBeatTask) tasks.nextElement();
517         task.start();
518       }
519     }
520
521     proxyImpl.readBag(in);
522   }
523
524   public void writeBag(ObjectOutputStream JavaDoc out) throws IOException JavaDoc {
525     out.writeObject(connections);
526     out.writeObject(heartBeatTasks);
527     proxyImpl.writeBag(out);
528   }
529 }
530
Popular Tags