KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > proactive > ic2d > spy > SpyEventManager


1 /*
2 * ################################################################
3 *
4 * ProActive: The Java(TM) library for Parallel, Distributed,
5 * Concurrent computing with Security and Mobility
6 *
7 * Copyright (C) 1997-2002 INRIA/University of Nice-Sophia Antipolis
8 * Contact: proactive-support@inria.fr
9 *
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License as published by the Free Software Foundation; either
13 * version 2.1 of the License, or any later version.
14 *
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Lesser General Public License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public
21 * License along with this library; if not, write to the Free Software
22 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
23 * USA
24 *
25 * Initial developer(s): The ProActive Team
26 * http://www.inria.fr/oasis/ProActive/contacts.html
27 * Contributor(s):
28 *
29 * ################################################################
30 */

31 package org.objectweb.proactive.ic2d.spy;
32
33 import org.objectweb.proactive.Body;
34 import org.objectweb.proactive.core.UniqueID;
35 import org.objectweb.proactive.core.body.BodyMap;
36 import org.objectweb.proactive.core.body.LocalBodyStore;
37 import org.objectweb.proactive.core.body.UniversalBody;
38 import org.objectweb.proactive.core.body.future.FutureProxy;
39 import org.objectweb.proactive.core.event.BodyEvent;
40 import org.objectweb.proactive.core.event.BodyEventListener;
41 import org.objectweb.proactive.core.event.FutureEvent;
42 import org.objectweb.proactive.core.event.FutureEventListener;
43 import org.objectweb.proactive.core.event.MessageEvent;
44 import org.objectweb.proactive.core.event.MessageEventListener;
45 import org.objectweb.proactive.core.event.RequestQueueEvent;
46 import org.objectweb.proactive.core.event.RequestQueueEventListener;
47
48 /**
49  * Helper class for the Spy that is listener of all objects.
50  */

51 public class SpyEventManager {
52
53   protected static final int MASTER_SPY_CHECK_INTERVAL = 100000; // 100 seconds
54

55   protected MessageEventListener messageEventListener;
56   
57   protected RequestQueueEventListener requestQueueEventListener;
58   
59   protected BodyEventListener bodyEventListener;
60   
61   protected FutureEventListener futureEventListener;
62   
63   /**
64    * Log of all the RequestSent messages received
65    * to perform a search when receiving a ReplyReceive
66    */

67   protected java.util.LinkedList JavaDoc requestSentEventsList;
68   protected java.util.LinkedList JavaDoc replyReceivedEventsList;
69
70   
71   /**
72    * Vector of pending messages
73    */

74   protected java.util.List JavaDoc pendingSpyEvents;
75   
76   protected UniqueID masterSpyID;
77   
78   /**
79    * Every so often this Manager check if its master spy is still running ok
80    */

81    protected long lastTimeMasterSpyCheck;
82
83   //
84
// -- CONSTRUCTORS -----------------------------------------------
85
//
86

87   public SpyEventManager(UniqueID masterSpyID) {
88     this.masterSpyID = masterSpyID;
89     messageEventListener = new MyMessageEventListener();
90     requestQueueEventListener = new MyRequestQueueEventListener();
91     bodyEventListener = new MyBodyEventListener();
92     futureEventListener = new MyFutureEventListener();
93     
94     // Initialises the list of pending messages
95
pendingSpyEvents = new java.util.ArrayList JavaDoc();
96     // Initialize the list of RequestSent
97
requestSentEventsList = new java.util.LinkedList JavaDoc();
98     // Initialize the list of ReplyReceived
99
replyReceivedEventsList = new java.util.LinkedList JavaDoc();
100     
101     lastTimeMasterSpyCheck = System.currentTimeMillis();
102   }
103
104
105
106   //
107
// -- PUBLIC METHODS -----------------------------------------------
108
//
109

110   public SpyEvent[] collectPendingSpyEvents() {
111     synchronized (pendingSpyEvents) {
112       SpyEvent[] events = new SpyEvent[pendingSpyEvents.size()];
113       if (pendingSpyEvents.size() > 0) {
114         pendingSpyEvents.toArray(events);
115         pendingSpyEvents.clear();
116       }
117       return events;
118     }
119   }
120
121
122   //
123
// -- ADD / REMOVE LISTENERS -----------------------------------------------
124
//
125

126   public void addMessageEventListener(Body body) {
127     body.addMessageEventListener(messageEventListener);
128   }
129
130
131   public void removeMessageEventListener(Body body) {
132     body.removeMessageEventListener(messageEventListener);
133   }
134
135
136
137   //
138
// -- FRIENDLY METHODS -----------------------------------------------
139
//
140

141   void addBodyEventListener() {
142     LocalBodyStore.getInstance().addBodyEventListener(bodyEventListener);
143   }
144
145
146   void removeBodyEventListener() {
147     LocalBodyStore.getInstance().removeBodyEventListener(bodyEventListener);
148   }
149
150   void addFutureEventListener() {
151     FutureProxy.getFutureEventProducer().addFutureEventListener(futureEventListener);
152   }
153
154
155   void removeFutureEventListener() {
156     FutureProxy.getFutureEventProducer().addFutureEventListener(futureEventListener);
157   }
158
159
160   SpyEvent[] createSpyEventForExistingBodies(Body spyBody) {
161     BodyMap knownBodies = LocalBodyStore.getInstance().getLocalBodies();
162     // remove our body
163
knownBodies.removeBody(masterSpyID);
164     SpyEvent[] spyEvents = new SpyEvent[knownBodies.size()]; // messages to send bufferized
165
int i = 0;
166     java.util.Iterator JavaDoc bodiesIterator = knownBodies.bodiesIterator();
167     while (bodiesIterator.hasNext()) {
168       Body activeObjectBody = (Body) bodiesIterator.next();
169       if (activeObjectBody.isActive()) {
170         addListenersToNewBody(activeObjectBody);
171         spyEvents[i] = new BodyCreationSpyEvent(activeObjectBody.getID(), activeObjectBody.getNodeURL(), activeObjectBody.getName(), activeObjectBody.isActive());
172         i++;
173       }
174     }
175     if (i < knownBodies.size()) {
176       SpyEvent[] newSpyEvents = new SpyEvent[i];
177       if (i > 0) {
178         System.arraycopy(spyEvents, 0, newSpyEvents, 0, i);
179       }
180       return newSpyEvents;
181     } else {
182       return spyEvents;
183     }
184   }
185
186   //
187
// -- PRIVATE METHODS -----------------------------------------------
188
//
189

190   private void addListenersToNewBody(Body body) {
191     if (body.getRequestQueue() != null)
192       body.getRequestQueue().addRequestQueueEventListener(requestQueueEventListener);
193     // we don't add it by default but wait until the order is sent
194
//addMessageEventListener(body);
195
}
196
197
198   private void removeListenersFromDeadBody(Body body) {
199     if (body.getRequestQueue() != null)
200       body.getRequestQueue().removeRequestQueueEventListener(requestQueueEventListener);
201     removeMessageEventListener(body);
202   }
203
204
205   // call when a request is sent
206
private SpyMessageEvent checkReplyReceivedEvent(MessageEvent requestSentEvent) {
207     long sequence = requestSentEvent.getSequenceNumber();
208     UniqueID requestSenderID = requestSentEvent.getSourceBodyID();
209     UniqueID requestReceiverID = requestSentEvent.getDestinationBodyID();
210     synchronized (replyReceivedEventsList) {
211       java.util.ListIterator JavaDoc l = replyReceivedEventsList.listIterator();
212       while (l.hasNext()) {
213         MessageEvent replyReceivedEvent = (MessageEvent) l.next();
214         if (sequence == replyReceivedEvent.getSequenceNumber()) {
215         }
216         if ((sequence == replyReceivedEvent.getSequenceNumber()) && (requestSenderID.equals(replyReceivedEvent.getDestinationBodyID())) && (requestReceiverID.equals(replyReceivedEvent.getSourceBodyID()))) {
217           l.remove();
218           return new SpyMessageEvent(SpyMessageEvent.REPLY_RECEIVED_MESSAGE_TYPE, replyReceivedEvent);
219         }
220       }
221       requestSentEventsList.add(requestSentEvent);
222       return null;
223     }
224   }
225
226
227   // call when a reply is received
228
private boolean checkRequestSentEvent(MessageEvent replyReceivedEvent) {
229     long sequence = replyReceivedEvent.getSequenceNumber();
230     UniqueID replySenderID = replyReceivedEvent.getSourceBodyID();
231     UniqueID replyReceiverID = replyReceivedEvent.getDestinationBodyID();
232     synchronized (replyReceivedEventsList) {
233       java.util.ListIterator JavaDoc l = requestSentEventsList.listIterator();
234       while (l.hasNext()) {
235         MessageEvent requestSentEvent = (MessageEvent)l.next();
236         if (sequence == requestSentEvent.getSequenceNumber()) {
237         }
238         if ((sequence == requestSentEvent.getSequenceNumber()) && (replySenderID.equals(requestSentEvent.getDestinationBodyID())) && (replyReceiverID.equals(requestSentEvent.getSourceBodyID()))) {
239           l.remove();
240           return true;
241         }
242       }
243       replyReceivedEventsList.add(replyReceivedEvent);
244       return false;
245     }
246   }
247
248
249   private void addEvent(SpyEvent event) {
250     //System.out.println("add event evt="+event+" size="+pendingSpyEvents.size());
251
synchronized(pendingSpyEvents) {
252       pendingSpyEvents.add(event);
253     }
254     checkMasterSpy();
255   }
256   
257   
258   private void checkMasterSpy() {
259     if (lastTimeMasterSpyCheck + MASTER_SPY_CHECK_INTERVAL < System.currentTimeMillis()) return;
260     lastTimeMasterSpyCheck = System.currentTimeMillis();
261     if (LocalBodyStore.getInstance().getLocalBody(masterSpyID) != null) return;
262     // the master Spy is gone : we have no more reason to exist.
263
// we deregister from everybody
264
removeBodyEventListener();
265     synchronized(pendingSpyEvents) {
266       java.util.ListIterator JavaDoc l = pendingSpyEvents.listIterator();
267       while (l.hasNext()) {
268         SpyEvent event = (SpyEvent)l.next();
269         UniqueID bodyID = event.getBodyID();
270         Body body = LocalBodyStore.getInstance().getLocalBody(bodyID);
271         if (body != null) {
272           removeListenersFromDeadBody(body);
273         }
274       }
275       pendingSpyEvents.clear();
276     }
277   }
278   
279   
280
281   //
282
// -- INNER CLASSES -----------------------------------------------
283
//
284

285    
286   /**
287    * BodyEventListener
288    */

289   private class MyBodyEventListener implements BodyEventListener {
290     //
291
// -- implements BodyEventListener -----------------------------------------------
292
//
293
public void bodyCreated(BodyEvent event) {
294       Body body = checkBody(event.getBody());
295       if (body == null) return;
296       //System.out.println("bodyCreated name="+body.getNodeURL());
297
addEvent(new BodyCreationSpyEvent(body.getID(), body.getNodeURL(), body.getName(), body.isActive()));
298       addListenersToNewBody(body);
299     }
300     
301     public void bodyDestroyed(BodyEvent event) {
302       Body body = checkBody(event.getBody());
303       if (body == null) return;
304       //System.out.println("bodyDestroyed name="+body.getNodeURL());
305
addEvent(new BodySpyEvent(body.getID(), false, false));
306       removeListenersFromDeadBody(body);
307     }
308     
309     public void bodyChanged(BodyEvent event) {
310       Body body = checkBody(event.getBody());
311       if (body == null) return;
312       //System.out.println("bodyChanged name="+body.getNodeURL()+" isActive="+body.isActive());
313
addEvent(new BodySpyEvent(body.getID(), body.isActive(), body.isAlive()));
314       removeListenersFromDeadBody(body);
315     }
316     
317     private Body checkBody(UniversalBody uBody) {
318       if (! (uBody instanceof Body)) return null;
319       Body body = (Body) uBody;
320       UniqueID bodyID = body.getID();
321       if (masterSpyID.equals(bodyID)) return null;
322       return body;
323     }
324     
325   } // end inner class MyBodyEventListener
326

327
328
329   /**
330    * MyRequestQueueEventListener
331    */

332   private class MyRequestQueueEventListener implements RequestQueueEventListener {
333     //
334
// -- implements RequestQueueEventListener -----------------------------------------------
335
//
336
public void requestQueueModified(RequestQueueEvent event) {
337       if (event.getType() == RequestQueueEvent.WAIT_FOR_REQUEST)
338         addEvent(new SpyEvent(SpyEvent.OBJECT_WAIT_FOR_REQUEST_TYPE, event.getOwnerID()));
339     }
340   } // end inner class MyRequestQueueEventListener
341

342
343   /**
344    * MyFutureEventListener
345    */

346   private class MyFutureEventListener implements FutureEventListener {
347     //
348
// -- implements FutureEventListener -----------------------------------------------
349
//
350
public void waitingForFuture(FutureEvent event) {
351       addEvent(new SpyFutureEvent(SpyEvent.OBJECT_WAIT_BY_NECESSITY_TYPE, event));
352     }
353
354     public void receivedFutureResult(FutureEvent event) {
355       addEvent(new SpyFutureEvent(SpyEvent.OBJECT_RECEIVED_FUTURE_RESULT_TYPE, event));
356     }
357
358   } // end inner class MyRequestQueueEventListener
359

360
361   /**
362    * RequestReceiverListener
363    */

364   private class MyMessageEventListener implements MessageEventListener {
365     //
366
// -- implements MessageEventListener -----------------------------------------------
367
//
368
public void requestSent(MessageEvent event) {
369       addEvent(new SpyMessageEvent(SpyEvent.REQUEST_SENT_MESSAGE_TYPE, event));
370       //Synchro purpose
371
if (! event.isOneWay()) {
372         // check is the reply event has already been received for this request
373
SpyMessageEvent replyEvent = checkReplyReceivedEvent(event);
374         if (replyEvent != null) {
375           addEvent(replyEvent);
376         }
377       }
378     }
379     
380     public void requestReceived(MessageEvent event) {
381       addEvent(new SpyMessageEvent(SpyEvent.REQUEST_RECEIVED_MESSAGE_TYPE, event));
382     }
383     
384     public void replySent(MessageEvent event) {
385       addEvent(new SpyMessageEvent(SpyEvent.REPLY_SENT_MESSAGE_TYPE, event));
386     }
387     
388     public void replyReceived(MessageEvent event) {
389       if (checkRequestSentEvent(event)) {
390         addEvent(new SpyMessageEvent(SpyEvent.REPLY_RECEIVED_MESSAGE_TYPE, event));
391       }
392     }
393
394     public void voidRequestServed(MessageEvent event) {
395       addEvent(new SpyMessageEvent(SpyEvent.VOID_REQUEST_SERVED_TYPE, event));
396     }
397     
398     public void servingStarted(MessageEvent event) {
399       addEvent(new SpyMessageEvent(SpyEvent.SERVING_STARTED_TYPE, event));
400     }
401     
402
403   } // end inner class MyMessageEventListener
404

405 }
Popular Tags