1 10 11 package org.mule.providers.vm; 12 13 import java.io.ByteArrayInputStream ; 14 import java.io.InputStream ; 15 import java.io.PipedInputStream ; 16 import java.io.PipedOutputStream ; 17 18 import org.mule.MuleManager; 19 import org.mule.config.i18n.Message; 20 import org.mule.config.i18n.Messages; 21 import org.mule.impl.MuleMessage; 22 import org.mule.providers.AbstractMessageDispatcher; 23 import org.mule.transformers.simple.ObjectToByteArray; 24 import org.mule.umo.UMOEvent; 25 import org.mule.umo.UMOException; 26 import org.mule.umo.UMOMessage; 27 import org.mule.umo.endpoint.UMOEndpointURI; 28 import org.mule.umo.endpoint.UMOImmutableEndpoint; 29 import org.mule.umo.provider.DispatchException; 30 import org.mule.umo.provider.NoReceiverForEndpointException; 31 import org.mule.umo.provider.UMOConnector; 32 import org.mule.umo.provider.UMOStreamMessageAdapter; 33 import org.mule.util.queue.Queue; 34 import org.mule.util.queue.QueueSession; 35 36 40 public class VMMessageDispatcher extends AbstractMessageDispatcher 41 { 42 private final VMConnector connector; 43 private final ObjectToByteArray objectToByteArray; 44 45 public VMMessageDispatcher(UMOImmutableEndpoint endpoint) 46 { 47 super(endpoint); 48 this.connector = (VMConnector)endpoint.getConnector(); 49 objectToByteArray = new ObjectToByteArray(); 50 } 51 52 57 public Object getDelegateSession() throws UMOException 58 { 59 return null; 60 } 61 62 74 protected UMOMessage doReceive(UMOImmutableEndpoint endpoint, long timeout) throws Exception 75 { 76 if (!connector.isQueueEvents()) 77 { 78 throw new UnsupportedOperationException ("Receive only supported on the VM Queue Connector"); 79 } 80 81 try 82 { 83 QueueSession queueSession = connector.getQueueSession(); 84 Queue queue = queueSession.getQueue(endpoint.getEndpointURI().getAddress()); 85 if (queue == null) 86 { 87 if (logger.isDebugEnabled()) 88 { 89 logger.debug("No queue with name " + endpoint.getEndpointURI().getAddress()); 90 } 91 return null; 92 } 93 else 94 { 95 UMOEvent event = null; 96 if (logger.isDebugEnabled()) 97 { 98 logger.debug("Waiting for a message on " + endpoint.getEndpointURI().getAddress()); 99 } 100 try 101 { 102 event = (UMOEvent)queue.poll(timeout); 103 } 104 catch (InterruptedException e) 105 { 106 logger.error("Failed to receive event from queue: " + endpoint.getEndpointURI()); 107 } 108 if (event != null) 109 { 110 if (logger.isDebugEnabled()) 111 { 112 logger.debug("Event received: " + event); 113 } 114 return event.getMessage(); 115 } 116 else 117 { 118 if (logger.isDebugEnabled()) 119 { 120 logger.debug("No event received after " + timeout + " ms"); 121 } 122 return null; 123 } 124 } 125 } 126 catch (Exception e) 127 { 128 throw e; 129 } 130 } 131 132 137 protected void doDispatch(UMOEvent event) throws Exception 138 { 139 UMOEndpointURI endpointUri = event.getEndpoint().getEndpointURI(); 140 141 if (endpointUri == null) 142 { 143 throw new DispatchException(new Message(Messages.X_IS_NULL, "Endpoint"), event.getMessage(), 144 event.getEndpoint()); 145 } 146 if (connector.isQueueEvents()) 147 { 148 QueueSession session = connector.getQueueSession(); 149 Queue queue = session.getQueue(endpointUri.getAddress()); 150 queue.put(event); 151 } 152 else 153 { 154 VMMessageReceiver receiver = connector.getReceiver(event.getEndpoint().getEndpointURI()); 155 if (receiver == null) 156 { 157 logger.warn("No receiver for endpointUri: " + event.getEndpoint().getEndpointURI()); 158 return; 159 } 160 161 if (event.isStreaming()) 162 { 163 164 PipedInputStream in = new PipedInputStream (); 165 PipedOutputStream out = new PipedOutputStream (in); 166 UMOStreamMessageAdapter sma = connector.getStreamMessageAdapter(in, out); 167 sma.write(event); 168 } 169 receiver.onEvent(event); 170 } 171 if (logger.isDebugEnabled()) 172 { 173 logger.debug("dispatched Event on endpointUri: " + endpointUri); 174 } 175 } 176 177 182 protected UMOMessage doSend(UMOEvent event) throws Exception 183 { 184 UMOMessage retMessage; 185 UMOEndpointURI endpointUri = event.getEndpoint().getEndpointURI(); 186 VMMessageReceiver receiver = connector.getReceiver(endpointUri); 187 if (receiver == null) 188 { 189 if (connector.isQueueEvents()) 190 { 191 if (logger.isDebugEnabled()) 192 { 193 logger.debug("Writing to queue as there is no receiver on connector: " 194 + connector.getName() + ", for endpointUri: " 195 + event.getEndpoint().getEndpointURI()); 196 } 197 doDispatch(event); 198 return null; 199 } 200 else 201 { 202 throw new NoReceiverForEndpointException(new Message(Messages.NO_RECEIVER_X_FOR_ENDPOINT_X, 203 connector.getName(), event.getEndpoint().getEndpointURI())); 204 } 205 } 206 if (event.isStreaming()) 207 { 208 PipedInputStream in = new PipedInputStream (); 209 PipedOutputStream out = new PipedOutputStream (in); 210 UMOStreamMessageAdapter sma = connector.getStreamMessageAdapter(in, out); 211 sma.write(event); 212 } 213 214 retMessage = (UMOMessage)receiver.onCall(event); 215 216 if (event.isStreaming() && retMessage != null) 217 { 218 InputStream in; 219 if (retMessage.getPayload() instanceof InputStream ) 220 { 221 in = (InputStream )retMessage.getPayload(); 222 } 223 else 224 { 225 in = new ByteArrayInputStream ((byte[])objectToByteArray.transform(retMessage.getPayload())); 226 } 227 UMOStreamMessageAdapter sma = connector.getStreamMessageAdapter(in, null); 228 retMessage = new MuleMessage(sma, retMessage); 229 } 230 231 if (logger.isDebugEnabled()) 232 { 233 logger.debug("sent event on endpointUri: " + event.getEndpoint().getEndpointURI()); 234 } 235 236 return retMessage; 237 } 238 239 244 public UMOConnector getConnector() 245 { 246 return connector; 247 } 248 249 protected void doDispose() 250 { 251 } 253 254 protected void doConnect(UMOImmutableEndpoint endpoint) throws Exception 255 { 256 if (connector.isQueueEvents()) 257 { 258 MuleManager.getConfiguration().getQueueProfile().configureQueue( 261 endpoint.getEndpointURI().getAddress()); 262 } 263 } 264 265 protected void doDisconnect() throws Exception 266 { 267 } 269 270 } 271 | Popular Tags |