1 10 11 package org.mule.providers.vm; 12 13 import org.mule.MuleException; 14 import org.mule.config.i18n.Message; 15 import org.mule.config.i18n.Messages; 16 import org.mule.impl.MuleMessage; 17 import org.mule.providers.TransactedPollingMessageReceiver; 18 import org.mule.umo.UMOComponent; 19 import org.mule.umo.UMOEvent; 20 import org.mule.umo.UMOException; 21 import org.mule.umo.UMOMessage; 22 import org.mule.umo.endpoint.UMOEndpoint; 23 import org.mule.umo.lifecycle.InitialisationException; 24 import org.mule.umo.provider.UMOConnector; 25 import org.mule.util.queue.Queue; 26 import org.mule.util.queue.QueueSession; 27 28 import java.util.List ; 29 30 38 public class VMMessageReceiver extends TransactedPollingMessageReceiver 39 { 40 private VMConnector connector; 41 private final Object lock = new Object (); 42 43 public VMMessageReceiver(UMOConnector connector, UMOComponent component, UMOEndpoint endpoint) 44 throws InitialisationException 45 { 46 super(connector, component, endpoint, new Long (10)); 47 this.connector = (VMConnector)connector; 48 receiveMessagesInTransaction = endpoint.getTransactionConfig().isTransacted(); 49 } 50 51 public void doConnect() throws Exception 52 { 53 if (connector.isQueueEvents()) 54 { 55 QueueSession queueSession = connector.getQueueSession(); 57 Queue q = queueSession.getQueue(endpoint.getEndpointURI().getAddress()); 58 if (logger.isDebugEnabled()) 59 { 60 logger.debug("Current queue depth for queue: " + endpoint.getEndpointURI().getAddress() 61 + " is: " + q.size()); 62 } 63 } 64 } 65 66 public void doDisconnect() throws Exception 67 { 68 } 70 71 76 public void onEvent(UMOEvent event) throws UMOException 77 { 78 if (connector.isQueueEvents()) 79 { 80 QueueSession queueSession = connector.getQueueSession(); 81 Queue queue = queueSession.getQueue(endpoint.getEndpointURI().getAddress()); 82 try 83 { 84 queue.put(event); 85 } 86 catch (InterruptedException e) 87 { 88 throw new MuleException(new Message(Messages.INTERRUPTED_QUEUING_EVENT_FOR_X, 89 this.endpoint.getEndpointURI()), e); 90 } 91 } 92 else 93 { 94 UMOMessage msg = new MuleMessage(event.getTransformedMessage(), event.getMessage()); 95 synchronized (lock) 96 { 97 routeMessage(msg); 98 } 99 } 100 } 101 102 public Object onCall(UMOEvent event) throws UMOException 103 { 104 return routeMessage(new MuleMessage(event.getTransformedMessage(), event.getMessage()), 105 event.isSynchronous()); 106 } 107 108 protected List getMessages() throws Exception 109 { 110 QueueSession qs = connector.getQueueSession(); 111 Queue queue = qs.getQueue(endpoint.getEndpointURI().getAddress()); 112 UMOEvent event = (UMOEvent)queue.poll(connector.getQueueTimeout()); 113 if (event != null) 114 { 115 routeMessage(new MuleMessage(event.getTransformedMessage(), event.getMessage())); 116 } 117 return null; 118 } 119 120 125 protected void processMessage(Object msg) throws Exception 126 { 127 } 129 130 } 131 | Popular Tags |