KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > vm > VMMessageDispatcher


1 /*
2  * $Id: VMMessageDispatcher.java 3982 2006-11-22 14:28:01Z lajos $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.vm;
12
13 import java.io.ByteArrayInputStream JavaDoc;
14 import java.io.InputStream JavaDoc;
15 import java.io.PipedInputStream JavaDoc;
16 import java.io.PipedOutputStream JavaDoc;
17
18 import org.mule.MuleManager;
19 import org.mule.config.i18n.Message;
20 import org.mule.config.i18n.Messages;
21 import org.mule.impl.MuleMessage;
22 import org.mule.providers.AbstractMessageDispatcher;
23 import org.mule.transformers.simple.ObjectToByteArray;
24 import org.mule.umo.UMOEvent;
25 import org.mule.umo.UMOException;
26 import org.mule.umo.UMOMessage;
27 import org.mule.umo.endpoint.UMOEndpointURI;
28 import org.mule.umo.endpoint.UMOImmutableEndpoint;
29 import org.mule.umo.provider.DispatchException;
30 import org.mule.umo.provider.NoReceiverForEndpointException;
31 import org.mule.umo.provider.UMOConnector;
32 import org.mule.umo.provider.UMOStreamMessageAdapter;
33 import org.mule.util.queue.Queue;
34 import org.mule.util.queue.QueueSession;
35
36 /**
37  * <code>VMMessageDispatcher</code> is used for providing in memory interaction
38  * between components.
39  */

40 public class VMMessageDispatcher extends AbstractMessageDispatcher
41 {
42     private final VMConnector connector;
43     private final ObjectToByteArray objectToByteArray;
44
45     public VMMessageDispatcher(UMOImmutableEndpoint endpoint)
46     {
47         super(endpoint);
48         this.connector = (VMConnector)endpoint.getConnector();
49         objectToByteArray = new ObjectToByteArray();
50     }
51
52     /*
53      * (non-Javadoc)
54      *
55      * @see org.mule.umo.provider.UMOMessageDispatcher#getDelegateSession()
56      */

57     public Object JavaDoc getDelegateSession() throws UMOException
58     {
59         return null;
60     }
61
62     /**
63      * Make a specific request to the underlying transport
64      *
65      * @param endpoint the endpoint to use when connecting to the resource
66      * @param timeout the maximum time the operation should block before returning.
67      * The call should return immediately if there is data available. If
68      * no data becomes available before the timeout elapses, null will be
69      * returned
70      * @return the result of the request wrapped in a UMOMessage object. Null will be
71      * returned if no data was available
72      * @throws Exception if the call to the underlying protocol causes an exception
73      */

74     protected UMOMessage doReceive(UMOImmutableEndpoint endpoint, long timeout) throws Exception JavaDoc
75     {
76         if (!connector.isQueueEvents())
77         {
78             throw new UnsupportedOperationException JavaDoc("Receive only supported on the VM Queue Connector");
79         }
80
81         try
82         {
83             QueueSession queueSession = connector.getQueueSession();
84             Queue queue = queueSession.getQueue(endpoint.getEndpointURI().getAddress());
85             if (queue == null)
86             {
87                 if (logger.isDebugEnabled())
88                 {
89                     logger.debug("No queue with name " + endpoint.getEndpointURI().getAddress());
90                 }
91                 return null;
92             }
93             else
94             {
95                 UMOEvent event = null;
96                 if (logger.isDebugEnabled())
97                 {
98                     logger.debug("Waiting for a message on " + endpoint.getEndpointURI().getAddress());
99                 }
100                 try
101                 {
102                     event = (UMOEvent)queue.poll(timeout);
103                 }
104                 catch (InterruptedException JavaDoc e)
105                 {
106                     logger.error("Failed to receive event from queue: " + endpoint.getEndpointURI());
107                 }
108                 if (event != null)
109                 {
110                     if (logger.isDebugEnabled())
111                     {
112                         logger.debug("Event received: " + event);
113                     }
114                     return event.getMessage();
115                 }
116                 else
117                 {
118                     if (logger.isDebugEnabled())
119                     {
120                         logger.debug("No event received after " + timeout + " ms");
121                     }
122                     return null;
123                 }
124             }
125         }
126         catch (Exception JavaDoc e)
127         {
128             throw e;
129         }
130     }
131
132     /*
133      * (non-Javadoc)
134      *
135      * @see org.mule.umo.provider.UMOConnector#dispatch(org.mule.umo.UMOEvent)
136      */

137     protected void doDispatch(UMOEvent event) throws Exception JavaDoc
138     {
139         UMOEndpointURI endpointUri = event.getEndpoint().getEndpointURI();
140
141         if (endpointUri == null)
142         {
143             throw new DispatchException(new Message(Messages.X_IS_NULL, "Endpoint"), event.getMessage(),
144                 event.getEndpoint());
145         }
146         if (connector.isQueueEvents())
147         {
148             QueueSession session = connector.getQueueSession();
149             Queue queue = session.getQueue(endpointUri.getAddress());
150             queue.put(event);
151         }
152         else
153         {
154             VMMessageReceiver receiver = connector.getReceiver(event.getEndpoint().getEndpointURI());
155             if (receiver == null)
156             {
157                 logger.warn("No receiver for endpointUri: " + event.getEndpoint().getEndpointURI());
158                 return;
159             }
160
161             if (event.isStreaming())
162             {
163
164                 PipedInputStream JavaDoc in = new PipedInputStream JavaDoc();
165                 PipedOutputStream JavaDoc out = new PipedOutputStream JavaDoc(in);
166                 UMOStreamMessageAdapter sma = connector.getStreamMessageAdapter(in, out);
167                 sma.write(event);
168             }
169             receiver.onEvent(event);
170         }
171         if (logger.isDebugEnabled())
172         {
173             logger.debug("dispatched Event on endpointUri: " + endpointUri);
174         }
175     }
176
177     /*
178      * (non-Javadoc)
179      *
180      * @see org.mule.umo.provider.UMOConnector#send(org.mule.umo.UMOEvent)
181      */

182     protected UMOMessage doSend(UMOEvent event) throws Exception JavaDoc
183     {
184         UMOMessage retMessage;
185         UMOEndpointURI endpointUri = event.getEndpoint().getEndpointURI();
186         VMMessageReceiver receiver = connector.getReceiver(endpointUri);
187         if (receiver == null)
188         {
189             if (connector.isQueueEvents())
190             {
191                 if (logger.isDebugEnabled())
192                 {
193                     logger.debug("Writing to queue as there is no receiver on connector: "
194                                  + connector.getName() + ", for endpointUri: "
195                                  + event.getEndpoint().getEndpointURI());
196                 }
197                 doDispatch(event);
198                 return null;
199             }
200             else
201             {
202                 throw new NoReceiverForEndpointException(new Message(Messages.NO_RECEIVER_X_FOR_ENDPOINT_X,
203                     connector.getName(), event.getEndpoint().getEndpointURI()));
204             }
205         }
206         if (event.isStreaming())
207         {
208             PipedInputStream JavaDoc in = new PipedInputStream JavaDoc();
209             PipedOutputStream JavaDoc out = new PipedOutputStream JavaDoc(in);
210             UMOStreamMessageAdapter sma = connector.getStreamMessageAdapter(in, out);
211             sma.write(event);
212         }
213
214         retMessage = (UMOMessage)receiver.onCall(event);
215
216         if (event.isStreaming() && retMessage != null)
217         {
218             InputStream JavaDoc in;
219             if (retMessage.getPayload() instanceof InputStream JavaDoc)
220             {
221                 in = (InputStream JavaDoc)retMessage.getPayload();
222             }
223             else
224             {
225                 in = new ByteArrayInputStream JavaDoc((byte[])objectToByteArray.transform(retMessage.getPayload()));
226             }
227             UMOStreamMessageAdapter sma = connector.getStreamMessageAdapter(in, null);
228             retMessage = new MuleMessage(sma, retMessage);
229         }
230
231         if (logger.isDebugEnabled())
232         {
233             logger.debug("sent event on endpointUri: " + event.getEndpoint().getEndpointURI());
234         }
235
236         return retMessage;
237     }
238
239     /*
240      * (non-Javadoc)
241      *
242      * @see org.mule.umo.provider.UMOMessageDispatcher#getConnector()
243      */

244     public UMOConnector getConnector()
245     {
246         return connector;
247     }
248
249     protected void doDispose()
250     {
251         // template method
252
}
253
254     protected void doConnect(UMOImmutableEndpoint endpoint) throws Exception JavaDoc
255     {
256         if (connector.isQueueEvents())
257         {
258             // use the default queue profile to configure this queue.
259
// Todo We may want to allow users to specify this at the connector level
260
MuleManager.getConfiguration().getQueueProfile().configureQueue(
261                 endpoint.getEndpointURI().getAddress());
262         }
263     }
264
265     protected void doDisconnect() throws Exception JavaDoc
266     {
267         // template method
268
}
269
270 }
271
Popular Tags