1 10 11 package org.mule.extras.client; 12 13 import edu.emory.mathcs.backport.java.util.concurrent.Callable; 14 import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; 15 import org.apache.commons.logging.Log; 16 import org.apache.commons.logging.LogFactory; 17 import org.mule.MuleManager; 18 import org.mule.config.MuleProperties; 19 import org.mule.impl.MuleEvent; 20 import org.mule.impl.MuleMessage; 21 import org.mule.impl.MuleSession; 22 import org.mule.impl.RequestContext; 23 import org.mule.impl.MuleSessionHandler; 24 import org.mule.impl.endpoint.MuleEndpoint; 25 import org.mule.impl.internal.notifications.AdminNotification; 26 import org.mule.impl.security.MuleCredentials; 27 import org.mule.providers.AbstractConnector; 28 import org.mule.providers.service.ConnectorFactory; 29 import org.mule.transformers.wire.SerializationWireFormat; 30 import org.mule.transformers.wire.WireFormat; 31 import org.mule.umo.FutureMessageResult; 32 import org.mule.umo.UMOEvent; 33 import org.mule.umo.UMOException; 34 import org.mule.umo.UMOMessage; 35 import org.mule.umo.endpoint.UMOEndpoint; 36 import org.mule.umo.lifecycle.Disposable; 37 import org.mule.umo.provider.DispatchException; 38 import org.mule.umo.provider.UMOMessageDispatcher; 39 import org.mule.umo.security.UMOCredentials; 40 import org.mule.util.MuleObjectHelper; 41 42 import java.io.ByteArrayInputStream ; 43 import java.io.ByteArrayOutputStream ; 44 import java.io.InputStream ; 45 import java.util.Map ; 46 47 55 56 public class RemoteDispatcher implements Disposable 57 { 58 59 62 protected static Log logger = LogFactory.getLog(RemoteDispatcher.class); 63 64 67 private UMOEndpoint serverEndpoint; 68 private UMOCredentials credentials = null; 69 70 73 private ExecutorService executor; 74 75 78 private WireFormat wireFormat; 79 80 protected RemoteDispatcher(String endpoint, UMOCredentials credentials) throws UMOException 81 { 82 this(endpoint); 83 this.credentials = credentials; 84 } 85 86 protected RemoteDispatcher(String endpoint) throws UMOException 87 { 88 serverEndpoint = new MuleEndpoint(endpoint, true); 89 wireFormat = new SerializationWireFormat(); 90 } 91 92 protected void setExecutorService(ExecutorService e) 93 { 94 this.executor = e; 95 } 96 97 109 public void dispatchToRemoteComponent(String component, Object payload, Map messageProperties) 110 throws UMOException 111 { 112 doToRemoteComponent(component, payload, messageProperties, true); 113 } 114 115 128 public UMOMessage sendToRemoteComponent(String component, Object payload, Map messageProperties) 129 throws UMOException 130 { 131 return doToRemoteComponent(component, payload, messageProperties, true); 132 } 133 134 151 public FutureMessageResult sendAsyncToRemoteComponent(final String component, 152 String transformers, 153 final Object payload, 154 final Map messageProperties) throws UMOException 155 { 156 Callable callable = new Callable() 157 { 158 public Object call() throws Exception 159 { 160 return doToRemoteComponent(component, payload, messageProperties, true); 161 } 162 }; 163 164 FutureMessageResult result = new FutureMessageResult(callable); 165 166 if (executor != null) 167 { 168 result.setExecutor(executor); 169 } 170 171 if (transformers != null) 172 { 173 result.setTransformer(MuleObjectHelper.getTransformer(transformers, ",")); 174 } 175 176 result.execute(); 177 return result; 178 } 179 180 public UMOMessage sendRemote(String endpoint, Object payload, Map messageProperties, int timeout) 181 throws UMOException 182 { 183 return doToRemote(endpoint, payload, messageProperties, true, timeout); 184 } 185 186 public UMOMessage sendRemote(String endpoint, Object payload, Map messageProperties) throws UMOException 187 { 188 return doToRemote(endpoint, payload, messageProperties, true, MuleManager.getConfiguration() 189 .getSynchronousEventTimeout()); 190 } 191 192 public void dispatchRemote(String endpoint, Object payload, Map messageProperties) throws UMOException 193 { 194 doToRemote(endpoint, payload, messageProperties, false, -1); 195 } 196 197 public FutureMessageResult sendAsyncRemote(final String endpoint, 198 final Object payload, 199 final Map messageProperties) throws UMOException 200 { 201 Callable callable = new Callable() 202 { 203 public Object call() throws Exception 204 { 205 return doToRemote(endpoint, payload, messageProperties, true, -1); 206 } 207 }; 208 209 FutureMessageResult result = new FutureMessageResult(callable); 210 211 if (executor != null) 212 { 213 result.setExecutor(executor); 214 } 215 216 result.execute(); 217 return result; 218 } 219 220 public UMOMessage receiveRemote(String endpoint, int timeout) throws UMOException 221 { 222 AdminNotification action = new AdminNotification(null, AdminNotification.ACTION_RECEIVE, endpoint); 223 action.setProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY, "true"); 224 action.setProperty(MuleProperties.MULE_EVENT_TIMEOUT_PROPERTY, new Long (timeout)); 225 UMOMessage result = dispatchAction(action, true, timeout); 226 return result; 227 } 228 229 public FutureMessageResult asyncReceiveRemote(final String endpoint, final int timeout) 230 throws UMOException 231 { 232 Callable callable = new Callable() 233 { 234 public Object call() throws Exception 235 { 236 return receiveRemote(endpoint, timeout); 237 } 238 }; 239 240 FutureMessageResult result = new FutureMessageResult(callable); 241 242 if (executor != null) 243 { 244 result.setExecutor(executor); 245 } 246 247 result.execute(); 248 return result; 249 } 250 251 protected UMOMessage doToRemoteComponent(String component, 252 Object payload, 253 Map messageProperties, 254 boolean synchronous) throws UMOException 255 { 256 UMOMessage message = new MuleMessage(payload, messageProperties); 257 message.setBooleanProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY, synchronous); 258 setCredentials(message); 259 AdminNotification action = new AdminNotification(message, AdminNotification.ACTION_INVOKE, 260 "mule://" + component); 261 UMOMessage result = dispatchAction(action, synchronous, MuleManager.getConfiguration() 262 .getSynchronousEventTimeout()); 263 return result; 264 } 265 266 protected UMOMessage doToRemote(String endpoint, 267 Object payload, 268 Map messageProperties, 269 boolean synchronous, 270 int timeout) throws UMOException 271 { 272 UMOMessage message = new MuleMessage(payload, messageProperties); 273 message.setProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY, String.valueOf(synchronous)); 274 setCredentials(message); 275 AdminNotification action = new AdminNotification(message, (synchronous 276 ? AdminNotification.ACTION_SEND : AdminNotification.ACTION_DISPATCH), endpoint); 277 278 UMOMessage result = dispatchAction(action, synchronous, timeout); 279 return result; 280 } 281 282 protected UMOMessage dispatchAction(AdminNotification action, boolean synchronous, int timeout) 283 throws UMOException 284 { 285 286 UMOEndpoint endpoint = ConnectorFactory.createEndpoint(serverEndpoint.getEndpointURI(), 287 UMOEndpoint.ENDPOINT_TYPE_SENDER); 288 endpoint.setRemoteSync(synchronous); 289 updateContext(new MuleMessage(action), endpoint, synchronous); 290 291 ByteArrayOutputStream out = new ByteArrayOutputStream (); 292 wireFormat.write(out, action); 293 byte[] payload = out.toByteArray(); 294 295 UMOMessage message = action.getMessage(); 296 297 if (message == null) 298 { 299 message = new MuleMessage(payload); 300 } 301 else 302 { 303 message = new MuleMessage(payload, message); 304 } 305 306 message.addProperties(action.getProperties()); 307 MuleSession session = new MuleSession(message, 308 ((AbstractConnector)endpoint.getConnector()).getSessionHandler()); 309 310 UMOEvent event = new MuleEvent(message, endpoint, session, true); 311 event.setTimeout(timeout); 312 if (logger.isDebugEnabled()) 313 { 314 logger.debug("MuleClient sending remote call to: " + action.getResourceIdentifier() + ". At " 315 + serverEndpoint.toString() + " .Event is: " + event); 316 } 317 318 UMOMessageDispatcher dispatcher = endpoint.getConnector().getDispatcher(serverEndpoint); 319 320 UMOMessage result = null; 321 322 try 323 { 324 if (synchronous) 325 { 326 result = dispatcher.send(event); 327 } 328 else 329 { 330 dispatcher.dispatch(event); 331 return null; 332 } 333 if (result != null) 334 { 335 if (result.getPayload() != null) 336 { 337 Object response; 338 if (result.getPayload() instanceof InputStream ) 339 { 340 response = wireFormat.read((InputStream )result.getPayload()); 341 } 342 else 343 { 344 ByteArrayInputStream in = new ByteArrayInputStream (result.getPayloadAsBytes()); 345 response = wireFormat.read(in); 346 } 347 348 if (response instanceof AdminNotification) 349 { 350 response = ((AdminNotification)response).getMessage(); 351 } 352 return (UMOMessage)response; 353 } 354 else 355 { 356 return result; 357 } 358 } 359 360 } 361 catch (Exception e) 362 { 363 throw new DispatchException(event.getMessage(), event.getEndpoint(), e); 364 } 365 366 if (logger.isDebugEnabled()) 367 { 368 logger.debug("Result of MuleClient remote call is: " 369 + (result == null ? "null" : result.getPayload())); 370 } 371 372 return result; 373 } 374 375 public void dispose() 376 { 377 } 379 380 protected void setCredentials(UMOMessage message) 381 { 382 if (credentials != null) 383 { 384 message.setProperty(MuleProperties.MULE_USER_PROPERTY, MuleCredentials.createHeader( 385 credentials.getUsername(), credentials.getPassword())); 386 } 387 } 388 389 public WireFormat getWireFormat() 390 { 391 return wireFormat; 392 } 393 394 public void setWireFormat(WireFormat wireFormat) 395 { 396 this.wireFormat = wireFormat; 397 } 398 399 protected void updateContext(UMOMessage message, UMOEndpoint endpoint, boolean synchronous) 400 throws UMOException 401 { 402 403 RequestContext.setEvent(new MuleEvent(message, endpoint, new MuleSession(message, 404 new MuleSessionHandler()), synchronous)); 405 } 406 } 407 | Popular Tags |