1 10 11 package org.mule.providers.jbi; 12 13 import org.mule.config.i18n.Message; 14 import org.mule.config.i18n.Messages; 15 import org.mule.impl.MuleMessage; 16 import org.mule.providers.AbstractMessageReceiver; 17 import org.mule.umo.UMOComponent; 18 import org.mule.umo.UMOException; 19 import org.mule.umo.UMOMessage; 20 import org.mule.umo.endpoint.UMOEndpoint; 21 import org.mule.umo.lifecycle.InitialisationException; 22 import org.mule.umo.lifecycle.LifecycleException; 23 import org.mule.umo.provider.UMOConnector; 24 25 import javax.jbi.component.ComponentContext; 26 import javax.jbi.messaging.DeliveryChannel; 27 import javax.jbi.messaging.ExchangeStatus; 28 import javax.jbi.messaging.Fault; 29 import javax.jbi.messaging.MessageExchange; 30 import javax.jbi.messaging.MessagingException; 31 import javax.jbi.messaging.NormalizedMessage; 32 import javax.resource.spi.work.Work ; 33 import javax.resource.spi.work.WorkException ; 34 35 43 public class JbiMessageReceiver extends AbstractMessageReceiver implements Work 44 { 45 46 protected ComponentContext context; 47 48 protected JbiConnector connector; 49 50 protected String name; 51 52 private DeliveryChannel deliveryChannel; 53 54 public JbiMessageReceiver(UMOConnector connector, UMOComponent component, UMOEndpoint endpoint) 55 throws InitialisationException 56 { 57 super(connector, component, endpoint); 58 name = component.getDescriptor().getName() + ".jbiReceiver"; 59 this.connector = (JbiConnector)connector; 60 context = this.connector.getComponentContext(); 61 deliveryChannel = this.connector.getDeliveryChannel(); 62 } 63 64 public void doConnect() throws Exception 65 { 66 } 68 69 public void doDisconnect() throws Exception 70 { 71 } 73 74 public void doStart() throws UMOException 75 { 76 try 77 { 78 getWorkManager().scheduleWork(this); 79 } 80 catch (WorkException e) 81 { 82 throw new LifecycleException(new Message(Messages.FAILED_TO_START_X, name), e, this); 83 } 84 } 85 86 public void release() 87 { 88 } 90 91 94 public void run() 95 { 96 while (connector.isStarted()) 97 { 98 try 99 { 100 final MessageExchange me = deliveryChannel.accept(); 101 if (me != null) 102 { 103 getWorkManager().scheduleWork(new MessageExchangeWorker(me)); 104 } 105 } 106 catch (Exception e) 107 { 108 handleException(e); 109 } 110 } 111 } 112 113 private class MessageExchangeWorker implements Work 114 { 115 private MessageExchange me; 116 117 public MessageExchangeWorker(MessageExchange me) 118 { 119 this.me = me; 120 } 121 122 public void release() 123 { 124 } 126 127 public void run() 128 { 129 try 130 { 131 try 132 { 133 NormalizedMessage nm = me.getMessage("IN"); 134 if (nm != null) 135 { 136 UMOMessage response = routeMessage(new MuleMessage(connector.getMessageAdapter(nm))); 137 if (response != null) 138 { 139 NormalizedMessage nmResposne = me.createMessage(); 140 JbiUtils.populateNormalizedMessage(response, nmResposne); 141 me.setMessage(nmResposne, "OUT"); 142 } 143 } 144 else 145 { 146 logger.debug("'IN' message on exchange was not set"); 147 } 148 149 done(me); 150 } 151 catch (MessagingException e) 152 { 153 error(me, e); 154 } 155 } 156 catch (Exception e) 157 { 158 handleException(e); 159 } 160 } 161 } 162 163 protected void error(MessageExchange me, Exception e) throws MessagingException 164 { 165 if (e instanceof Fault) 166 { 167 me.setFault((Fault)e); 168 } 169 else 170 { 171 me.setError(e); 172 } 173 me.setStatus(ExchangeStatus.ERROR); 174 deliveryChannel.send(me); 175 } 176 177 protected void done(MessageExchange me) throws MessagingException 178 { 179 me.setStatus(ExchangeStatus.DONE); 180 deliveryChannel.send(me); 181 } 182 } 183 | Popular Tags |