1 46 47 package org.mr.api.jms; 48 49 import java.util.ArrayList ; 50 51 import javax.jms.ConnectionConsumer ; 52 import javax.jms.Destination ; 53 import javax.jms.JMSException ; 54 import javax.jms.Message ; 55 import javax.jms.MessageListener ; 56 import javax.jms.ServerSession ; 57 import javax.jms.ServerSessionPool ; 58 import javax.jms.Session ; 59 60 import org.apache.commons.logging.Log; 61 import org.apache.commons.logging.LogFactory; 62 import org.mr.IMessageListener; 63 import org.mr.api.jms.selector.syntax.Selector; 64 import org.mr.core.protocol.MantaBusMessage; 65 66 73 public class MantaConnectionConsumer implements ConnectionConsumer , IMessageListener, MessageListener { 74 75 private Destination destination; 77 78 private String messageSelector; 80 81 protected boolean holdMessages; 83 84 private ArrayList msgs; 87 88 private Log log; 90 91 private MantaMessageConsumer consumer; 93 94 private ServerSessionPool sessionPool; 96 97 private MantaSession session; 99 100 private MantaConnection connection; 101 102 private boolean close = false; 104 105 106 115 protected MantaConnectionConsumer (MantaConnection connection, Destination dest, String msgSel, ServerSessionPool pool, int maxMsg) throws JMSException { 116 117 if (pool == null) { 118 throw new JMSException ("MNJMS00030 : APPLICATION SERVER ERROR. ServerSessionPool EMPTY. ERROR IN CONSTRUCTING MantaConnectionConsumer."); 119 } 120 121 new Selector(msgSel); 123 124 log = LogFactory.getLog("MantaConnectionConsumer"); 125 this.sessionPool = pool; 126 this.connection = connection; 127 this.destination = dest; 128 this.messageSelector = msgSel; 129 130 try { 131 session = (MantaSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 137 consumer = (MantaMessageConsumer)session.createConsumer(this.destination, this.messageSelector); 138 consumer.setMessageListener(this); 142 session.setBusMessageListener(this); 143 } 144 catch (JMSException e) { 145 this.close(); 146 throw new JMSException ("MNJMS00031 : APPLICATION SERVER ERROR. FAILED TO RECEIVE SESSION. ERROR IN CONSTRUCTING MantaConnectionConsumer."); 147 } 148 } 149 150 protected MantaConnectionConsumer (MantaConnection connection, Destination dest, String msgSel, ServerSessionPool pool, int maxMsg,boolean hold) throws JMSException { 151 this(connection,dest,msgSel,pool,maxMsg); 152 msgs = new ArrayList (); 153 holdMessages=hold; 154 } 155 156 157 160 public void close() throws JMSException { 161 162 if (close) { 163 return; 164 } 165 166 close = true; 168 169 if (session != null) { 171 session.close(); 172 session = null; 173 } 174 175 if (consumer != null) { 177 consumer.close(); 178 consumer = null; 179 } 180 181 messageSelector = null; 182 destination = null; 183 } 184 185 public void onMessage(MantaBusMessage message) { 188 try { 189 ServerSession serverSession = sessionPool.getServerSession(); 191 MantaSession session = (MantaSession)serverSession.getSession(); 192 session.addConsumerMessage(message); 193 serverSession.start(); 194 } 195 catch (JMSException e) { 196 log.error("Exception while forwarding a message to the Application Server.",e); 197 } 198 } 199 200 201 204 public ServerSessionPool getServerSessionPool() throws JMSException { 205 return this.sessionPool; 206 } 207 208 209 public void onMessage(Message message) { 213 } 215 216 217 public Message receive() { 218 return receive(0); 219 } 220 221 public MantaMessage receive(long timeout) { 222 MantaMessage mm = null; 223 224 if (timeout == 0) 225 timeout = Long.MAX_VALUE; 226 227 synchronized(msgs) { 228 try { 229 if (msgs.isEmpty()) { 230 msgs.wait(timeout); 231 } 232 } 233 catch (InterruptedException ie) { 234 } 236 if (!msgs.isEmpty()) { 237 mm = (MantaMessage) msgs.remove(0); 238 } 239 } 240 return mm; 241 } 242 } 243 | Popular Tags |