KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > stomp > StompSubscription


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.stomp;
19
20 import java.io.IOException JavaDoc;
21 import java.util.Iterator JavaDoc;
22 import java.util.LinkedHashMap JavaDoc;
23 import java.util.Map JavaDoc;
24 import java.util.Map.Entry;
25
26 import javax.jms.JMSException JavaDoc;
27
28 import org.apache.activemq.command.ActiveMQDestination;
29 import org.apache.activemq.command.ActiveMQMessage;
30 import org.apache.activemq.command.ConsumerInfo;
31 import org.apache.activemq.command.MessageAck;
32 import org.apache.activemq.command.MessageDispatch;
33 import org.apache.activemq.command.MessageId;
34
35 /**
36  * Keeps track of the STOMP susbscription so that acking is correctly done.
37  *
38  * @author <a HREF="http://hiramchirino.com">chirino</a>
39  */

40 public class StompSubscription {
41     
42     public static final String JavaDoc AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
43     public static final String JavaDoc CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
44
45     private final ProtocolConverter protocolConverter;
46     private final String JavaDoc subscriptionId;
47     private final ConsumerInfo consumerInfo;
48     
49     private final LinkedHashMap JavaDoc dispatchedMessage = new LinkedHashMap JavaDoc();
50     
51     private String JavaDoc ackMode = AUTO_ACK;
52     private ActiveMQDestination destination;
53
54     
55     public StompSubscription(ProtocolConverter stompTransport, String JavaDoc subscriptionId, ConsumerInfo consumerInfo) {
56         this.protocolConverter = stompTransport;
57         this.subscriptionId = subscriptionId;
58         this.consumerInfo = consumerInfo;
59     }
60
61     void onMessageDispatch(MessageDispatch md) throws IOException JavaDoc, JMSException JavaDoc {
62
63         ActiveMQMessage message = (ActiveMQMessage) md.getMessage();
64         
65         if (ackMode == CLIENT_ACK) {
66             synchronized (this) {
67                 dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
68             }
69         } else if (ackMode == AUTO_ACK) {
70             MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
71             protocolConverter.getTransportFilter().sendToActiveMQ(ack);
72         }
73         
74         StompFrame command = protocolConverter.convertMessage(message);
75         
76         command.setAction(Stomp.Responses.MESSAGE);
77         if (subscriptionId!=null) {
78             command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
79         }
80         
81         protocolConverter.getTransportFilter().sendToStomp(command);
82     }
83     
84     synchronized MessageAck onStompMessageAck(String JavaDoc messageId) {
85         
86         if( !dispatchedMessage.containsKey(messageId) ) {
87             return null;
88         }
89         
90         MessageAck ack = new MessageAck();
91         ack.setDestination(consumerInfo.getDestination());
92         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
93         ack.setConsumerId(consumerInfo.getConsumerId());
94         
95         int count=0;
96         for (Iterator JavaDoc iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
97             
98             Map.Entry JavaDoc entry = (Entry) iter.next();
99             String JavaDoc id = (String JavaDoc) entry.getKey();
100             MessageId msgid = (MessageId) entry.getValue();
101             
102             if( ack.getFirstMessageId()==null )
103                 ack.setFirstMessageId(msgid);
104
105             iter.remove();
106             count++;
107
108             if( id.equals(messageId) ) {
109                 ack.setLastMessageId(msgid);
110                 break;
111             }
112             
113         }
114         
115         ack.setMessageCount(count);
116         return ack;
117     }
118
119     public String JavaDoc getAckMode() {
120         return ackMode;
121     }
122
123     public void setAckMode(String JavaDoc ackMode) {
124         this.ackMode = ackMode;
125     }
126
127     public String JavaDoc getSubscriptionId() {
128         return subscriptionId;
129     }
130
131     public void setDestination(ActiveMQDestination destination) {
132         this.destination = destination;
133     }
134
135     public ActiveMQDestination getDestination() {
136         return destination;
137     }
138
139     public ConsumerInfo getConsumerInfo() {
140         return consumerInfo;
141     }
142
143 }
144
Popular Tags