KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > jdbc > JDBCTopicMessageStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.store.jdbc;
19
20 import java.io.IOException JavaDoc;
21 import java.sql.SQLException JavaDoc;
22 import java.util.Map JavaDoc;
23 import java.util.concurrent.ConcurrentHashMap JavaDoc;
24 import java.util.concurrent.atomic.AtomicLong JavaDoc;
25 import org.apache.activemq.broker.ConnectionContext;
26 import org.apache.activemq.command.ActiveMQTopic;
27 import org.apache.activemq.command.Message;
28 import org.apache.activemq.command.MessageId;
29 import org.apache.activemq.command.SubscriptionInfo;
30 import org.apache.activemq.store.MessageRecoveryListener;
31 import org.apache.activemq.store.TopicMessageStore;
32 import org.apache.activemq.util.ByteSequence;
33 import org.apache.activemq.util.IOExceptionSupport;
34 import org.apache.activemq.wireformat.WireFormat;
35
36
37 /**
38  * @version $Revision: 1.6 $
39  */

40 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
41
42     private Map JavaDoc subscriberLastMessageMap=new ConcurrentHashMap JavaDoc();
43     public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat,
44             ActiveMQTopic topic) {
45         super(persistenceAdapter, adapter, wireFormat, topic);
46     }
47
48     public void acknowledge(ConnectionContext context, String JavaDoc clientId, String JavaDoc subscriptionName, MessageId messageId)
49             throws IOException JavaDoc {
50         long seq = messageId.getBrokerSequenceId();
51         // Get a connection and insert the message into the DB.
52
TransactionContext c = persistenceAdapter.getTransactionContext(context);
53         try {
54             adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq);
55         } catch (SQLException JavaDoc e) {
56             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
57             throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message "
58                     + messageId + " in container: " + e, e);
59         } finally {
60             c.close();
61         }
62     }
63
64     /**
65      * @throws Exception
66      *
67      */

68     public void recoverSubscription(String JavaDoc clientId, String JavaDoc subscriptionName, final MessageRecoveryListener listener)
69             throws Exception JavaDoc {
70
71         TransactionContext c = persistenceAdapter.getTransactionContext();
72         try {
73             adapter.doRecoverSubscription(c, destination, clientId, subscriptionName,
74                     new JDBCMessageRecoveryListener() {
75                         public void recoverMessage(long sequenceId, byte[] data) throws Exception JavaDoc {
76                             Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
77                             msg.getMessageId().setBrokerSequenceId(sequenceId);
78                             listener.recoverMessage(msg);
79                         }
80                         public void recoverMessageReference(String JavaDoc reference) throws Exception JavaDoc {
81                             listener.recoverMessageReference(new MessageId(reference));
82                         }
83                         
84                         public void finished(){
85                             listener.finished();
86                         }
87                     });
88         } catch (SQLException JavaDoc e) {
89             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
90             throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
91         } finally {
92             c.close();
93         }
94     }
95
96     public synchronized void recoverNextMessages(final String JavaDoc clientId,final String JavaDoc subscriptionName,
97             final int maxReturned,final MessageRecoveryListener listener) throws Exception JavaDoc{
98         TransactionContext c=persistenceAdapter.getTransactionContext();
99         String JavaDoc subcriberId=getSubscriptionKey(clientId,subscriptionName);
100         AtomicLong JavaDoc last=(AtomicLong JavaDoc)subscriberLastMessageMap.get(subcriberId);
101         if(last==null){
102             long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c,destination,clientId,subscriptionName);
103             last=new AtomicLong JavaDoc(lastAcked);
104             subscriberLastMessageMap.put(subcriberId,last);
105         }
106         final AtomicLong JavaDoc finalLast=last;
107         try{
108             adapter.doRecoverNextMessages(c,destination,clientId,subscriptionName,last.get(),maxReturned,
109                     new JDBCMessageRecoveryListener(){
110
111                         public void recoverMessage(long sequenceId,byte[] data) throws Exception JavaDoc{
112                             if(listener.hasSpace()){
113                                 Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
114                                 msg.getMessageId().setBrokerSequenceId(sequenceId);
115                                 listener.recoverMessage(msg);
116                                 finalLast.set(sequenceId);
117                             }
118                         }
119
120                         public void recoverMessageReference(String JavaDoc reference) throws Exception JavaDoc{
121                             listener.recoverMessageReference(new MessageId(reference));
122                         }
123
124                         public void finished(){
125                             listener.finished();
126                         }
127                     });
128         }catch(SQLException JavaDoc e){
129             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
130         }finally{
131             c.close();
132             last.set(finalLast.get());
133         }
134     }
135     
136     public void resetBatching(String JavaDoc clientId,String JavaDoc subscriptionName) {
137         String JavaDoc subcriberId=getSubscriptionKey(clientId,subscriptionName);
138         subscriberLastMessageMap.remove(subcriberId);
139     }
140     
141     /**
142      * @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
143      * boolean)
144      */

145     public void addSubsciption(String JavaDoc clientId, String JavaDoc subscriptionName, String JavaDoc selector, boolean retroactive)
146             throws IOException JavaDoc {
147         TransactionContext c = persistenceAdapter.getTransactionContext();
148         try {
149             c = persistenceAdapter.getTransactionContext();
150             adapter.doSetSubscriberEntry(c, destination, clientId, subscriptionName, selector, retroactive);
151         } catch (SQLException JavaDoc e) {
152             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
153             throw IOExceptionSupport
154                     .create("Failed to lookup subscription for info: " + clientId + ". Reason: " + e, e);
155         } finally {
156             c.close();
157         }
158     }
159
160     /**
161      * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
162      * String)
163      */

164     public SubscriptionInfo lookupSubscription(String JavaDoc clientId, String JavaDoc subscriptionName) throws IOException JavaDoc {
165         TransactionContext c = persistenceAdapter.getTransactionContext();
166         try {
167             return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
168         } catch (SQLException JavaDoc e) {
169             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
170             throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
171         } finally {
172             c.close();
173         }
174     }
175
176     public void deleteSubscription(String JavaDoc clientId, String JavaDoc subscriptionName) throws IOException JavaDoc {
177         TransactionContext c = persistenceAdapter.getTransactionContext();
178         try {
179             adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
180         } catch (SQLException JavaDoc e) {
181             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
182             throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
183         } finally {
184             c.close();
185             resetBatching(clientId,subscriptionName);
186         }
187     }
188
189     public SubscriptionInfo[] getAllSubscriptions() throws IOException JavaDoc {
190         TransactionContext c = persistenceAdapter.getTransactionContext();
191         try {
192             return adapter.doGetAllSubscriptions(c, destination);
193         } catch (SQLException JavaDoc e) {
194             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
195             throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
196         } finally {
197             c.close();
198         }
199     }
200
201     
202     
203     
204
205     public int getMessageCount(String JavaDoc clientId,String JavaDoc subscriberName) throws IOException JavaDoc{
206         int result = 0;
207         TransactionContext c = persistenceAdapter.getTransactionContext();
208         try {
209             result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName);
210                
211         } catch (SQLException JavaDoc e) {
212             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
213             throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
214         } finally {
215             c.close();
216         }
217         return result;
218     }
219     
220     protected String JavaDoc getSubscriptionKey(String JavaDoc clientId,String JavaDoc subscriberName){
221         String JavaDoc result=clientId+":";
222         result+=subscriberName!=null?subscriberName:"NOT_SET";
223         return result;
224     }
225
226     
227
228     
229
230 }
231
Popular Tags