1 package com.ubermq.jms.client.impl; 2 3 import EDU.oswego.cs.dl.util.concurrent.*; 4 import com.ubermq.jms.client.*; 5 import com.ubermq.jms.common.datagram.*; 6 import com.ubermq.jms.common.routing.*; 7 import com.ubermq.jms.common.routing.impl.*; 8 import com.ubermq.kernel.*; 9 import com.ubermq.kernel.overflow.*; 10 import java.io.*; 11 import java.util.*; 12 import javax.jms.*; 13 14 19 abstract class AbstractConsumer 20 implements javax.jms.MessageConsumer , 21 IDatagramEndpoint, 22 IMessageSender, 23 IAcknowledgeHandler 24 { 25 private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AbstractConsumer.class); 26 27 30 private String selectorValue; 31 private Selector selector; 32 private MessageListener messageListener; 33 34 37 private Session session; 38 private IClientProcessor proc; 39 private IDeliveryManager delivery; 40 41 private Channel receiveQueue; 42 private boolean isClosing = false; 43 44 private List pausedQueue; 45 private boolean isPaused = false; 46 47 55 AbstractConsumer(Session session, 56 String selector, 57 IDeliveryManager delivery, 58 Channel receiveQueue) 59 throws InvalidSelectorException 60 { 61 if (selector != null && 62 selector.length() > 0) 63 { 64 this.selectorValue = selector; 65 this.selector = new SimpleSelector(this.selectorValue); 66 } 67 this.session = session; 68 this.proc = session.conn.getClientProcessor(); 69 this.delivery = delivery; 70 71 this.receiveQueue = receiveQueue; 74 75 session.addConsumer(this); 77 } 78 79 public String getMessageSelector() {return this.selectorValue;} 80 81 84 protected Session getSession() {return session;} 85 86 89 protected Connection getConnection() {return session.conn;} 90 91 94 protected IClientProcessor getClientProcessor() {return proc;} 95 96 105 public MessageListener getMessageListener() throws JMSException 106 { 107 return messageListener; 108 } 109 110 123 124 public void setMessageListener( MessageListener listener ) 125 throws JMSException 126 { 127 messageListener = listener; 128 session.requestDeliveryThread(); 129 session.checkDeliveryThread(); 130 } 131 132 133 144 public javax.jms.Message receive() 145 throws JMSException 146 { 147 if(isClosing() || 148 session.isClosing()) 149 { 150 throw new javax.jms.IllegalStateException ("Subscriber is closed"); 151 } 152 153 try { 154 javax.jms.Message m = (javax.jms.Message )receiveQueue.take(); 155 return m; 156 } catch(InterruptedException ie) { 157 throw new JMSException(ie.toString()); 158 } 159 } 160 161 171 public javax.jms.Message receive( long timeout ) throws JMSException 172 { 173 if(isClosing() || 174 session.isClosing()) 175 { 176 throw new javax.jms.IllegalStateException ("Subscriber is closed"); 177 } 178 179 try { 180 javax.jms.Message m = (javax.jms.Message )receiveQueue.poll(timeout); 181 return m; 182 } catch(InterruptedException ie) { 183 throw new JMSException(ie.toString()); 184 } 185 } 186 187 195 public javax.jms.Message receiveNoWait() throws JMSException 196 { 197 if(isClosing() || 198 session.isClosing() ) 199 { 200 throw new javax.jms.IllegalStateException ("Subscriber is closed"); 201 } 202 203 try { 204 javax.jms.Message m = (javax.jms.Message )receiveQueue.poll(0); 205 return m; 206 } catch(InterruptedException ie) { 207 return null; 208 } 209 } 210 211 public void close() throws JMSException 212 { 213 isClosing = true; 214 session.removeConsumer(this); 215 } 216 217 220 boolean isClosing() 221 { 222 return isClosing; 223 } 224 225 228 public void deliver(IDatagram d) 229 { 230 IMessageDatagram md = (IMessageDatagram)d; 233 javax.jms.Message msg = LocalMessage.getMessage(md, this); 234 235 if (session.ackMode == TopicSession.AUTO_ACKNOWLEDGE) { 237 try { 238 internalAcknowledge(md); 239 } 240 catch (IOException e) { 241 } 243 } 244 245 if (selector != null && 247 !selector.accept(md)) 248 { 249 return; 250 } 251 252 delivery.deliver(md.getSenderId(), 254 md.getSequence(), 255 msg, 256 this); 257 } 258 259 272 protected void internalAcknowledge(IMessageDatagram md) 273 throws IOException 274 { 275 getConnection().output(getConnection().factories.ackFactory().ack(md.getIncomingMessageId()), 276 new ExponentialBackoff()); 277 } 278 279 282 public void acknowledge(IMessageDatagram md) 283 throws IOException, javax.jms.IllegalStateException 284 { 285 if (session.isClosing()) 286 throw new javax.jms.IllegalStateException ("session is closed"); 287 288 if (session.ackMode == TopicSession.AUTO_ACKNOWLEDGE) { 289 } else if (session.ackMode == TopicSession.CLIENT_ACKNOWLEDGE) { 291 internalAcknowledge(md); 292 } else if (session.ackMode == TopicSession.DUPS_OK_ACKNOWLEDGE) { 293 } 295 } 296 297 302 public void sendMessage(javax.jms.Message msg) 303 { 304 try { 305 if (messageListener != null) 306 session.asyncDelivery(msg, this, messageListener); 307 else { 308 receiveQueue.put(msg); 309 } 310 } catch(Exception ie) { 311 log.error("", ie); 312 } 313 } 314 315 319 synchronized void pause() 320 { 321 if (!isPaused) 322 { 323 isPaused = true; 324 325 pausedQueue = new ArrayList(); 327 328 try 329 { 330 Object o; 331 while((o = receiveQueue.poll(0)) != null) 332 { 333 pausedQueue.add(o); 334 } 335 } 336 catch (InterruptedException e) {} 337 } 338 } 339 340 344 synchronized void resume() 345 { 346 if (isPaused) 347 { 348 isPaused = false; 349 350 Iterator iter = pausedQueue.iterator(); 351 while (iter.hasNext()) 352 { 353 try 354 { 355 receiveQueue.offer(iter.next(), 0); 356 } 357 catch (InterruptedException e) {} 358 iter.remove(); 359 } 360 } 361 } 362 363 } 364 | Popular Tags |