1 31 package org.objectweb.proactive.ic2d.spy; 32 33 import org.objectweb.proactive.Body; 34 import org.objectweb.proactive.core.UniqueID; 35 import org.objectweb.proactive.core.body.BodyMap; 36 import org.objectweb.proactive.core.body.LocalBodyStore; 37 import org.objectweb.proactive.core.body.UniversalBody; 38 import org.objectweb.proactive.core.body.future.FutureProxy; 39 import org.objectweb.proactive.core.event.BodyEvent; 40 import org.objectweb.proactive.core.event.BodyEventListener; 41 import org.objectweb.proactive.core.event.FutureEvent; 42 import org.objectweb.proactive.core.event.FutureEventListener; 43 import org.objectweb.proactive.core.event.MessageEvent; 44 import org.objectweb.proactive.core.event.MessageEventListener; 45 import org.objectweb.proactive.core.event.RequestQueueEvent; 46 import org.objectweb.proactive.core.event.RequestQueueEventListener; 47 48 51 public class SpyEventManager { 52 53 protected static final int MASTER_SPY_CHECK_INTERVAL = 100000; 55 protected MessageEventListener messageEventListener; 56 57 protected RequestQueueEventListener requestQueueEventListener; 58 59 protected BodyEventListener bodyEventListener; 60 61 protected FutureEventListener futureEventListener; 62 63 67 protected java.util.LinkedList requestSentEventsList; 68 protected java.util.LinkedList replyReceivedEventsList; 69 70 71 74 protected java.util.List pendingSpyEvents; 75 76 protected UniqueID masterSpyID; 77 78 81 protected long lastTimeMasterSpyCheck; 82 83 87 public SpyEventManager(UniqueID masterSpyID) { 88 this.masterSpyID = masterSpyID; 89 messageEventListener = new MyMessageEventListener(); 90 requestQueueEventListener = new MyRequestQueueEventListener(); 91 bodyEventListener = new MyBodyEventListener(); 92 futureEventListener = new MyFutureEventListener(); 93 94 pendingSpyEvents = new java.util.ArrayList (); 96 requestSentEventsList = new java.util.LinkedList (); 98 replyReceivedEventsList = new java.util.LinkedList (); 100 101 lastTimeMasterSpyCheck = System.currentTimeMillis(); 102 } 103 104 105 106 110 public SpyEvent[] collectPendingSpyEvents() { 111 synchronized (pendingSpyEvents) { 112 SpyEvent[] events = new SpyEvent[pendingSpyEvents.size()]; 113 if (pendingSpyEvents.size() > 0) { 114 pendingSpyEvents.toArray(events); 115 pendingSpyEvents.clear(); 116 } 117 return events; 118 } 119 } 120 121 122 126 public void addMessageEventListener(Body body) { 127 body.addMessageEventListener(messageEventListener); 128 } 129 130 131 public void removeMessageEventListener(Body body) { 132 body.removeMessageEventListener(messageEventListener); 133 } 134 135 136 137 141 void addBodyEventListener() { 142 LocalBodyStore.getInstance().addBodyEventListener(bodyEventListener); 143 } 144 145 146 void removeBodyEventListener() { 147 LocalBodyStore.getInstance().removeBodyEventListener(bodyEventListener); 148 } 149 150 void addFutureEventListener() { 151 FutureProxy.getFutureEventProducer().addFutureEventListener(futureEventListener); 152 } 153 154 155 void removeFutureEventListener() { 156 FutureProxy.getFutureEventProducer().addFutureEventListener(futureEventListener); 157 } 158 159 160 SpyEvent[] createSpyEventForExistingBodies(Body spyBody) { 161 BodyMap knownBodies = LocalBodyStore.getInstance().getLocalBodies(); 162 knownBodies.removeBody(masterSpyID); 164 SpyEvent[] spyEvents = new SpyEvent[knownBodies.size()]; int i = 0; 166 java.util.Iterator bodiesIterator = knownBodies.bodiesIterator(); 167 while (bodiesIterator.hasNext()) { 168 Body activeObjectBody = (Body) bodiesIterator.next(); 169 if (activeObjectBody.isActive()) { 170 addListenersToNewBody(activeObjectBody); 171 spyEvents[i] = new BodyCreationSpyEvent(activeObjectBody.getID(), activeObjectBody.getNodeURL(), activeObjectBody.getName(), activeObjectBody.isActive()); 172 i++; 173 } 174 } 175 if (i < knownBodies.size()) { 176 SpyEvent[] newSpyEvents = new SpyEvent[i]; 177 if (i > 0) { 178 System.arraycopy(spyEvents, 0, newSpyEvents, 0, i); 179 } 180 return newSpyEvents; 181 } else { 182 return spyEvents; 183 } 184 } 185 186 190 private void addListenersToNewBody(Body body) { 191 if (body.getRequestQueue() != null) 192 body.getRequestQueue().addRequestQueueEventListener(requestQueueEventListener); 193 } 196 197 198 private void removeListenersFromDeadBody(Body body) { 199 if (body.getRequestQueue() != null) 200 body.getRequestQueue().removeRequestQueueEventListener(requestQueueEventListener); 201 removeMessageEventListener(body); 202 } 203 204 205 private SpyMessageEvent checkReplyReceivedEvent(MessageEvent requestSentEvent) { 207 long sequence = requestSentEvent.getSequenceNumber(); 208 UniqueID requestSenderID = requestSentEvent.getSourceBodyID(); 209 UniqueID requestReceiverID = requestSentEvent.getDestinationBodyID(); 210 synchronized (replyReceivedEventsList) { 211 java.util.ListIterator l = replyReceivedEventsList.listIterator(); 212 while (l.hasNext()) { 213 MessageEvent replyReceivedEvent = (MessageEvent) l.next(); 214 if (sequence == replyReceivedEvent.getSequenceNumber()) { 215 } 216 if ((sequence == replyReceivedEvent.getSequenceNumber()) && (requestSenderID.equals(replyReceivedEvent.getDestinationBodyID())) && (requestReceiverID.equals(replyReceivedEvent.getSourceBodyID()))) { 217 l.remove(); 218 return new SpyMessageEvent(SpyMessageEvent.REPLY_RECEIVED_MESSAGE_TYPE, replyReceivedEvent); 219 } 220 } 221 requestSentEventsList.add(requestSentEvent); 222 return null; 223 } 224 } 225 226 227 private boolean checkRequestSentEvent(MessageEvent replyReceivedEvent) { 229 long sequence = replyReceivedEvent.getSequenceNumber(); 230 UniqueID replySenderID = replyReceivedEvent.getSourceBodyID(); 231 UniqueID replyReceiverID = replyReceivedEvent.getDestinationBodyID(); 232 synchronized (replyReceivedEventsList) { 233 java.util.ListIterator l = requestSentEventsList.listIterator(); 234 while (l.hasNext()) { 235 MessageEvent requestSentEvent = (MessageEvent)l.next(); 236 if (sequence == requestSentEvent.getSequenceNumber()) { 237 } 238 if ((sequence == requestSentEvent.getSequenceNumber()) && (replySenderID.equals(requestSentEvent.getDestinationBodyID())) && (replyReceiverID.equals(requestSentEvent.getSourceBodyID()))) { 239 l.remove(); 240 return true; 241 } 242 } 243 replyReceivedEventsList.add(replyReceivedEvent); 244 return false; 245 } 246 } 247 248 249 private void addEvent(SpyEvent event) { 250 synchronized(pendingSpyEvents) { 252 pendingSpyEvents.add(event); 253 } 254 checkMasterSpy(); 255 } 256 257 258 private void checkMasterSpy() { 259 if (lastTimeMasterSpyCheck + MASTER_SPY_CHECK_INTERVAL < System.currentTimeMillis()) return; 260 lastTimeMasterSpyCheck = System.currentTimeMillis(); 261 if (LocalBodyStore.getInstance().getLocalBody(masterSpyID) != null) return; 262 removeBodyEventListener(); 265 synchronized(pendingSpyEvents) { 266 java.util.ListIterator l = pendingSpyEvents.listIterator(); 267 while (l.hasNext()) { 268 SpyEvent event = (SpyEvent)l.next(); 269 UniqueID bodyID = event.getBodyID(); 270 Body body = LocalBodyStore.getInstance().getLocalBody(bodyID); 271 if (body != null) { 272 removeListenersFromDeadBody(body); 273 } 274 } 275 pendingSpyEvents.clear(); 276 } 277 } 278 279 280 281 285 286 289 private class MyBodyEventListener implements BodyEventListener { 290 public void bodyCreated(BodyEvent event) { 294 Body body = checkBody(event.getBody()); 295 if (body == null) return; 296 addEvent(new BodyCreationSpyEvent(body.getID(), body.getNodeURL(), body.getName(), body.isActive())); 298 addListenersToNewBody(body); 299 } 300 301 public void bodyDestroyed(BodyEvent event) { 302 Body body = checkBody(event.getBody()); 303 if (body == null) return; 304 addEvent(new BodySpyEvent(body.getID(), false, false)); 306 removeListenersFromDeadBody(body); 307 } 308 309 public void bodyChanged(BodyEvent event) { 310 Body body = checkBody(event.getBody()); 311 if (body == null) return; 312 addEvent(new BodySpyEvent(body.getID(), body.isActive(), body.isAlive())); 314 removeListenersFromDeadBody(body); 315 } 316 317 private Body checkBody(UniversalBody uBody) { 318 if (! (uBody instanceof Body)) return null; 319 Body body = (Body) uBody; 320 UniqueID bodyID = body.getID(); 321 if (masterSpyID.equals(bodyID)) return null; 322 return body; 323 } 324 325 } 327 328 329 332 private class MyRequestQueueEventListener implements RequestQueueEventListener { 333 public void requestQueueModified(RequestQueueEvent event) { 337 if (event.getType() == RequestQueueEvent.WAIT_FOR_REQUEST) 338 addEvent(new SpyEvent(SpyEvent.OBJECT_WAIT_FOR_REQUEST_TYPE, event.getOwnerID())); 339 } 340 } 342 343 346 private class MyFutureEventListener implements FutureEventListener { 347 public void waitingForFuture(FutureEvent event) { 351 addEvent(new SpyFutureEvent(SpyEvent.OBJECT_WAIT_BY_NECESSITY_TYPE, event)); 352 } 353 354 public void receivedFutureResult(FutureEvent event) { 355 addEvent(new SpyFutureEvent(SpyEvent.OBJECT_RECEIVED_FUTURE_RESULT_TYPE, event)); 356 } 357 358 } 360 361 364 private class MyMessageEventListener implements MessageEventListener { 365 public void requestSent(MessageEvent event) { 369 addEvent(new SpyMessageEvent(SpyEvent.REQUEST_SENT_MESSAGE_TYPE, event)); 370 if (! event.isOneWay()) { 372 SpyMessageEvent replyEvent = checkReplyReceivedEvent(event); 374 if (replyEvent != null) { 375 addEvent(replyEvent); 376 } 377 } 378 } 379 380 public void requestReceived(MessageEvent event) { 381 addEvent(new SpyMessageEvent(SpyEvent.REQUEST_RECEIVED_MESSAGE_TYPE, event)); 382 } 383 384 public void replySent(MessageEvent event) { 385 addEvent(new SpyMessageEvent(SpyEvent.REPLY_SENT_MESSAGE_TYPE, event)); 386 } 387 388 public void replyReceived(MessageEvent event) { 389 if (checkRequestSentEvent(event)) { 390 addEvent(new SpyMessageEvent(SpyEvent.REPLY_RECEIVED_MESSAGE_TYPE, event)); 391 } 392 } 393 394 public void voidRequestServed(MessageEvent event) { 395 addEvent(new SpyMessageEvent(SpyEvent.VOID_REQUEST_SERVED_TYPE, event)); 396 } 397 398 public void servingStarted(MessageEvent event) { 399 addEvent(new SpyMessageEvent(SpyEvent.SERVING_STARTED_TYPE, event)); 400 } 401 402 403 } 405 } | Popular Tags |