1 23 package org.objectweb.joram.client.jms; 24 25 import java.util.Vector ; 26 27 import javax.jms.JMSException ; 28 import javax.jms.MessageListener ; 29 import javax.jms.ServerSession ; 30 import javax.jms.ServerSessionPool ; 31 32 import org.objectweb.joram.client.jms.connection.RequestMultiplexer; 33 import org.objectweb.joram.shared.client.ConsumerMessages; 34 import org.objectweb.util.monolog.api.BasicLevel; 35 import org.objectweb.util.monolog.api.Logger; 36 37 import fr.dyade.aaa.util.Daemon; 38 import fr.dyade.aaa.util.Debug; 39 import fr.dyade.aaa.util.Queue; 40 41 47 public class MultiSessionConsumer extends MessageConsumerListener 48 implements javax.jms.ConnectionConsumer { 49 50 private static final Logger logger = 51 Debug.getLogger(MultiSessionConsumer.class.getName()); 52 53 private ServerSessionPool sessPool; 54 55 private Connection cnx; 56 57 private int maxMsgs; 58 59 private Queue repliesIn; 60 61 65 private int nbActivatedListeners; 66 67 private MessageDispatcher msgDispatcher; 68 69 79 MultiSessionConsumer( 80 boolean queueMode, 81 boolean durable, 82 String selector, 83 String targetName, 84 ServerSessionPool sessionPool, 85 int queueMessageReadMax, 86 int topicActivationThreshold, int topicPassivationThreshold, 87 int topicAckBufferMax, 88 RequestMultiplexer reqMultiplexer, 89 Connection connection, 90 int maxMessages) { 91 super(queueMode, durable, selector, targetName, 92 null, queueMessageReadMax, 93 topicActivationThreshold, 94 topicPassivationThreshold, topicAckBufferMax, 95 reqMultiplexer); 96 if (logger.isLoggable(BasicLevel.DEBUG)) 97 logger.log(BasicLevel.DEBUG, "MultiSessionConsumer.<init>(" + 98 queueMode + ',' + durable + ',' + selector + ',' + 99 targetName + ',' + sessionPool + ',' + 100 queueMessageReadMax + ',' + 101 topicActivationThreshold + ',' + topicPassivationThreshold + ',' + 102 topicAckBufferMax + ',' + 103 reqMultiplexer + ',' + maxMessages + ')'); 104 sessPool = sessionPool; 105 cnx = connection; 106 maxMsgs = maxMessages; 107 msgDispatcher = new MessageDispatcher( 108 "MessageDispatcher[" + reqMultiplexer.getDemultiplexerDaemonName() + ']'); 109 repliesIn = new Queue(); 110 msgDispatcher.setDaemon(true); 111 msgDispatcher.start(); 112 } 113 114 117 public void pushMessages(ConsumerMessages cm) throws JMSException { 118 if (logger.isLoggable(BasicLevel.DEBUG)) 119 logger.log(BasicLevel.DEBUG, "MultiSessionConsumer.pushMessages(" + cm + ')'); 120 repliesIn.push(cm); 121 } 122 123 126 public ServerSessionPool getServerSessionPool() throws JMSException { 127 return sessPool; 128 } 129 130 public void close() throws JMSException { 131 if (logger.isLoggable(BasicLevel.DEBUG)) 132 logger.log(BasicLevel.DEBUG, "MultiSessionConsumer.close()"); 133 msgDispatcher.stop(); 134 135 if (logger.isLoggable(BasicLevel.DEBUG)) 136 logger.log(BasicLevel.DEBUG, 137 "MultiSessionConsumer -> dispatcher stopped"); 138 139 super.close(); 140 141 if (logger.isLoggable(BasicLevel.DEBUG)) 142 logger.log(BasicLevel.DEBUG, 143 "MultiSessionConsumer -> close connection consumer"); 144 145 cnx.closeConnectionConsumer(this); 146 147 if (logger.isLoggable(BasicLevel.DEBUG)) 148 logger.log(BasicLevel.DEBUG, 149 "MultiSessionConsumer -> connection consumer closed"); 150 } 151 152 public void onMessage( 153 Message msg, MessageListener listener, int ackMode) 154 throws JMSException { 155 if (logger.isLoggable(BasicLevel.DEBUG)) 156 logger.log(BasicLevel.DEBUG, "MultiSessionConsumer.onMessage(" + msg + ')'); 157 try { 158 synchronized (this) { 159 if (getStatus() == Status.CLOSE) { 160 throw new javax.jms.IllegalStateException ("Message listener closed"); 161 } else { 162 if (nbActivatedListeners == 0) { 163 setStatus(Status.ON_MSG); 164 } 165 nbActivatedListeners++; 166 } 167 } 168 activateListener(msg, listener, ackMode); 169 } finally { 170 synchronized (this) { 171 nbActivatedListeners--; 172 if (nbActivatedListeners == 0) { 173 setStatus(Status.RUN); 174 notifyAll(); 177 } 178 } 179 } 180 } 181 182 class MessageDispatcher extends Daemon { 183 184 MessageDispatcher(String name) { 185 super(name); 186 } 187 188 191 protected void close() { 192 194 } 195 196 199 protected void shutdown() { 200 202 } 203 204 207 public void stop() { 208 if (logger.isLoggable(BasicLevel.DEBUG)) 209 logger.log(BasicLevel.DEBUG, "MessageDispatcher.stop()"); 210 if (isCurrentThread()) { 211 finish(); 212 } else { 213 super.stop(); 214 } 215 } 216 217 220 public void run() { 221 try { 222 while (running) { 223 canStop = true; 224 ConsumerMessages cm = (ConsumerMessages) repliesIn.get(); 225 canStop = false; 226 227 Vector msgs = cm.getMessages(); 228 int sessionMsgCounter = maxMsgs + 1; 229 ServerSession serverSess = null; 230 Session sess = null; 231 for (int i = 0; i < msgs.size(); i++) { 232 if (sessionMsgCounter > maxMsgs) { 233 if (serverSess != null) 234 serverSess.start(); 235 serverSess = sessPool.getServerSession(); 236 Object obj = serverSess.getSession(); 239 if (obj instanceof Session) { 240 sess = (Session) obj; 241 } else if (obj instanceof XASession) { 242 sess = ((XASession) obj).sess; 243 } else { 244 throw new Error ("Unexpected session type: " + obj); 245 } 246 sess.setMessageConsumerListener(MultiSessionConsumer.this); 247 sessionMsgCounter = 1; 248 } 249 sess.onMessage((org.objectweb.joram.shared.messages.Message) msgs.get(i)); 250 sessionMsgCounter++; 251 } 252 serverSess.start(); 253 repliesIn.pop(); 254 } 255 } catch (InterruptedException exc) { 256 if (logger.isLoggable(BasicLevel.DEBUG)) { 257 logger.log(BasicLevel.DEBUG, "", exc); 258 } 259 } catch (Exception exc) { 260 if (logger.isLoggable(BasicLevel.DEBUG)) { 261 logger.log(BasicLevel.DEBUG, "", exc); 262 } 263 try { 264 MultiSessionConsumer.this.close(); 265 } catch (JMSException exc2) { 266 } 267 } finally { 268 finish(); 269 } 270 } 271 } 272 } 273 | Popular Tags |