1 10 11 package org.mule.providers.space; 12 13 import org.mule.config.i18n.Message; 14 import org.mule.config.i18n.Messages; 15 import org.mule.impl.MuleMessage; 16 import org.mule.providers.AbstractMessageReceiver; 17 import org.mule.providers.ConnectException; 18 import org.mule.umo.UMOComponent; 19 import org.mule.umo.UMOMessage; 20 import org.mule.umo.endpoint.UMOEndpoint; 21 import org.mule.umo.lifecycle.InitialisationException; 22 import org.mule.umo.provider.UMOConnector; 23 import org.mule.umo.provider.UMOMessageAdapter; 24 import org.mule.umo.space.UMOSpace; 25 import org.mule.umo.space.UMOSpaceException; 26 27 import javax.resource.spi.work.Work ; 28 import javax.resource.spi.work.WorkException ; 29 import javax.resource.spi.work.WorkManager ; 30 import java.util.Properties ; 31 32 36 public class SpaceMessageReceiver extends AbstractMessageReceiver implements Work 37 { 38 39 private UMOSpace space; 40 private SpaceConnector connector; 41 42 public SpaceMessageReceiver(UMOConnector connector, UMOComponent component, UMOEndpoint endpoint) 43 throws InitialisationException 44 { 45 super(connector, component, endpoint); 46 this.connector = (SpaceConnector)connector; 47 } 48 49 public void doConnect() throws ConnectException 50 { 51 String destination = endpoint.getEndpointURI().getAddress(); 52 53 Properties props = new Properties (); 54 props.putAll(endpoint.getProperties()); 55 try 56 { 57 logger.info("Connecting to space: " + destination); 58 space = connector.getSpace(endpoint); 59 } 60 catch (UMOSpaceException e) 61 { 62 throw new ConnectException(new Message("space", 1, destination), e, this); 63 } 64 try 65 { 66 getWorkManager().scheduleWork(this, WorkManager.INDEFINITE, null, connector); 67 } 68 catch (WorkException e) 69 { 70 throw new ConnectException(new Message(Messages.FAILED_TO_SCHEDULE_WORK), e, this); 71 } 72 } 73 74 public void doDisconnect() throws ConnectException 75 { 76 } 79 80 public void run() 81 { 82 while (!disposing.get()) 83 { 84 if (connector.isStarted() && !disposing.get()) 85 { 86 if (logger.isTraceEnabled()) 87 { 88 logger.trace("Receiver starting on space: " + space); 89 } 90 91 try 92 { 93 Object message = space.take(Long.MAX_VALUE); 94 Work work = createWork(space, message); 95 try 96 { 97 getWorkManager().scheduleWork(work, WorkManager.INDEFINITE, null, connector); 98 } 99 catch (WorkException e) 100 { 101 logger.error("GS Server receiver Work was not processed: " + e.getMessage(), e); 102 } 103 } 104 catch (Exception e) 105 { 106 handleException(e); 107 } 108 109 } 110 } 111 } 112 113 public void release() 114 { 115 } 117 118 protected void doDispose() 119 { 120 } 122 123 protected Work createWork(UMOSpace space, Object message) throws Exception 124 { 125 return new SpaceWorker(space, message); 126 } 127 128 protected class SpaceWorker implements Work 129 { 130 private UMOSpace space; 131 private Object message; 132 133 public SpaceWorker(UMOSpace space, Object message) 134 { 135 this.space = space; 136 this.message = message; 137 } 138 139 public void release() 140 { 141 } 143 144 147 public void run() 148 { 149 try 150 { 151 if (logger.isTraceEnabled()) 152 { 153 logger.trace("worker listening on space " + space); 154 } 155 156 UMOMessageAdapter adapter = connector.getMessageAdapter(message); 158 UMOMessage returnMessage = routeMessage(new MuleMessage(adapter), endpoint.isSynchronous()); 159 163 } 164 catch (Exception e) 165 { 166 handleException(e); 167 } 168 finally 169 { 170 release(); 171 } 172 } 173 } 174 } 175 | Popular Tags |