1 31 package org.objectweb.proactive.core.body; 32 33 import org.objectweb.proactive.core.ProActiveRuntimeException; 34 import org.objectweb.proactive.core.body.future.Future; 35 import org.objectweb.proactive.core.body.future.FuturePool; 36 import org.objectweb.proactive.core.body.reply.Reply; 37 import org.objectweb.proactive.core.body.reply.ReplyReceiver; 38 import org.objectweb.proactive.core.body.request.BlockingRequestQueue; 39 import org.objectweb.proactive.core.body.request.Request; 40 import org.objectweb.proactive.core.body.request.RequestFactory; 41 import org.objectweb.proactive.core.body.request.RequestImpl; 42 import org.objectweb.proactive.core.body.request.RequestQueue; 43 import org.objectweb.proactive.core.component.request.ComponentRequestImpl; 44 import org.objectweb.proactive.core.event.MessageEventListener; 45 import org.objectweb.proactive.core.mop.MethodCall; 46 import org.objectweb.proactive.core.runtime.ProActiveRuntimeImpl; 47 import org.objectweb.proactive.ext.security.CommunicationForbiddenException; 48 import org.objectweb.proactive.ext.security.InternalBodySecurity; 49 import org.objectweb.proactive.ext.security.ProActiveSecurity; 50 import org.objectweb.proactive.ext.security.RenegotiateSessionException; 51 import org.objectweb.proactive.ext.security.SecurityContext; 52 import org.objectweb.proactive.ext.security.SecurityNotAvailableException; 53 import org.objectweb.proactive.ext.security.crypto.AuthenticationException; 54 55 import java.security.cert.X509Certificate ; 56 57 58 public class HalfBody extends AbstractBody { 59 private static final String HALF_BODY_EXCEPTION_MESSAGE = "This method is not implemented in class HalfBody."; 63 private static final String NAME = "Other thread"; 64 65 66 private ReplyReceiver replyReceiver; 67 68 public synchronized static HalfBody getHalfBody(MetaObjectFactory factory) { 69 return new HalfBody(factory); 70 } 71 72 private HalfBody(MetaObjectFactory factory) { 76 super(new Object (), "LOCAL", factory); 78 internalBodySecurity = new InternalBodySecurity(null); 88 89 this.replyReceiver = factory.newReplyReceiverFactory().newReplyReceiver(); 90 setLocalBodyImpl(new HalfLocalBodyStrategy(factory.newRequestFactory())); 91 this.localBodyStrategy.getFuturePool().setOwnerBody(this.getID()); 92 } 93 94 public void addMessageEventListener(MessageEventListener listener) { 101 } 102 103 public void removeMessageEventListener(MessageEventListener listener) { 104 } 105 106 110 116 protected void internalReceiveRequest(Request request) 117 throws java.io.IOException { 118 throw new ProActiveRuntimeException( 119 "The method 'receiveRequest' is not implemented in class HalfBody."); 120 } 121 122 127 protected void internalReceiveReply(Reply reply) throws java.io.IOException { 128 try { 129 if (reply.isCiphered()) { 130 reply.decrypt(psm); 131 } 132 } catch (Exception e) { 133 e.printStackTrace(); 134 } 135 replyReceiver.receiveReply(reply, this, getFuturePool()); 136 } 137 138 public void setImmediateService(String methodName) { 139 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE); 140 } 141 142 145 public String getJobID() { 146 return ProActiveRuntimeImpl.getProActiveRuntime().getJobID(); 147 } 148 149 private class HalfLocalBodyStrategy implements LocalBodyStrategy, 153 java.io.Serializable { 154 155 156 protected FuturePool futures; 157 protected RequestFactory internalRequestFactory; 158 private long absoluteSequenceID; 159 160 public HalfLocalBodyStrategy(RequestFactory requestFactory) { 164 this.futures = new FuturePool(); 165 this.internalRequestFactory = requestFactory; 166 } 167 168 public FuturePool getFuturePool() { 175 return futures; 176 } 177 178 public BlockingRequestQueue getRequestQueue() { 179 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE); 180 } 181 182 public RequestQueue getHighPriorityRequestQueue() { 183 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE); 184 } 185 186 public Object getReifiedObject() { 187 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE); 188 } 189 190 public String getName() { 191 return NAME; 192 } 193 194 public void serve(Request request) { 195 throw new ProActiveRuntimeException(HALF_BODY_EXCEPTION_MESSAGE); 196 } 197 198 public void sendRequest(MethodCall methodCall, Future future, 199 UniversalBody destinationBody) 200 throws java.io.IOException , RenegotiateSessionException { 201 long sequenceID = getNextSequenceID(); 202 Request request = internalRequestFactory.newRequest(methodCall, 203 HalfBody.this, future == null, sequenceID); 204 205 if (methodCall.getTag() != null) { 207 if (methodCall.getTag().equals(MethodCall.COMPONENT_TAG)) { 208 request = new ComponentRequestImpl((RequestImpl) request); 209 } 210 } 211 if (future != null) { 212 future.setID(sequenceID); 213 futures.receiveFuture(future); 214 } 215 216 long sessionID = 0; 218 219 try { 222 try { 223 if (!isSecurityOn) { 224 logger.debug("security is off"); 225 throw new SecurityNotAvailableException(); 226 } 227 if (internalBodySecurity.isLocalBody()) { 228 byte[] certE = destinationBody.getRemoteAdapter() 229 .getCertificateEncoded(); 230 X509Certificate cert = ProActiveSecurity.decodeCertificate(certE); 231 if ((sessionID = psm.getSessionIDTo(cert)) == 0) { 232 psm.initiateSession(SecurityContext.COMMUNICATION_SEND_REPLY_TO, 233 destinationBody.getRemoteAdapter()); 234 sessionID = psm.getSessionIDTo(cert); 235 } 236 } 237 } catch (SecurityNotAvailableException e) { 238 logger.debug("communication without security"); 240 } 242 request.send(destinationBody); 243 } catch (RenegotiateSessionException e) { 244 updateLocation(destinationBody.getID(), e.getUniversalBody()); 246 psm.terminateSession(sessionID); 247 logger.debug("renegotiate session"); 248 sendRequest(methodCall, future, e.getUniversalBody()); 249 } catch (CommunicationForbiddenException e) { 250 logger.warn(e); 251 } catch (AuthenticationException e) { 253 e.printStackTrace(); 254 } 255 } 256 257 261 265 private synchronized long getNextSequenceID() { 266 return ++absoluteSequenceID; 267 } 268 } 269 270 } 272 | Popular Tags |