1 18 package org.apache.activemq.transport.stomp; 19 20 import java.io.IOException ; 21 import java.util.Iterator ; 22 import java.util.LinkedHashMap ; 23 import java.util.Map ; 24 import java.util.Map.Entry; 25 26 import javax.jms.JMSException ; 27 28 import org.apache.activemq.command.ActiveMQDestination; 29 import org.apache.activemq.command.ActiveMQMessage; 30 import org.apache.activemq.command.ConsumerInfo; 31 import org.apache.activemq.command.MessageAck; 32 import org.apache.activemq.command.MessageDispatch; 33 import org.apache.activemq.command.MessageId; 34 35 40 public class StompSubscription { 41 42 public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO; 43 public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT; 44 45 private final ProtocolConverter protocolConverter; 46 private final String subscriptionId; 47 private final ConsumerInfo consumerInfo; 48 49 private final LinkedHashMap dispatchedMessage = new LinkedHashMap (); 50 51 private String ackMode = AUTO_ACK; 52 private ActiveMQDestination destination; 53 54 55 public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo) { 56 this.protocolConverter = stompTransport; 57 this.subscriptionId = subscriptionId; 58 this.consumerInfo = consumerInfo; 59 } 60 61 void onMessageDispatch(MessageDispatch md) throws IOException , JMSException { 62 63 ActiveMQMessage message = (ActiveMQMessage) md.getMessage(); 64 65 if (ackMode == CLIENT_ACK) { 66 synchronized (this) { 67 dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId()); 68 } 69 } else if (ackMode == AUTO_ACK) { 70 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); 71 protocolConverter.getTransportFilter().sendToActiveMQ(ack); 72 } 73 74 StompFrame command = protocolConverter.convertMessage(message); 75 76 command.setAction(Stomp.Responses.MESSAGE); 77 if (subscriptionId!=null) { 78 command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId); 79 } 80 81 protocolConverter.getTransportFilter().sendToStomp(command); 82 } 83 84 synchronized MessageAck onStompMessageAck(String messageId) { 85 86 if( !dispatchedMessage.containsKey(messageId) ) { 87 return null; 88 } 89 90 MessageAck ack = new MessageAck(); 91 ack.setDestination(consumerInfo.getDestination()); 92 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 93 ack.setConsumerId(consumerInfo.getConsumerId()); 94 95 int count=0; 96 for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { 97 98 Map.Entry entry = (Entry) iter.next(); 99 String id = (String ) entry.getKey(); 100 MessageId msgid = (MessageId) entry.getValue(); 101 102 if( ack.getFirstMessageId()==null ) 103 ack.setFirstMessageId(msgid); 104 105 iter.remove(); 106 count++; 107 108 if( id.equals(messageId) ) { 109 ack.setLastMessageId(msgid); 110 break; 111 } 112 113 } 114 115 ack.setMessageCount(count); 116 return ack; 117 } 118 119 public String getAckMode() { 120 return ackMode; 121 } 122 123 public void setAckMode(String ackMode) { 124 this.ackMode = ackMode; 125 } 126 127 public String getSubscriptionId() { 128 return subscriptionId; 129 } 130 131 public void setDestination(ActiveMQDestination destination) { 132 this.destination = destination; 133 } 134 135 public ActiveMQDestination getDestination() { 136 return destination; 137 } 138 139 public ConsumerInfo getConsumerInfo() { 140 return consumerInfo; 141 } 142 143 } 144 | Popular Tags |