KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > proactive > core > body > BodyImpl


1 /*
2  * ################################################################
3  *
4  * ProActive: The Java(TM) library for Parallel, Distributed,
5  * Concurrent computing with Security and Mobility
6  *
7  * Copyright (C) 1997-2002 INRIA/University of Nice-Sophia Antipolis
8  * Contact: proactive-support@inria.fr
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License as published by the Free Software Foundation; either
13  * version 2.1 of the License, or any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18  * Lesser General Public License for more details.
19  *
20  * You should have received a copy of the GNU Lesser General Public
21  * License along with this library; if not, write to the Free Software
22  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
23  * USA
24  *
25  * Initial developer(s): The ProActive Team
26  * http://www.inria.fr/oasis/ProActive/contacts.html
27  * Contributor(s):
28  *
29  * ################################################################
30  */

31 package org.objectweb.proactive.core.body;
32
33 import org.objectweb.proactive.core.ProActiveRuntimeException;
34 import org.objectweb.proactive.core.UniqueID;
35 import org.objectweb.proactive.core.body.future.Future;
36 import org.objectweb.proactive.core.body.future.FuturePool;
37 import org.objectweb.proactive.core.body.message.MessageEventProducerImpl;
38 import org.objectweb.proactive.core.body.reply.Reply;
39 import org.objectweb.proactive.core.body.reply.ReplyReceiver;
40 import org.objectweb.proactive.core.body.request.BlockingRequestQueue;
41 import org.objectweb.proactive.core.body.request.Request;
42 import org.objectweb.proactive.core.body.request.RequestFactory;
43 import org.objectweb.proactive.core.body.request.RequestImpl;
44 import org.objectweb.proactive.core.body.request.RequestQueue;
45 import org.objectweb.proactive.core.body.request.RequestReceiver;
46 import org.objectweb.proactive.core.body.request.ServeException;
47 import org.objectweb.proactive.core.component.request.ComponentRequestImpl;
48 import org.objectweb.proactive.core.event.MessageEvent;
49 import org.objectweb.proactive.core.event.MessageEventListener;
50 import org.objectweb.proactive.core.mop.MethodCall;
51 import org.objectweb.proactive.ext.security.RenegotiateSessionException;
52
53
54 /**
55  * @author fhuet
56  *
57  *
58  *
59  */

60 /**
61  * <i><font size="-1" color="#FF0000">**For internal use only** </font></i><br>
62  * <p>
63  * This class gives a common implementation of the Body interface. It provides all
64  * the non specific behavior allowing sub-class to write the detail implementation.
65  * </p><p>
66  * Each body is identify by an unique identifier.
67  * </p><p>
68  * All active bodies that get created in one JVM register themselves into a table that allows
69  * to tack them done. The registering and deregistering is done by the AbstractBody and
70  * the table is managed here as well using some static methods.
71  * </p><p>
72  * In order to let somebody customize the body of an active object without subclassing it,
73  * AbstractBody delegates lot of tasks to satellite objects that implements a given
74  * interface. Abstract protected methods instantiate those objects allowing subclasses
75  * to create them as they want (using customizable factories or instance).
76  * </p>
77  *
78  * @author ProActive Team
79  * @version 1.0, 2001/10/23
80  * @since ProActive 0.9
81  * @see org.objectweb.proactive.Body
82  * @see UniqueID
83  *
84  */

85 public abstract class BodyImpl extends AbstractBody
86     implements java.io.Serializable JavaDoc {
87     //
88
// -- STATIC MEMBERS -----------------------------------------------
89
//
90
private static final String JavaDoc INACTIVE_BODY_EXCEPTION_MESSAGE = "Cannot perform this call because this body is inactive";
91
92     //
93
// -- PROTECTED MEMBERS -----------------------------------------------
94
//
95

96     /** The component in charge of receiving reply */
97     protected ReplyReceiver replyReceiver;
98
99     /** The component in charge of receiving request */
100     protected RequestReceiver requestReceiver;
101     protected MessageEventProducerImpl messageEventProducer;
102     protected String JavaDoc JobID;
103
104     //
105
// -- CONSTRUCTORS -----------------------------------------------
106
//
107

108     /**
109      * Creates a new AbstractBody.
110      * Used for serialization.
111      */

112     public BodyImpl() {
113     }
114
115     /**
116      * Creates a new AbstractBody for an active object attached to a given node.
117      * @param reifiedObject the active object that body is for
118      * @param nodeURL the URL of the node that body is attached to
119      * @param factory the factory able to construct new factories for each type of meta objects
120      * needed by this body
121      */

122     public BodyImpl(Object JavaDoc reifiedObject, String JavaDoc nodeURL,
123         MetaObjectFactory factory, String JavaDoc jobID) {
124         super(reifiedObject, nodeURL, factory);
125         this.JobID = jobID;
126         this.requestReceiver = factory.newRequestReceiverFactory()
127                                       .newRequestReceiver();
128         this.replyReceiver = factory.newReplyReceiverFactory().newReplyReceiver();
129         this.messageEventProducer = new MessageEventProducerImpl();
130         setLocalBodyImpl(new ActiveLocalBodyStrategy(reifiedObject,
131                 factory.newRequestQueueFactory().newRequestQueue(bodyID),
132                 factory.newRequestFactory()));
133         this.localBodyStrategy.getFuturePool().setOwnerBody(this.getID());
134     }
135
136     //
137
// -- PUBLIC METHODS -----------------------------------------------
138
//
139
public String JavaDoc getJobID() {
140         return this.JobID;
141     }
142
143     //
144
// -- implements MessageEventProducer -----------------------------------------------
145
//
146
public void addMessageEventListener(MessageEventListener listener) {
147         if (messageEventProducer != null) {
148             messageEventProducer.addMessageEventListener(listener);
149         }
150     }
151
152     public void removeMessageEventListener(MessageEventListener listener) {
153         if (messageEventProducer != null) {
154             messageEventProducer.removeMessageEventListener(listener);
155         }
156     }
157
158     //
159
// -- PROTECTED METHODS -----------------------------------------------
160
//
161

162     /**
163      * Receives a request for later processing. The call to this method is non blocking
164      * unless the body cannot temporary receive the request.
165      * @param request the request to process
166      * @exception java.io.IOException if the request cannot be accepted
167      */

168     protected void internalReceiveRequest(Request request)
169         throws java.io.IOException JavaDoc, RenegotiateSessionException {
170         if (messageEventProducer != null) {
171             messageEventProducer.notifyListeners(request,
172                 MessageEvent.REQUEST_RECEIVED, bodyID,
173                 getRequestQueue().size() + 1);
174         }
175
176         // request queue length = number of requests in queue
177
// + the one to add now
178
requestReceiver.receiveRequest(request, this);
179     }
180
181     /**
182      * Receives a reply in response to a former request.
183      * @param reply the reply received
184      * @exception java.io.IOException if the reply cannot be accepted
185      */

186     protected void internalReceiveReply(Reply reply) throws java.io.IOException JavaDoc {
187         if (messageEventProducer != null) {
188             messageEventProducer.notifyListeners(reply,
189                 MessageEvent.REPLY_RECEIVED, bodyID);
190         }
191         replyReceiver.receiveReply(reply, this, getFuturePool());
192     }
193
194     /**
195      * Signals that the activity of this body, managed by the active thread has just stopped.
196      */

197     protected void activityStopped() {
198         super.activityStopped();
199         messageEventProducer = null;
200         setLocalBodyImpl(new InactiveLocalBodyStrategy());
201     }
202
203     //protected void activityStopped2() {
204
// super.activityStopped2();
205
//
206
//}
207
public void setImmediateService(String JavaDoc methodName)
208         throws java.io.IOException JavaDoc {
209         this.requestReceiver.setImmediateService(methodName);
210     }
211
212     //
213
// -- PRIVATE METHODS -----------------------------------------------
214
//
215
//
216
// -- inner classes -----------------------------------------------
217
//
218
private class ActiveLocalBodyStrategy implements LocalBodyStrategy,
219         java.io.Serializable JavaDoc {
220
221         /** A pool future that contains the pending future objects */
222         protected FuturePool futures;
223
224         /** The reified object target of the request processed by this body */
225         protected Object JavaDoc reifiedObject;
226         protected BlockingRequestQueue requestQueue;
227         protected RequestFactory internalRequestFactory;
228         private long absoluteSequenceID;
229
230         //
231
// -- CONSTRUCTORS -----------------------------------------------
232
//
233
public ActiveLocalBodyStrategy(Object JavaDoc reifiedObject,
234             BlockingRequestQueue requestQueue, RequestFactory requestFactory) {
235             this.reifiedObject = reifiedObject;
236             this.futures = new FuturePool();
237             this.requestQueue = requestQueue;
238             this.internalRequestFactory = requestFactory;
239         }
240
241         //
242
// -- PUBLIC METHODS -----------------------------------------------
243
//
244
//
245
// -- implements LocalBody -----------------------------------------------
246
//
247
public FuturePool getFuturePool() {
248             return futures;
249         }
250
251         public BlockingRequestQueue getRequestQueue() {
252             return requestQueue;
253         }
254
255         public Object JavaDoc getReifiedObject() {
256             return reifiedObject;
257         }
258
259         public String JavaDoc getName() {
260             return reifiedObject.getClass().getName();
261         }
262
263         /** Serves the request. The request should be removed from the request queue
264          * before serving, which is correctly done by all methods of the Service class.
265          * However, this condition is not ensured for custom calls on serve. */

266         public void serve(Request request) {
267             if (request == null) {
268                 return;
269             }
270             try {
271                 messageEventProducer.notifyListeners(request,
272                     MessageEvent.SERVING_STARTED, bodyID,
273                     getRequestQueue().size());
274                 Reply reply = request.serve(BodyImpl.this);
275                 if (reply == null) {
276                     if (!isActive()) {
277                         return; //test if active in case of terminate() method otherwise eventProducer would be null
278
}
279                     messageEventProducer.notifyListeners(request,
280                         MessageEvent.VOID_REQUEST_SERVED, bodyID,
281                         getRequestQueue().size());
282                     return;
283                 }
284                 UniqueID destinationBodyId = request.getSourceBodyID();
285                 if ((destinationBodyId != null) &&
286                         (messageEventProducer != null)) {
287                     messageEventProducer.notifyListeners(reply,
288                         MessageEvent.REPLY_SENT, destinationBodyId,
289                         getRequestQueue().size());
290                 }
291                 this.getFuturePool().registerDestination(request.getSender());
292                 reply.send(request.getSender());
293                 this.getFuturePool().removeDestination();
294             } catch (ServeException e) {
295                 // handle error here
296
throw new ProActiveRuntimeException("Exception in serve (Still not handled) : throws killer RuntimeException",
297                     e);
298             } catch (java.io.IOException JavaDoc e) {
299                 // handle error here
300
throw new ProActiveRuntimeException("Exception in sending reply (Still not handled) : throws killer RuntimeException",
301                     e);
302             }
303         }
304
305         public void sendRequest(MethodCall methodCall, Future future,
306             UniversalBody destinationBody)
307             throws java.io.IOException JavaDoc, RenegotiateSessionException {
308             long sequenceID = getNextSequenceID();
309             Request request = internalRequestFactory.newRequest(methodCall,
310                     BodyImpl.this, future == null, sequenceID);
311
312             // COMPONENTS : generate ComponentRequest for component messages
313
if (methodCall.getTag() != null) {
314                 if (methodCall.getTag().equals(MethodCall.COMPONENT_TAG)) {
315                     request = new ComponentRequestImpl((RequestImpl) request);
316                 }
317             }
318             if (future != null) {
319                 future.setID(sequenceID);
320                 futures.receiveFuture(future);
321             }
322             messageEventProducer.notifyListeners(request,
323                 MessageEvent.REQUEST_SENT, destinationBody.getID());
324             request.send(destinationBody);
325         }
326
327         //
328
// -- PROTECTED METHODS -----------------------------------------------
329
//
330

331         /**
332          * Returns a unique identifier that can be used to tag a future, a request
333          * @return a unique identifier that can be used to tag a future, a request.
334          */

335         private synchronized long getNextSequenceID() {
336             return ++absoluteSequenceID;
337         }
338     }
339
340     // end inner class LocalBodyImpl
341
private class InactiveLocalBodyStrategy implements LocalBodyStrategy,
342         java.io.Serializable JavaDoc {
343         //
344
// -- CONSTRUCTORS -----------------------------------------------
345
//
346
public InactiveLocalBodyStrategy() {
347         }
348
349         //
350
// -- PUBLIC METHODS -----------------------------------------------
351
//
352
//
353
// -- implements LocalBody -----------------------------------------------
354
//
355
public FuturePool getFuturePool() {
356             //throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
357
return null;
358         }
359
360         public BlockingRequestQueue getRequestQueue() {
361             throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
362         }
363
364         public RequestQueue getHighPriorityRequestQueue() {
365             throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
366         }
367
368         public Object JavaDoc getReifiedObject() {
369             throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
370         }
371
372         public String JavaDoc getName() {
373             return "inactive body";
374         }
375
376         public void serve(Request request) {
377             throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
378         }
379
380         public void sendRequest(MethodCall methodCall, Future future,
381             UniversalBody destinationBody) throws java.io.IOException JavaDoc {
382             throw new ProActiveRuntimeException(INACTIVE_BODY_EXCEPTION_MESSAGE);
383         }
384     }
385
386     // end inner class LocalInactiveBody
387
}
388
Popular Tags