KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > jpa > JPATopicMessageStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.store.jpa;
19
20 import java.io.IOException JavaDoc;
21 import java.util.ArrayList JavaDoc;
22 import java.util.List JavaDoc;
23 import java.util.Map JavaDoc;
24 import java.util.concurrent.ConcurrentHashMap JavaDoc;
25 import java.util.concurrent.atomic.AtomicLong JavaDoc;
26
27 import javax.persistence.EntityManager;
28 import javax.persistence.Query;
29
30 import org.apache.activemq.broker.ConnectionContext;
31 import org.apache.activemq.command.ActiveMQDestination;
32 import org.apache.activemq.command.Message;
33 import org.apache.activemq.command.MessageId;
34 import org.apache.activemq.command.SubscriptionInfo;
35 import org.apache.activemq.store.MessageRecoveryListener;
36 import org.apache.activemq.store.TopicMessageStore;
37 import org.apache.activemq.store.jpa.model.StoredMessage;
38 import org.apache.activemq.store.jpa.model.StoredSubscription;
39 import org.apache.activemq.store.jpa.model.StoredSubscription.SubscriptionId;
40 import org.apache.activemq.util.ByteSequence;
41 import org.apache.activemq.util.IOExceptionSupport;
42
43 public class JPATopicMessageStore extends JPAMessageStore implements TopicMessageStore {
44     private Map JavaDoc<SubscriptionId,AtomicLong JavaDoc> subscriberLastMessageMap=new ConcurrentHashMap JavaDoc<SubscriptionId,AtomicLong JavaDoc>();
45
46     public JPATopicMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
47         super(adapter, destination);
48     }
49
50     public void acknowledge(ConnectionContext context, String JavaDoc clientId, String JavaDoc subscriptionName, MessageId messageId) throws IOException JavaDoc {
51         EntityManager manager = adapter.beginEntityManager(context);
52         try {
53             StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
54             ss.setLastAckedId(messageId.getBrokerSequenceId());
55         } catch (Throwable JavaDoc e) {
56             adapter.rollbackEntityManager(context,manager);
57             throw IOExceptionSupport.create(e);
58         }
59         adapter.commitEntityManager(context,manager);
60     }
61
62     public void addSubsciption(String JavaDoc clientId, String JavaDoc subscriptionName, String JavaDoc selector, boolean retroactive) throws IOException JavaDoc {
63         EntityManager manager = adapter.beginEntityManager(null);
64         try {
65             StoredSubscription ss = new StoredSubscription();
66             ss.setClientId(clientId);
67             ss.setSubscriptionName(subscriptionName);
68             ss.setDestination(destinationName);
69             ss.setSelector(selector);
70             ss.setLastAckedId(-1);
71             
72             if( !retroactive ) {
73                 Query query = manager.createQuery("select max(m.id) from StoredMessage m");
74                 Long JavaDoc rc = (Long JavaDoc) query.getSingleResult();
75                 if( rc != null ) {
76                     ss.setLastAckedId(rc);
77                 }
78             }
79             
80             manager.persist(ss);
81         } catch (Throwable JavaDoc e) {
82             adapter.rollbackEntityManager(null,manager);
83             throw IOExceptionSupport.create(e);
84         }
85         adapter.commitEntityManager(null,manager);
86     }
87
88     public void deleteSubscription(String JavaDoc clientId, String JavaDoc subscriptionName) throws IOException JavaDoc {
89         EntityManager manager = adapter.beginEntityManager(null);
90         try {
91             StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
92             manager.remove(ss);
93         } catch (Throwable JavaDoc e) {
94             adapter.rollbackEntityManager(null,manager);
95             throw IOExceptionSupport.create(e);
96         }
97         adapter.commitEntityManager(null,manager);
98     }
99
100     private StoredSubscription findStoredSubscription(EntityManager manager, String JavaDoc clientId, String JavaDoc subscriptionName) {
101         Query query = manager.createQuery(
102                 "select ss from StoredSubscription ss " +
103                 "where ss.clientId=?1 " +
104                 "and ss.subscriptionName=?2 " +
105                 "and ss.destination=?3");
106         query.setParameter(1, clientId);
107         query.setParameter(2, subscriptionName);
108         query.setParameter(3, destinationName);
109         List JavaDoc<StoredSubscription> resultList = query.getResultList();
110         if( resultList.isEmpty() )
111             return null;
112         return resultList.get(0);
113     }
114
115     public SubscriptionInfo[] getAllSubscriptions() throws IOException JavaDoc {
116         SubscriptionInfo rc[];
117         EntityManager manager = adapter.beginEntityManager(null);
118         try {
119             ArrayList JavaDoc<SubscriptionInfo> l = new ArrayList JavaDoc<SubscriptionInfo>();
120             
121             Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1");
122             query.setParameter(1, destinationName);
123             for (StoredSubscription ss : (List JavaDoc<StoredSubscription>)query.getResultList()) {
124                 SubscriptionInfo info = new SubscriptionInfo();
125                 info.setClientId(ss.getClientId());
126                 info.setDestination(destination);
127                 info.setSelector(ss.getSelector());
128                 info.setSubcriptionName(ss.getSubscriptionName());
129                 l.add(info);
130             }
131             
132             rc = new SubscriptionInfo[l.size()];
133             l.toArray(rc);
134         } catch (Throwable JavaDoc e) {
135             adapter.rollbackEntityManager(null,manager);
136             throw IOExceptionSupport.create(e);
137         }
138         adapter.commitEntityManager(null,manager);
139         return rc;
140     }
141
142     public int getMessageCount(String JavaDoc clientId, String JavaDoc subscriptionName) throws IOException JavaDoc {
143         Long JavaDoc rc;
144         EntityManager manager = adapter.beginEntityManager(null);
145         try {
146             Query query = manager.createQuery(
147                     "select count(m) FROM StoredMessage m, StoredSubscription ss " +
148                     "where ss.clientId=?1 " +
149                     "and ss.subscriptionName=?2 " +
150                     "and ss.destination=?3 " +
151                     "and m.destination=ss.destination and m.id > ss.lastAckedId");
152             query.setParameter(1, clientId);
153             query.setParameter(2, subscriptionName);
154             query.setParameter(3, destinationName);
155             rc = (Long JavaDoc) query.getSingleResult();
156         } catch (Throwable JavaDoc e) {
157             adapter.rollbackEntityManager(null,manager);
158             throw IOExceptionSupport.create(e);
159         }
160         adapter.commitEntityManager(null,manager);
161         return rc.intValue();
162     }
163
164     public SubscriptionInfo lookupSubscription(String JavaDoc clientId, String JavaDoc subscriptionName) throws IOException JavaDoc {
165         SubscriptionInfo rc=null;
166         EntityManager manager = adapter.beginEntityManager(null);
167         try {
168             StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
169             if( ss != null ) {
170                 rc = new SubscriptionInfo();
171                 rc.setClientId(ss.getClientId());
172                 rc.setDestination(destination);
173                 rc.setSelector(ss.getSelector());
174                 rc.setSubcriptionName(ss.getSubscriptionName());
175             }
176         } catch (Throwable JavaDoc e) {
177             adapter.rollbackEntityManager(null,manager);
178             throw IOExceptionSupport.create(e);
179         }
180         adapter.commitEntityManager(null,manager);
181         return rc;
182     }
183
184     public void recoverNextMessages(String JavaDoc clientId, String JavaDoc subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception JavaDoc {
185         EntityManager manager = adapter.beginEntityManager(null);
186         try {
187             SubscriptionId id = new SubscriptionId();
188             id.setClientId(clientId);
189             id.setSubscriptionName(subscriptionName);
190             id.setDestination(destinationName);
191     
192             AtomicLong JavaDoc last=subscriberLastMessageMap.get(id);
193             if(last==null){
194                 StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
195                 last=new AtomicLong JavaDoc(ss.getLastAckedId());
196                 subscriberLastMessageMap.put(id,last);
197             }
198             final AtomicLong JavaDoc lastMessageId=last;
199             
200             Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
201             query.setParameter(1, destinationName);
202             query.setParameter(2, lastMessageId.get());
203             query.setMaxResults(maxReturned);
204             int count = 0;
205             for (StoredMessage m : (List JavaDoc<StoredMessage>)query.getResultList()) {
206                 Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
207                 listener.recoverMessage(message);
208                 lastMessageId.set(m.getId());
209                 count++;
210                 if( count >= maxReturned ) {
211                     return;
212                 }
213             }
214         } catch (Throwable JavaDoc e) {
215             adapter.rollbackEntityManager(null,manager);
216             throw IOExceptionSupport.create(e);
217         }
218         adapter.commitEntityManager(null,manager);
219     }
220
221     public void recoverSubscription(String JavaDoc clientId, String JavaDoc subscriptionName, MessageRecoveryListener listener) throws Exception JavaDoc {
222         EntityManager manager = adapter.beginEntityManager(null);
223         try {
224     
225             StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
226             
227             Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
228             query.setParameter(1, destinationName);
229             query.setParameter(2, ss.getLastAckedId());
230             for (StoredMessage m : (List JavaDoc<StoredMessage>)query.getResultList()) {
231                 Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
232                 listener.recoverMessage(message);
233             }
234         } catch (Throwable JavaDoc e) {
235             adapter.rollbackEntityManager(null,manager);
236             throw IOExceptionSupport.create(e);
237         }
238         adapter.commitEntityManager(null,manager);
239     }
240
241     public void resetBatching(String JavaDoc clientId, String JavaDoc subscriptionName) {
242         SubscriptionId id = new SubscriptionId();
243         id.setClientId(clientId);
244         id.setSubscriptionName(subscriptionName);
245         id.setDestination(destinationName);
246
247         subscriberLastMessageMap.remove(id);
248     }
249
250 }
251
Popular Tags