1 31 package org.objectweb.proactive.core.body; 32 33 import org.objectweb.proactive.core.ProActiveRuntimeException; 34 import org.objectweb.proactive.core.UniqueID; 35 import org.objectweb.proactive.core.body.future.Future; 36 import org.objectweb.proactive.core.body.future.FuturePool; 37 import org.objectweb.proactive.core.body.message.MessageEventProducerImpl; 38 import org.objectweb.proactive.core.body.reply.Reply; 39 import org.objectweb.proactive.core.body.reply.ReplyReceiver; 40 import org.objectweb.proactive.core.body.request.BlockingRequestQueue; 41 import org.objectweb.proactive.core.body.request.Request; 42 import org.objectweb.proactive.core.body.request.RequestFactory; 43 import org.objectweb.proactive.core.body.request.RequestImpl; 44 import org.objectweb.proactive.core.body.request.RequestQueue; 45 import org.objectweb.proactive.core.body.request.RequestReceiver; 46 import org.objectweb.proactive.core.body.request.ServeException; 47 import org.objectweb.proactive.core.component.request.ComponentRequestImpl; 48 import org.objectweb.proactive.core.event.MessageEvent; 49 import org.objectweb.proactive.core.event.MessageEventListener; 50 import org.objectweb.proactive.core.mop.MethodCall; 51 import org.objectweb.proactive.ext.security.RenegotiateSessionException; 52 53 54 60 85 public abstract class BodyImpl extends AbstractBody 86 implements java.io.Serializable { 87 private static final String INACTIVE_BODY_EXCEPTION_MESSAGE = "Cannot perform this call because this body is inactive"; 91 92 96 97 protected ReplyReceiver replyReceiver; 98 99 100 protected RequestReceiver requestReceiver; 101 protected MessageEventProducerImpl messageEventProducer; 102 protected String JobID; 103 104 108 112 public BodyImpl() { 113 } 114 115 122 public BodyImpl(Object reifiedObject, String nodeURL, 123 MetaObjectFactory factory, String jobID) { 124 super(reifiedObject, nodeURL, factory); 125 this.JobID = jobID; 126 this.requestReceiver = factory.newRequestReceiverFactory() 127 .newRequestReceiver(); 128 this.replyReceiver = factory.newReplyReceiverFactory().newReplyReceiver(); 129 this.messageEventProducer = new MessageEventProducerImpl(); 130 setLocalBodyImpl(new ActiveLocalBodyStrategy(reifiedObject, 131 factory.newRequestQueueFactory().newRequestQueue(bodyID), 132 factory.newRequestFactory())); 133 this.localBodyStrategy.getFuturePool().setOwnerBody(this.getID()); 134 } 135 136 public String getJobID() { 140 return this.JobID; 141 } 142 143 public void addMessageEventListener(MessageEventListener listener) { 147 if (messageEventProducer != null) { 148 messageEventProducer.addMessageEventListener(listener); 149 } 150 } 151 152 public void removeMessageEventListener(MessageEventListener listener) { 153 if (messageEventProducer != null) { 154 messageEventProducer.removeMessageEventListener(listener); 155 } 156 } 157 158 162 168 protected void internalReceiveRequest(Request request) 169 throws java.io.IOException , RenegotiateSessionException { 170 if (messageEventProducer != null) { 171 messageEventProducer.notifyListeners(request, 172 MessageEvent.REQUEST_RECEIVED, bodyID, 173 getRequestQueue().size() + 1); 174 } 175 176 requestReceiver.receiveRequest(request, this); 179 } 180 181 186 protected void internalReceiveReply(Reply reply) throws java.io.IOException { 187 if (messageEventProducer != null) { 188 messageEventProducer.notifyListeners(reply, 189 MessageEvent.REPLY_RECEIVED, bodyID); 190 } 191 replyReceiver.receiveReply(reply, this, getFuturePool()); 192 } 193 194 197 protected void activityStopped() { 198 super.activityStopped(); 199 messageEventProducer = null; 200 setLocalBodyImpl(new InactiveLocalBodyStrategy()); 201 } 202 203 public void setImmediateService(String methodName) 208 throws java.io.IOException { 209 this.requestReceiver.setImmediateService(methodName); 210 } 211 212 private class ActiveLocalBodyStrategy implements LocalBodyStrategy, 219 java.io.Serializable { 220 221 222 protected FuturePool futures; 223 224 225 protected Object reifiedObject; 226 protected BlockingRequestQueue requestQueue; 227 protected RequestFactory internalRequestFactory; 228 private long absoluteSequenceID; 229 230 public ActiveLocalBodyStrategy(Object reifiedObject, 234 BlockingRequestQueue requestQueue, RequestFactory requestFactory) { 235 this.reifiedObject = reifiedObject; 236 this.futures = new FuturePool(); 237 this.requestQueue = requestQueue; 238 this.internalRequestFactory = requestFactory; 239 } 240 241 public FuturePool getFuturePool() { 248 return futures; 249 } 250 251 public BlockingRequestQueue getRequestQueue() { 252 return requestQueue; 253 } 254 255 public Object getReifiedObject() { 256 return reifiedObject; 257 } 258 259 public String getName() { 260 return reifiedObject.getClass().getName(); 261 } 262 263 266 public void serve(Request request) { 267 if (request == null) { 268 return; 269 } 270 try { 271 messageEventProducer.notifyListeners(request, 272 MessageEvent.SERVING_STARTED, bodyID, 273 getRequestQueue().size()); 274 Reply reply = request.serve(BodyImpl.this); 275 if (reply == null) { 276 if (!isActive()) { 277 return; } 279 messageEventProducer.notifyListeners(request, 280 MessageEvent.VOID_REQUEST_SERVED, bodyID, 281 getRequestQueue().size()); 282 return; 283 } 284 UniqueID destinationBodyId = request.getSourceBodyID(); 285 if ((destinationBodyId != null) && 286 (messageEventProducer != null)) { 287 messageEventProducer.notifyListeners(reply, 288 MessageEvent.REPLY_SENT, destinationBodyId, 289 getRequestQueue().size()); 290 } 291 this.getFuturePool().registerDestination(request.getSender()); 292 reply.send(request.getSender()); 293 this.getFuturePool().removeDestination(); 294 } catch (ServeException e) { 295 throw new ProActiveRuntimeException("Exception in serve (Still not handled) : throws killer RuntimeException", 297 e); 298 } catch (java.io.IOException e) { 299 throw new ProActiveRuntimeException("Exception in sending reply (Still not handled) : throws killer RuntimeException", 301 e); 302 } 303 } 304 305 public void sendRequest(MethodCall methodCall, Future future, 306 UniversalBody destinationBody) 307 throws java.io.IOException , RenegotiateSessionException { 308 long sequenceID = getNextSequenceID(); 309 Request request = internalRequestFactory.newRequest(methodCall, 310 BodyImpl.this, future == null, sequenceID); 311 312 if (methodCall.getTag() != null) { 314 if (methodCall.getTag().equals(MethodCall.COMPONENT_TAG)) { 315 request = new ComponentRequestImpl((RequestImpl) request); 316 } 317 } 318 if (future != null) { 319 future.setID(sequenceID); 320 futures.receiveFuture(future); 321 } 322 messageEventProducer.notifyListeners(request, 323 MessageEvent.REQUEST_SENT, destinationBody.getID()); 324 request.send(destinationBody); 325 } 326 327 331 335 private synchronized long getNextSequenceID() { 336 return ++absoluteSequenceID; 337 } 338 } 339 340 private class InactiveLocalBodyStrategy implements LocalBodyStrategy, 342 java.io.Serializable { 343 public InactiveLocalBodyStrategy() { 347 } 348 349 public FuturePool getFuturePool() { 356 return null; 358 } 359 360 public BlockingRequestQueue getRequestQueue() { 361 throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE); 362 } 363 364 public RequestQueue getHighPriorityRequestQueue() { 365 throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE); 366 } 367 368 public Object getReifiedObject() { 369 throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE); 370 } 371 372 public String getName() { 373 return "inactive body"; 374 } 375 376 public void serve(Request request) { 377 throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE); 378 } 379 380 public void sendRequest(MethodCall methodCall, Future future, 381 UniversalBody destinationBody) throws java.io.IOException { 382 throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE); 383 } 384 } 385 386 } 388 | Popular Tags |