KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > vm > VMConnector


1 /*
2  * $Id: VMConnector.java 3798 2006-11-04 04:07:14Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.vm;
12
13 import org.mule.MuleManager;
14 import org.mule.config.QueueProfile;
15 import org.mule.config.i18n.Message;
16 import org.mule.config.i18n.Messages;
17 import org.mule.impl.MuleMessage;
18 import org.mule.impl.endpoint.MuleEndpointURI;
19 import org.mule.providers.AbstractServiceEnabledConnector;
20 import org.mule.routing.filters.WildcardFilter;
21 import org.mule.transaction.TransactionCoordination;
22 import org.mule.umo.MessagingException;
23 import org.mule.umo.TransactionException;
24 import org.mule.umo.UMOComponent;
25 import org.mule.umo.UMOTransaction;
26 import org.mule.umo.endpoint.EndpointException;
27 import org.mule.umo.endpoint.UMOEndpoint;
28 import org.mule.umo.endpoint.UMOEndpointURI;
29 import org.mule.umo.lifecycle.InitialisationException;
30 import org.mule.umo.provider.MessageTypeNotSupportedException;
31 import org.mule.umo.provider.UMOMessageAdapter;
32 import org.mule.umo.provider.UMOMessageReceiver;
33 import org.mule.util.ClassUtils;
34 import org.mule.util.queue.QueueManager;
35 import org.mule.util.queue.QueueSession;
36
37 import java.util.Iterator JavaDoc;
38
39 /**
40  * <code>VMConnector</code> A simple endpoint wrapper to allow a Mule component to
41  * <p/> be accessed from an endpoint
42  *
43  * @author <a HREF="mailto:ross.mason@symphonysoft.com">Ross Mason</a>
44  * @author <a HREF="mailto:gnt@codehaus.org">Guillaume Nodet</a>
45  */

46
47 public class VMConnector extends AbstractServiceEnabledConnector
48 {
49     private boolean queueEvents = false;
50     private QueueProfile queueProfile;
51     private Class JavaDoc adapterClass = null;
52     private int queueTimeout = 2000;
53
54     /*
55      * (non-Javadoc)
56      *
57      * @see org.mule.providers.AbstractConnector#create()
58      */

59     public void doInitialise() throws InitialisationException
60     {
61         super.doInitialise();
62         if (queueEvents)
63         {
64             if (queueProfile == null)
65             {
66                 queueProfile = MuleManager.getConfiguration().getQueueProfile();
67             }
68         }
69
70         try
71         {
72             adapterClass = ClassUtils.loadClass(serviceDescriptor.getMessageAdapter(), getClass());
73         }
74         catch (ClassNotFoundException JavaDoc e)
75         {
76             throw new InitialisationException(new Message(Messages.FAILED_LOAD_X,
77                 "Message Adapter: " + serviceDescriptor.getMessageAdapter()), e);
78         }
79     }
80
81     /*
82      * (non-Javadoc)
83      *
84      * @see org.mule.umo.provider.UMOConnector#registerListener(org.mule.umo.UMOSession,
85      * org.mule.umo.endpoint.UMOEndpoint)
86      */

87     public UMOMessageReceiver createReceiver(UMOComponent component, UMOEndpoint endpoint) throws Exception JavaDoc
88     {
89         if (queueEvents)
90         {
91             queueProfile.configureQueue(endpoint.getEndpointURI().getAddress());
92         }
93         return serviceDescriptor.createMessageReceiver(this, component, endpoint);
94     }
95
96     /*
97      * (non-Javadoc)
98      *
99      * @see org.mule.umo.provider.UMOConnector#getMessageAdapter(java.lang.Object)
100      */

101     public UMOMessageAdapter getMessageAdapter(Object JavaDoc message) throws MessagingException
102     {
103         if (message == null)
104         {
105             throw new MessageTypeNotSupportedException(null, adapterClass);
106         }
107         else if (message instanceof MuleMessage)
108         {
109             return ((MuleMessage)message).getAdapter();
110         }
111         else if (message instanceof UMOMessageAdapter)
112         {
113             return (UMOMessageAdapter)message;
114         }
115         else
116         {
117             throw new MessageTypeNotSupportedException(message, adapterClass);
118         }
119     }
120
121     /*
122      * (non-Javadoc)
123      *
124      * @see org.mule.umo.provider.UMOConnector#getProtocol()
125      */

126     public String JavaDoc getProtocol()
127     {
128         return "VM";
129     }
130
131     /*
132      * (non-Javadoc)
133      *
134      * @see org.mule.providers.AbstractConnector#doDispose()
135      */

136     protected void doDispose()
137     {
138         // template method
139
}
140
141     public boolean isQueueEvents()
142     {
143         return queueEvents;
144     }
145
146     public void setQueueEvents(boolean queueEvents)
147     {
148         this.queueEvents = queueEvents;
149     }
150
151     public QueueProfile getQueueProfile()
152     {
153         return queueProfile;
154     }
155
156     public void setQueueProfile(QueueProfile queueProfile)
157     {
158         this.queueProfile = queueProfile;
159     }
160
161     VMMessageReceiver getReceiver(UMOEndpointURI endpointUri) throws EndpointException
162     {
163         return (VMMessageReceiver)getReceiverByEndpoint(endpointUri);
164     }
165
166     QueueSession getQueueSession() throws InitialisationException
167     {
168         QueueManager qm = MuleManager.getInstance().getQueueManager();
169         UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
170         if (tx != null)
171         {
172             if (tx.hasResource(qm))
173             {
174                 if (logger.isTraceEnabled())
175                 {
176                     // logger.trace("Retrieving queue session from current
177
// transaction");
178
}
179                 return (QueueSession)tx.getResource(qm);
180             }
181         }
182
183         if (logger.isTraceEnabled())
184         {
185             // logger.trace("Retrieving new queue session from queue manager");
186
}
187
188         QueueSession session = qm.getQueueSession();
189         if (tx != null)
190         {
191             logger.debug("Binding queue session to current transaction");
192             try
193             {
194                 tx.bindResource(qm, session);
195             }
196             catch (TransactionException e)
197             {
198                 throw new RuntimeException JavaDoc("Could not bind queue session to current transaction", e);
199             }
200         }
201         return session;
202     }
203
204     protected UMOMessageReceiver getReceiverByEndpoint(UMOEndpointURI endpointUri) throws EndpointException
205     {
206         if (logger.isDebugEnabled())
207         {
208             logger.debug("Looking up vm receiver for address: " + endpointUri.toString());
209         }
210
211         UMOMessageReceiver receiver;
212         // If we have an exact match, use it
213
receiver = (UMOMessageReceiver)receivers.get(endpointUri.getAddress());
214         if (receiver != null)
215         {
216             if (logger.isDebugEnabled())
217             {
218                 logger.debug("Found exact receiver match on endpointUri: " + endpointUri);
219             }
220             return receiver;
221         }
222
223         // otherwise check each one against a wildcard match
224
for (Iterator JavaDoc iterator = receivers.values().iterator(); iterator.hasNext();)
225         {
226             receiver = (UMOMessageReceiver)iterator.next();
227             String JavaDoc filterAddress = receiver.getEndpointURI().getAddress();
228             WildcardFilter filter = new WildcardFilter(filterAddress);
229             if (filter.accept(endpointUri.getAddress()))
230             {
231                 receiver.getEndpoint().setEndpointURI(new MuleEndpointURI(endpointUri, filterAddress));
232
233                 if (logger.isDebugEnabled())
234                 {
235                     logger.debug("Found receiver match on endpointUri: " + receiver.getEndpointURI()
236                                  + " against " + endpointUri);
237                 }
238                 return receiver;
239             }
240         }
241         if (logger.isDebugEnabled())
242         {
243             logger.debug("No receiver found for endpointUri: " + endpointUri);
244         }
245         return null;
246     }
247
248     public boolean isRemoteSyncEnabled()
249     {
250         return true;
251     }
252
253     public int getQueueTimeout()
254     {
255         return queueTimeout;
256     }
257
258     public void setQueueTimeout(int queueTimeout)
259     {
260         this.queueTimeout = queueTimeout;
261     }
262
263 }
264
Popular Tags