package org.mule.providers;

import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;

import javax.resource.spi.work.Work;

import java.util.Iterator;
import java.util.List;

import org.mule.config.ThreadingProfile;
import org.mule.transaction.TransactionCallback;
import org.mule.transaction.TransactionTemplate;
import org.mule.umo.UMOComponent;
import org.mule.umo.UMOException;
import org.mule.umo.endpoint.UMOEndpoint;
import org.mule.umo.lifecycle.InitialisationException;
import org.mule.umo.provider.UMOConnector;

/**
 * A polling receiver whose polled messages are processed through a
 * {@link TransactionTemplate}.
 * <p>
 * When the endpoint's transaction configuration has a factory, every message
 * returned by one call to {@link #getMessages()} is processed inside a single
 * template callback on the polling thread. Otherwise each message is handed to
 * the work manager as its own {@link MessageProcessorWorker}, and the poll
 * blocks until all scheduled workers have finished.
 * <p>
 * Subclasses supply the messages and the per-message processing.
 */
public abstract class TransactedPollingMessageReceiver extends PollingMessageReceiver
{
    /**
     * Whether polled messages are processed together inside one template
     * callback. Set by the constructor from the endpoint's transaction config.
     */
    protected boolean receiveMessagesInTransaction = true;

    /**
     * Whether {@link #doStart()} may start the receiver once per receiver
     * thread. Overwritten from the connector each time the receiver starts.
     */
    protected boolean useMultipleReceivers = true;

    /**
     * Creates the receiver and derives {@link #receiveMessagesInTransaction}
     * from the endpoint: it is {@code true} exactly when the endpoint's
     * transaction configuration has a transaction factory.
     *
     * @param connector the connector this receiver belongs to
     * @param component the component messages are delivered to
     * @param endpoint the endpoint being polled
     * @param frequency the polling frequency, passed through to the superclass
     * @throws InitialisationException if the superclass constructor fails
     */
    public TransactedPollingMessageReceiver(UMOConnector connector,
                                            UMOComponent component,
                                            final UMOEndpoint endpoint,
                                            Long frequency) throws InitialisationException
    {
        super(connector, component, endpoint, frequency);
        receiveMessagesInTransaction = endpoint.getTransactionConfig().getFactory() != null;
    }

    /**
     * Starts the receiver. If the connector allows multiple transacted
     * receivers, messages are received in a transaction, and the receiver
     * threading profile has threading enabled, the superclass start is invoked
     * once per "max active thread" of that profile; otherwise it is invoked
     * exactly once.
     *
     * @throws UMOException if the superclass start fails
     */
    public void doStart() throws UMOException
    {
        // The connector setting always replaces whatever value was there before.
        useMultipleReceivers = connector.isCreateMultipleTransactedReceivers();

        ThreadingProfile profile = connector.getReceiverThreadingProfile();
        boolean startOncePerThread = useMultipleReceivers
                                     && receiveMessagesInTransaction
                                     && profile.isDoThreading();

        if (!startOncePerThread)
        {
            super.doStart();
            return;
        }

        int started = 0;
        while (started < profile.getMaxThreadsActive())
        {
            super.doStart();
            started++;
        }
    }

    /**
     * Performs one poll: fetches the pending messages and processes them,
     * either together in one template callback or individually via the work
     * manager, depending on {@link #receiveMessagesInTransaction}.
     *
     * @throws Exception anything raised while fetching, scheduling or
     *             (in the transacted case) processing messages
     */
    public void poll() throws Exception
    {
        TransactionTemplate tt = new TransactionTemplate(endpoint.getTransactionConfig(),
                                                         connector.getExceptionListener());
        if (receiveMessagesInTransaction)
        {
            pollWithinTransaction(tt);
        }
        else
        {
            pollAndScheduleWork(tt);
        }
    }

    /**
     * Fetches and processes all pending messages sequentially inside a single
     * callback executed by the given template.
     */
    private void pollWithinTransaction(TransactionTemplate tt) throws Exception
    {
        TransactionCallback receiveAll = new TransactionCallback()
        {
            public Object doInTransaction() throws Exception
            {
                List messages = getMessages();
                if (messages == null || messages.size() == 0)
                {
                    return null;
                }

                Iterator cursor = messages.iterator();
                while (cursor.hasNext())
                {
                    Object message = cursor.next();
                    traceReceived(message);
                    processMessage(message);
                }
                return null;
            }
        };
        tt.execute(receiveAll);
    }

    /**
     * Fetches the pending messages, schedules one worker per message on the
     * work manager and waits until every scheduled worker has counted down.
     * If scheduling a worker fails, the exception is rethrown immediately
     * without waiting for workers that were already scheduled.
     */
    private void pollAndScheduleWork(TransactionTemplate tt) throws Exception
    {
        List messages = getMessages();
        if (messages == null || messages.size() == 0)
        {
            return;
        }

        final CountDownLatch pending = new CountDownLatch(messages.size());
        Iterator cursor = messages.iterator();
        while (cursor.hasNext())
        {
            final Object message = cursor.next();
            traceReceived(message);
            try
            {
                getWorkManager().scheduleWork(new MessageProcessorWorker(tt, pending, message));
            }
            catch (Exception e)
            {
                // This worker will never run, so account for it before bailing out.
                pending.countDown();
                throw e;
            }
        }
        pending.await();
    }

    /** Logs the received message at trace level, if trace logging is enabled. */
    private void traceReceived(Object message)
    {
        if (logger.isTraceEnabled())
        {
            logger.trace("Received Message: " + message);
        }
    }

    /**
     * Unit of work that processes a single message through the supplied
     * template and counts the shared latch down when it finishes, whether or
     * not processing succeeded.
     */
    protected class MessageProcessorWorker implements Work, TransactionCallback
    {
        private final TransactionTemplate tt;
        private final CountDownLatch latch;
        private final Object message;

        /**
         * @param tt the template used to execute the processing callback
         * @param latch the latch to count down once this worker has run
         * @param message the message to process
         */
        public MessageProcessorWorker(TransactionTemplate tt, CountDownLatch latch, Object message)
        {
            this.tt = tt;
            this.latch = latch;
            this.message = message;
        }

        /** Does nothing; this worker has no cancellation logic. */
        public void release()
        {
            // intentionally empty
        }

        /**
         * Executes this worker as the template's callback. Exceptions are
         * passed to the receiver's {@code handleException}; the latch is
         * always counted down afterwards.
         */
        public void run()
        {
            try
            {
                tt.execute(this);
            }
            catch (Exception e)
            {
                handleException(e);
            }
            finally
            {
                latch.countDown();
            }
        }

        /**
         * Processes the wrapped message.
         *
         * @return always {@code null}
         */
        public Object doInTransaction() throws Exception
        {
            processMessage(message);
            return null;
        }
    }

    /**
     * Returns the messages currently available for processing.
     *
     * @return the pending messages; {@code null} or an empty list is treated
     *         as "nothing to do"
     * @throws Exception if the messages cannot be retrieved
     */
    protected abstract List getMessages() throws Exception;

    /**
     * Processes a single message obtained from {@link #getMessages()}.
     *
     * @param message the message to process
     * @throws Exception if processing fails
     */
    protected abstract void processMessage(Object message) throws Exception;
}