1 46 47 package org.mr.ra.inbound; 48 49 import java.lang.reflect.Method ; 50 51 import javax.jms.JMSException ; 52 import javax.jms.Message ; 53 import javax.jms.MessageListener ; 54 import javax.jms.MessageProducer ; 55 import javax.jms.ServerSession ; 56 import javax.jms.Session ; 57 import javax.resource.spi.endpoint.MessageEndpoint ; 58 import javax.resource.spi.work.Work ; 59 import javax.resource.spi.work.WorkException ; 60 import javax.resource.spi.work.WorkManager ; 61 62 import org.apache.commons.logging.Log; 63 import org.apache.commons.logging.LogFactory; 64 import org.mr.api.jms.MantaSession; 65 import org.mr.api.jms.MantaSession.DeliveryListener; 66 67 70 public class ServerSessionImpl implements ServerSession , 71 SessionAndProducer, 72 Work , 73 DeliveryListener { 74 75 public static final Method ON_MESSAGE_METHOD; 76 77 static { 78 try { 79 ON_MESSAGE_METHOD = MessageListener .class.getMethod("onMessage", new Class []{Message .class}); 80 } 81 catch (Exception e) { 82 throw new ExceptionInInitializerError (e); 83 } 84 } 85 86 private static int nextLogId = 0; 87 88 synchronized static private int getNextLogId() { 89 return nextLogId++; 90 } 91 92 private int serverSessionId = getNextLogId(); 93 94 private final Log log = LogFactory.getLog(ServerSessionImpl.class.getName()+":"+serverSessionId); 95 96 private MantaSession session; 97 private WorkManager workManager; 98 private MessageEndpoint endpoint; 99 private MessageProducer messageProducer; 100 private final ServerSessionPoolImpl pool; 101 102 private Object runControlMutex = new Object (); 103 private boolean runningFlag = false; 104 108 private boolean stale; 109 112 private final boolean useRAManagedTx; 113 116 private final int batchSize; 117 120 private int currentBatchSize; 121 122 public ServerSessionImpl(ServerSessionPoolImpl pool, 123 MantaSession session, 124 WorkManager workManager, 125 MessageEndpoint endpoint, 126 boolean useRAManagedTx, 127 int batchSize) throws JMSException { 128 this.pool = pool; 129 this.session = session; 130 this.workManager = workManager; 131 this.endpoint = endpoint; 132 this.useRAManagedTx = useRAManagedTx; 133 this.session.setMessageListener((MessageListener ) endpoint); 134 this.session.setDeliveryListener(this); 135 this.batchSize = batchSize; 136 } 137 138 public Session getSession() throws JMSException { 139 return session; 140 } 141 142 public MessageProducer getMessageProducer() throws JMSException { 143 if (messageProducer == null) { 144 messageProducer = getSession().createProducer(null); 145 } 146 return messageProducer; 147 } 148 149 152 public void start() throws JMSException { 153 synchronized (runControlMutex) { 154 if (runningFlag) { 155 log.debug("Start request ignored, allready running."); 156 return; 157 } 158 runningFlag = true; 159 } 160 161 log.debug("Starting run."); 163 try { 164 workManager.scheduleWork(this, WorkManager.INDEFINITE, null, null); 165 } 166 catch (WorkException e) { 167 throw (JMSException ) new JMSException ("Start failed: " + e).initCause(e); 168 } 169 } 170 171 174 synchronized public void run() { 175 while (true) { 176 try { 177 SessionAndProducerHelper.register(this); 178 currentBatchSize = 0; 179 session.run(); 180 } 181 catch (Throwable e) { 182 stale = true; 183 } 184 finally { 185 SessionAndProducerHelper.unregister(this); 186 synchronized (runControlMutex) { 187 if (stale) { 189 log.debug("Session stale - removing from pool"); 190 runningFlag = false; 191 pool.removeFromPool(this); 192 break; 193 } 194 if (!session.hasConsumerMessages()) { 195 runningFlag = false; 196 pool.returnToPool(this); 197 break; 198 } 199 } 200 } 201 } 202 } 203 204 205 209 public void beforeDelivery(MantaSession session, Message msg) { 210 if (currentBatchSize == 0) { 211 try { 212 endpoint.beforeDelivery(ON_MESSAGE_METHOD); 213 } catch (Throwable e) { 214 throw new RuntimeException ("Endpoint before delivery notification failure", e); 215 } 216 } 217 } 218 219 223 public void afterDelivery(MantaSession session, Message msg) { 224 if (++currentBatchSize >= batchSize || !session.hasConsumerMessages()) { 225 currentBatchSize = 0; 226 try { 227 endpoint.afterDelivery(); 228 } catch (Throwable e) { 229 throw new RuntimeException ("Endpoint after delivery notification failure", e); 230 } 231 finally { 233 if (session.getTransactionContext().isInLocalTransaction()) { 234 if (!useRAManagedTx) { 235 log.warn("Local transaction had not been commited. Commiting now."); 238 } 239 try { 240 session.commit(); 241 } catch (JMSException e) { 242 log.info("Commit failed:", e); 243 } 244 } 245 } 246 } 247 } 248 249 252 public void release() { 253 log.debug("release called"); 254 } 255 256 259 public String toString() { 260 return "ServerSessionImpl:"+serverSessionId; 261 } 262 263 public void close() { 264 try { 265 endpoint.release(); 266 } catch (Throwable e) { 267 log.debug("Endpoint did not release properly: "+e,e); 268 } 269 try { 270 session.close(); 271 } catch (Throwable e) { 272 log.debug("Session did not close properly: "+e,e); 273 } 274 } 275 276 } 277 | Popular Tags |