1 4 package org.jfox.jms.connector; 5 6 import java.util.ArrayList ; 7 import java.util.Collections ; 8 import java.util.HashMap ; 9 import java.util.List ; 10 import java.util.Map ; 11 import javax.jms.JMSException ; 12 13 import org.jfox.jms.message.JMSMessage; 14 15 18 19 public class ConsumerMeta { 20 21 private String consumerId; 22 23 private SessionMeta sessionMeta; 24 25 private List <JMSMessage> messages = new ArrayList <JMSMessage>(); 26 private Map <String , JMSMessage> unackMessages = new HashMap <String , JMSMessage>(); 27 28 private boolean async = false; 29 30 public ConsumerMeta(String consumerId, SessionMeta sessionMeta) { 31 this.consumerId = consumerId; 32 this.sessionMeta = sessionMeta; 33 } 34 35 public String getSessionId() { 36 return sessionMeta.getSessionId(); 37 } 38 39 public void addMessage(JMSMessage message) { 40 messages.add(message); 41 if (this.isAsync()) { 43 synchronized (sessionMeta) { 44 sessionMeta.notifyAll(); 45 } 46 } 47 } 48 49 public synchronized JMSMessage popMessage() throws JMSException { 50 Collections.sort(messages); 51 JMSMessage message = null; 52 while (!messages.isEmpty()) { 53 JMSMessage msg = messages.remove(0); 54 if (msg.getJMSExpiration() == 0 || (System.currentTimeMillis() < msg.getJMSExpiration())) { 56 message = msg; 57 unackMessages.put(message.getJMSMessageID(), message); 59 break; 60 } 61 } 62 63 return message; 64 65 } 66 67 public String getConsumerId() { 68 return consumerId; 69 } 70 71 public void setAsync(boolean isAsync) { 72 this.async = isAsync; 73 } 74 75 public boolean isAsync() { 76 return async; 77 } 78 79 public boolean hasMessage() { 80 return !messages.isEmpty(); 81 } 82 83 public void acknowlege(String messageId) throws JMSException { 84 if (!unackMessages.containsKey(messageId)) { 85 throw new JMSException ("message " + messageId + " not exists."); 86 } 87 unackMessages.remove(messageId); 88 } 89 90 public static void main(String [] args) { 91 92 } 93 } 94 95 | Popular Tags |