package org.mule.providers.vm;

import org.mule.MuleManager;
import org.mule.config.QueueProfile;
import org.mule.config.i18n.Message;
import org.mule.config.i18n.Messages;
import org.mule.impl.MuleMessage;
import org.mule.impl.endpoint.MuleEndpointURI;
import org.mule.providers.AbstractServiceEnabledConnector;
import org.mule.routing.filters.WildcardFilter;
import org.mule.transaction.TransactionCoordination;
import org.mule.umo.MessagingException;
import org.mule.umo.TransactionException;
import org.mule.umo.UMOComponent;
import org.mule.umo.UMOTransaction;
import org.mule.umo.endpoint.EndpointException;
import org.mule.umo.endpoint.UMOEndpoint;
import org.mule.umo.endpoint.UMOEndpointURI;
import org.mule.umo.lifecycle.InitialisationException;
import org.mule.umo.provider.MessageTypeNotSupportedException;
import org.mule.umo.provider.UMOMessageAdapter;
import org.mule.umo.provider.UMOMessageReceiver;
import org.mule.util.ClassUtils;
import org.mule.util.queue.QueueManager;
import org.mule.util.queue.QueueSession;

import java.util.Iterator;

/**
 * Connector for the "VM" protocol. It can optionally configure a queue per
 * receiver endpoint, looks receivers up by endpoint address (exact match first,
 * then wildcard match) and hands out queue sessions that are bound to the
 * current transaction when one is active.
 */
public class VMConnector extends AbstractServiceEnabledConnector
{
    /** When true, a queue is configured for every receiver endpoint address. */
    private boolean queueEvents = false;

    /** Profile used to configure queues; defaulted from the Mule configuration when unset. */
    private QueueProfile queueProfile;

    /** Message adapter class named by the service descriptor; loaded in {@link #doInitialise()}. */
    private Class adapterClass = null;

    /** Queue timeout exposed through the accessor pair; units are not established in this class. */
    private int queueTimeout = 2000;

    /**
     * Initialises the connector: runs the superclass initialisation, defaults the
     * queue profile from the Mule configuration when queueing is enabled and no
     * profile was set, and loads the message adapter class named by the service
     * descriptor.
     *
     * @throws InitialisationException if the message adapter class cannot be loaded
     */
    public void doInitialise() throws InitialisationException
    {
        super.doInitialise();

        if (queueEvents && queueProfile == null)
        {
            queueProfile = MuleManager.getConfiguration().getQueueProfile();
        }

        try
        {
            adapterClass = ClassUtils.loadClass(serviceDescriptor.getMessageAdapter(), getClass());
        }
        catch (ClassNotFoundException e)
        {
            throw new InitialisationException(new Message(Messages.FAILED_LOAD_X,
                "Message Adapter: " + serviceDescriptor.getMessageAdapter()), e);
        }
    }

    /**
     * Creates the message receiver for the given component and endpoint. When
     * queueing is enabled, a queue is first configured for the endpoint address.
     *
     * @param component the component the receiver is created for
     * @param endpoint the endpoint the receiver listens on
     * @return the receiver created by the service descriptor
     * @throws Exception if the receiver cannot be created
     */
    public UMOMessageReceiver createReceiver(UMOComponent component, UMOEndpoint endpoint) throws Exception
    {
        if (queueEvents)
        {
            // queueEvents may have been switched on after doInitialise() ran (or this
            // method may be called before it); fall back to the configured default
            // profile instead of failing with a NullPointerException.
            if (queueProfile == null)
            {
                queueProfile = MuleManager.getConfiguration().getQueueProfile();
            }
            queueProfile.configureQueue(endpoint.getEndpointURI().getAddress());
        }
        return serviceDescriptor.createMessageReceiver(this, component, endpoint);
    }

    /**
     * Returns a message adapter for the given message. A {@link MuleMessage}
     * yields its own adapter and a {@link UMOMessageAdapter} is returned as is.
     *
     * @param message the message to adapt
     * @return the adapter for the message
     * @throws MessagingException a {@link MessageTypeNotSupportedException} when the
     *             message is null or of any other type
     */
    public UMOMessageAdapter getMessageAdapter(Object message) throws MessagingException
    {
        if (message == null)
        {
            throw new MessageTypeNotSupportedException(null, adapterClass);
        }
        if (message instanceof MuleMessage)
        {
            return ((MuleMessage)message).getAdapter();
        }
        if (message instanceof UMOMessageAdapter)
        {
            return (UMOMessageAdapter)message;
        }
        throw new MessageTypeNotSupportedException(message, adapterClass);
    }

    /**
     * @return the protocol name of this connector, always "VM"
     */
    public String getProtocol()
    {
        return "VM";
    }

    /**
     * Dispose hook; this connector performs no work of its own here.
     */
    protected void doDispose()
    {
        // nothing to release
    }

    /**
     * @return true if a queue is configured for each receiver endpoint
     */
    public boolean isQueueEvents()
    {
        return queueEvents;
    }

    /**
     * @param queueEvents true to configure a queue for each receiver endpoint
     */
    public void setQueueEvents(boolean queueEvents)
    {
        this.queueEvents = queueEvents;
    }

    /**
     * @return the queue profile in use, or null if none has been set or defaulted yet
     */
    public QueueProfile getQueueProfile()
    {
        return queueProfile;
    }

    /**
     * @param queueProfile the profile used to configure queues
     */
    public void setQueueProfile(QueueProfile queueProfile)
    {
        this.queueProfile = queueProfile;
    }

    /**
     * Looks up the receiver for the given endpoint URI and casts it to a
     * {@link VMMessageReceiver}.
     *
     * @param endpointUri the endpoint URI to resolve
     * @return the matching receiver, or null if none matches
     * @throws EndpointException if the lookup fails
     * @see #getReceiverByEndpoint(UMOEndpointURI)
     */
    VMMessageReceiver getReceiver(UMOEndpointURI endpointUri) throws EndpointException
    {
        return (VMMessageReceiver)getReceiverByEndpoint(endpointUri);
    }

    /**
     * Returns a queue session. If a transaction is active and already holds a
     * session for the queue manager, that session is reused; otherwise a new
     * session is obtained and, when a transaction is active, bound to it.
     *
     * @return the queue session to use
     * @throws InitialisationException declared by the signature; not thrown directly
     *             by this method body
     * @throws RuntimeException if the new session cannot be bound to the current
     *             transaction
     */
    QueueSession getQueueSession() throws InitialisationException
    {
        QueueManager qm = MuleManager.getInstance().getQueueManager();
        UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();

        if (tx != null && tx.hasResource(qm))
        {
            if (logger.isTraceEnabled())
            {
                logger.trace("Retrieving queue session from current transaction");
            }
            return (QueueSession)tx.getResource(qm);
        }

        if (logger.isTraceEnabled())
        {
            logger.trace("Retrieving new queue session from queue manager");
        }

        QueueSession session = qm.getQueueSession();
        if (tx != null)
        {
            logger.debug("Binding queue session to current transaction");
            try
            {
                tx.bindResource(qm, session);
            }
            catch (TransactionException e)
            {
                throw new RuntimeException("Could not bind queue session to current transaction", e);
            }
        }
        return session;
    }

    /**
     * Finds the receiver registered for the given endpoint URI. An exact match on
     * the address is tried first; failing that, each registered receiver's address
     * is treated as a wildcard pattern and tested against the requested address.
     * <p>
     * Note: on a wildcard match the matched receiver's endpoint URI is replaced
     * with a new {@link MuleEndpointURI} built from the requested URI and the
     * receiver's pattern address, so this lookup mutates the receiver's endpoint.
     *
     * @param endpointUri the endpoint URI to resolve
     * @return the matching receiver, or null if none matches
     * @throws EndpointException if the replacement endpoint URI cannot be created
     */
    protected UMOMessageReceiver getReceiverByEndpoint(UMOEndpointURI endpointUri) throws EndpointException
    {
        if (logger.isDebugEnabled())
        {
            logger.debug("Looking up vm receiver for address: " + endpointUri);
        }

        String address = endpointUri.getAddress();

        // Try an exact match on the address first.
        UMOMessageReceiver receiver = (UMOMessageReceiver)receivers.get(address);
        if (receiver != null)
        {
            if (logger.isDebugEnabled())
            {
                logger.debug("Found exact receiver match on endpointUri: " + endpointUri);
            }
            return receiver;
        }

        // Otherwise treat every registered receiver address as a wildcard pattern.
        for (Iterator iterator = receivers.values().iterator(); iterator.hasNext();)
        {
            receiver = (UMOMessageReceiver)iterator.next();
            String filterAddress = receiver.getEndpointURI().getAddress();
            WildcardFilter filter = new WildcardFilter(filterAddress);
            if (filter.accept(address))
            {
                receiver.getEndpoint().setEndpointURI(new MuleEndpointURI(endpointUri, filterAddress));

                if (logger.isDebugEnabled())
                {
                    logger.debug("Found receiver match on endpointUri: " + receiver.getEndpointURI()
                        + " against " + endpointUri);
                }
                return receiver;
            }
        }

        if (logger.isDebugEnabled())
        {
            logger.debug("No receiver found for endpointUri: " + endpointUri);
        }
        return null;
    }

    /**
     * @return always true for this connector
     */
    public boolean isRemoteSyncEnabled()
    {
        return true;
    }

    /**
     * @return the configured queue timeout
     */
    public int getQueueTimeout()
    {
        return queueTimeout;
    }

    /**
     * @param queueTimeout the queue timeout to use
     */
    public void setQueueTimeout(int queueTimeout)
    {
        this.queueTimeout = queueTimeout;
    }
}