1 17 package org.apache.servicemix.tck; 18 19 import org.apache.commons.logging.Log; 20 import org.apache.commons.logging.LogFactory; 21 22 import javax.jbi.JBIException; 23 import javax.jbi.component.ComponentContext; 24 import javax.jbi.component.ComponentLifeCycle; 25 import javax.jbi.messaging.DeliveryChannel; 26 import javax.jbi.messaging.ExchangeStatus; 27 import javax.jbi.messaging.MessageExchange; 28 import javax.jbi.messaging.MessagingException; 29 import javax.jbi.messaging.NormalizedMessage; 30 import javax.management.ObjectName ; 31 import javax.xml.namespace.QName ; 32 33 39 public class AsyncReceiverPojo implements ComponentLifeCycle, Receiver, Runnable { 40 41 public static final QName SERVICE = ReceiverPojo.SERVICE; 42 public static final String ENDPOINT = ReceiverPojo.ENDPOINT; 43 44 private static final Log log = LogFactory.getLog(AsyncReceiverPojo.class); 45 46 private ComponentContext context; 47 private MessageList messageList = new MessageList(); 48 private Thread runner; 49 private boolean running; 50 51 52 public void init(ComponentContext context) throws JBIException { 55 this.context = context; 56 context.activateEndpoint(SERVICE, ENDPOINT); 57 } 58 59 public void shutDown() throws JBIException { 60 } 61 62 public synchronized void start() throws JBIException { 63 if (!running) { 64 running = true; 65 runner = new Thread (this); 66 runner.start(); 67 } 68 } 69 70 public synchronized void stop() throws JBIException { 71 running = false; 72 } 73 74 public ObjectName getExtensionMBeanName() { 75 return null; 76 } 77 78 79 public MessageList getMessageList() { 82 return messageList; 83 } 84 85 public void run() { 88 while (running) { 89 try { 90 DeliveryChannel deliveryChannel = context.getDeliveryChannel(); 91 log.info("about to do an accept on deliveryChannel: " + deliveryChannel); 92 MessageExchange messageExchange = deliveryChannel.accept(); 93 log.info("received me: " + messageExchange); 94 onMessageExchange(messageExchange); 95 } 96 catch (MessagingException e) { 97 log.error("Failed to process inbound messages: " + e, e); 98 } 99 } 100 } 101 102 public void onMessageExchange(MessageExchange exchange) throws MessagingException { 103 NormalizedMessage inMessage = exchange.getMessage("in"); 104 messageList.addMessage(inMessage); 105 exchange.setStatus(ExchangeStatus.DONE); 106 context.getDeliveryChannel().send(exchange); 107 } 108 109 public ComponentContext getContext() { 110 return context; 111 } 112 } 113 | Popular Tags |