1 18 package org.apache.activemq.ra; 19 20 import java.lang.reflect.Method ; 21 22 import javax.jms.JMSException ; 23 import javax.jms.Message ; 24 import javax.jms.MessageListener ; 25 import javax.jms.MessageProducer ; 26 import javax.jms.ServerSession ; 27 import javax.jms.Session ; 28 import javax.resource.spi.endpoint.MessageEndpoint ; 29 import javax.resource.spi.work.Work ; 30 import javax.resource.spi.work.WorkEvent ; 31 import javax.resource.spi.work.WorkException ; 32 import javax.resource.spi.work.WorkListener ; 33 import javax.resource.spi.work.WorkManager ; 34 35 import org.apache.activemq.ActiveMQSession; 36 import org.apache.activemq.TransactionContext; 37 import org.apache.activemq.ActiveMQSession.DeliveryListener; 38 import org.apache.commons.logging.Log; 39 import org.apache.commons.logging.LogFactory; 40 41 44 public class ServerSessionImpl implements ServerSession , InboundContext, Work , DeliveryListener { 45 46 public static final Method ON_MESSAGE_METHOD; 47 48 static { 49 try { 50 ON_MESSAGE_METHOD = MessageListener .class.getMethod("onMessage", new Class []{Message .class}); 51 } 52 catch (Exception e) { 53 throw new ExceptionInInitializerError (e); 54 } 55 } 56 57 private static int nextLogId=0; 58 synchronized static private int getNextLogId() { 59 return nextLogId++; 60 } 61 62 private int serverSessionId = getNextLogId(); 63 private final Log log = LogFactory.getLog( ServerSessionImpl.class.getName()+":"+serverSessionId ); 64 65 private ActiveMQSession session; 66 private WorkManager workManager; 67 private MessageEndpoint endpoint; 68 private MessageProducer messageProducer; 69 private final ServerSessionPoolImpl pool; 70 71 private Object runControlMutex = new Object (); 72 private boolean runningFlag = false; 73 77 private boolean stale; 78 81 private final boolean useRAManagedTx; 82 85 private final int batchSize; 86 89 private int currentBatchSize; 90 91 public ServerSessionImpl(ServerSessionPoolImpl pool, ActiveMQSession session, WorkManager workManager, MessageEndpoint endpoint, boolean useRAManagedTx, int batchSize) throws JMSException { 92 this.pool = pool; 93 this.session = session; 94 this.workManager = workManager; 95 this.endpoint = endpoint; 96 this.useRAManagedTx = useRAManagedTx; 97 this.session.setMessageListener((MessageListener ) endpoint); 98 this.session.setDeliveryListener(this); 99 this.batchSize = batchSize; 100 } 101 102 public Session getSession() throws JMSException { 103 return session; 104 } 105 106 public MessageProducer getMessageProducer() throws JMSException { 107 if (messageProducer == null) { 108 messageProducer = getSession().createProducer(null); 109 } 110 return messageProducer; 111 } 112 113 116 public void start() throws JMSException { 117 118 synchronized (runControlMutex) { 119 if (runningFlag) { 120 log.debug("Start request ignored, already running."); 121 return; 122 } 123 runningFlag = true; 124 } 125 126 log.debug("Starting run."); 128 try { 129 workManager.scheduleWork(this, WorkManager.INDEFINITE, null, 130 new WorkListener () { 131 public void workAccepted(WorkEvent event) { 133 log.debug("Work accepted: " + event); 134 } 135 136 public void workRejected(WorkEvent event) { 137 log.debug("Work rejected: " + event); 138 } 139 140 public void workStarted(WorkEvent event) { 141 log.debug("Work started: " + event); 142 } 143 144 public void workCompleted(WorkEvent event) { 145 log.debug("Work completed: " + event); 146 } 147 148 }); 149 } 150 catch (WorkException e) { 151 throw (JMSException ) new JMSException ("Start failed: " + e).initCause(e); 152 } 153 } 154 155 158 public void run() { 159 log.debug("Running"); 160 while (true) { 161 log.debug("run loop start"); 162 try { 163 InboundContextSupport.register(this); 164 currentBatchSize = 0; 165 session.run(); 166 } 167 catch (Throwable e) { 168 stale=true; 169 log.debug("Endpoint failed to process message.", e); 170 log.info("Endpoint failed to process message. Reason: " + e); 171 } 172 finally { 173 InboundContextSupport.unregister(this); 174 log.debug("run loop end"); 175 synchronized (runControlMutex) { 176 if( stale) { 178 runningFlag = false; 179 pool.removeFromPool(this); 180 break; 181 } 182 if( !session.hasUncomsumedMessages() ) { 183 runningFlag = false; 184 pool.returnToPool(this); 185 break; 186 } 187 } 188 } 189 } 190 log.debug("Run finished"); 191 } 192 193 194 198 public void beforeDelivery(ActiveMQSession session, Message msg) { 199 if (currentBatchSize == 0) { 200 try { 201 endpoint.beforeDelivery(ON_MESSAGE_METHOD); 202 } catch (Throwable e) { 203 throw new RuntimeException ("Endpoint before delivery notification failure", e); 204 } 205 } 206 } 207 208 212 public void afterDelivery(ActiveMQSession session, Message msg) { 213 if (++currentBatchSize >= batchSize || !session.hasUncomsumedMessages()) { 214 currentBatchSize = 0; 215 try { 216 endpoint.afterDelivery(); 217 } catch (Throwable e) { 218 throw new RuntimeException ("Endpoint after delivery notification failure", e); 219 } finally { 220 TransactionContext transactionContext = session.getTransactionContext(); 221 if( transactionContext != null && transactionContext.isInLocalTransaction() ) { 222 if( !useRAManagedTx ) { 223 log.warn("Local transaction had not been commited. Commiting now."); 226 } 227 try { 228 session.commit(); 229 } catch (JMSException e) { 230 log.info("Commit failed:", e); 231 } 232 } 233 } 234 } 235 } 236 237 240 public void release() { 241 log.debug("release called"); 242 } 243 244 247 public String toString() { 248 return "ServerSessionImpl:"+serverSessionId; 249 } 250 251 public void close() { 252 try { 253 endpoint.release(); 254 } catch (Throwable e) { 255 log.debug("Endpoint did not release properly: "+e,e); 256 } 257 try { 258 session.close(); 259 } catch (Throwable e) { 260 log.debug("Session did not close properly: "+e,e); 261 } 262 } 263 264 } 265 | Popular Tags |