KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > jpa > JPATopicReferenceStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.store.jpa;
19
20 import java.io.IOException JavaDoc;
21 import java.util.ArrayList JavaDoc;
22 import java.util.List JavaDoc;
23 import java.util.Map JavaDoc;
24 import java.util.concurrent.ConcurrentHashMap JavaDoc;
25 import java.util.concurrent.atomic.AtomicLong JavaDoc;
26
27 import javax.persistence.EntityManager;
28 import javax.persistence.Query;
29
30 import org.apache.activemq.broker.ConnectionContext;
31 import org.apache.activemq.command.ActiveMQDestination;
32 import org.apache.activemq.command.MessageId;
33 import org.apache.activemq.command.SubscriptionInfo;
34 import org.apache.activemq.store.MessageRecoveryListener;
35 import org.apache.activemq.store.TopicReferenceStore;
36 import org.apache.activemq.store.jpa.model.StoredMessageReference;
37 import org.apache.activemq.store.jpa.model.StoredSubscription;
38 import org.apache.activemq.store.jpa.model.StoredSubscription.SubscriptionId;
39 import org.apache.activemq.util.IOExceptionSupport;
40
41 public class JPATopicReferenceStore extends JPAReferenceStore implements TopicReferenceStore {
42     private Map JavaDoc<SubscriptionId,AtomicLong JavaDoc> subscriberLastMessageMap=new ConcurrentHashMap JavaDoc<SubscriptionId,AtomicLong JavaDoc>();
43
44     public JPATopicReferenceStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
45         super(adapter, destination);
46     }
47
48     public void acknowledge(ConnectionContext context, String JavaDoc clientId, String JavaDoc subscriptionName, MessageId messageId) throws IOException JavaDoc {
49         EntityManager manager = adapter.beginEntityManager(context);
50         try {
51             StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
52             ss.setLastAckedId(messageId.getBrokerSequenceId());
53         } catch (Throwable JavaDoc e) {
54             adapter.rollbackEntityManager(context,manager);
55             throw IOExceptionSupport.create(e);
56         }
57         adapter.commitEntityManager(context,manager);
58     }
59
60     public void addSubsciption(String JavaDoc clientId, String JavaDoc subscriptionName, String JavaDoc selector, boolean retroactive) throws IOException JavaDoc {
61         EntityManager manager = adapter.beginEntityManager(null);
62         try {
63             StoredSubscription ss = new StoredSubscription();
64             ss.setClientId(clientId);
65             ss.setSubscriptionName(subscriptionName);
66             ss.setDestination(destinationName);
67             ss.setSelector(selector);
68             ss.setLastAckedId(-1);
69             
70             if( !retroactive ) {
71                 Query query = manager.createQuery("select max(m.id) from StoredMessageReference m");
72                 Long JavaDoc rc = (Long JavaDoc) query.getSingleResult();
73                 if( rc != null ) {
74                     ss.setLastAckedId(rc);
75                 }
76             }
77             
78             manager.persist(ss);
79         } catch (Throwable JavaDoc e) {
80             adapter.rollbackEntityManager(null,manager);
81             throw IOExceptionSupport.create(e);
82         }
83         adapter.commitEntityManager(null,manager);
84     }
85
86     public void deleteSubscription(String JavaDoc clientId, String JavaDoc subscriptionName) throws IOException JavaDoc {
87         EntityManager manager = adapter.beginEntityManager(null);
88         try {
89             StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
90             manager.remove(ss);
91         } catch (Throwable JavaDoc e) {
92             adapter.rollbackEntityManager(null,manager);
93             throw IOExceptionSupport.create(e);
94         }
95         adapter.commitEntityManager(null,manager);
96     }
97
98     private StoredSubscription findStoredSubscription(EntityManager manager, String JavaDoc clientId, String JavaDoc subscriptionName) {
99         Query query = manager.createQuery(
100                 "select ss from StoredSubscription ss " +
101                 "where ss.clientId=?1 " +
102                 "and ss.subscriptionName=?2 " +
103                 "and ss.destination=?3");
104         query.setParameter(1, clientId);
105         query.setParameter(2, subscriptionName);
106         query.setParameter(3, destinationName);
107         List JavaDoc<StoredSubscription> resultList = query.getResultList();
108         if( resultList.isEmpty() )
109             return null;
110         return resultList.get(0);
111     }
112
113     public SubscriptionInfo[] getAllSubscriptions() throws IOException JavaDoc {
114         SubscriptionInfo rc[];
115         EntityManager manager = adapter.beginEntityManager(null);
116         try {
117             ArrayList JavaDoc<SubscriptionInfo> l = new ArrayList JavaDoc<SubscriptionInfo>();
118             
119             Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1");
120             query.setParameter(1, destinationName);
121             for (StoredSubscription ss : (List JavaDoc<StoredSubscription>)query.getResultList()) {
122                 SubscriptionInfo info = new SubscriptionInfo();
123                 info.setClientId(ss.getClientId());
124                 info.setDestination(destination);
125                 info.setSelector(ss.getSelector());
126                 info.setSubcriptionName(ss.getSubscriptionName());
127                 l.add(info);
128             }
129             
130             rc = new SubscriptionInfo[l.size()];
131             l.toArray(rc);
132         } catch (Throwable JavaDoc e) {
133             adapter.rollbackEntityManager(null,manager);
134             throw IOExceptionSupport.create(e);
135         }
136         adapter.commitEntityManager(null,manager);
137         return rc;
138     }
139
140     public int getMessageCount(String JavaDoc clientId, String JavaDoc subscriptionName) throws IOException JavaDoc {
141         Long JavaDoc rc;
142         EntityManager manager = adapter.beginEntityManager(null);
143         try {
144             Query query = manager.createQuery(
145                     "select count(m) FROM StoredMessageReference m, StoredSubscription ss " +
146                     "where ss.clientId=?1 " +
147                     "and ss.subscriptionName=?2 " +
148                     "and ss.destination=?3 " +
149                     "and m.destination=ss.destination and m.id > ss.lastAckedId");
150             query.setParameter(1, clientId);
151             query.setParameter(2, subscriptionName);
152             query.setParameter(3, destinationName);
153             rc = (Long JavaDoc) query.getSingleResult();
154         } catch (Throwable JavaDoc e) {
155             adapter.rollbackEntityManager(null,manager);
156             throw IOExceptionSupport.create(e);
157         }
158         adapter.commitEntityManager(null,manager);
159         return rc.intValue();
160     }
161
162     public SubscriptionInfo lookupSubscription(String JavaDoc clientId, String JavaDoc subscriptionName) throws IOException JavaDoc {
163         SubscriptionInfo rc=null;
164         EntityManager manager = adapter.beginEntityManager(null);
165         try {
166             StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
167             if( ss != null ) {
168                 rc = new SubscriptionInfo();
169                 rc.setClientId(ss.getClientId());
170                 rc.setDestination(destination);
171                 rc.setSelector(ss.getSelector());
172                 rc.setSubcriptionName(ss.getSubscriptionName());
173             }
174         } catch (Throwable JavaDoc e) {
175             adapter.rollbackEntityManager(null,manager);
176             throw IOExceptionSupport.create(e);
177         }
178         adapter.commitEntityManager(null,manager);
179         return rc;
180     }
181
182     public void recoverNextMessages(String JavaDoc clientId, String JavaDoc subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception JavaDoc {
183         EntityManager manager = adapter.beginEntityManager(null);
184         try {
185             SubscriptionId id = new SubscriptionId();
186             id.setClientId(clientId);
187             id.setSubscriptionName(subscriptionName);
188             id.setDestination(destinationName);
189     
190             AtomicLong JavaDoc last=subscriberLastMessageMap.get(id);
191             if(last==null){
192                 StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
193                 last=new AtomicLong JavaDoc(ss.getLastAckedId());
194                 subscriberLastMessageMap.put(id,last);
195             }
196             final AtomicLong JavaDoc lastMessageId=last;
197             
198             Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
199             query.setParameter(1, destinationName);
200             query.setParameter(2, lastMessageId.get());
201             query.setMaxResults(maxReturned);
202             int count = 0;
203             for (StoredMessageReference m : (List JavaDoc<StoredMessageReference>)query.getResultList()) {
204                 MessageId mid = new MessageId(m.getMessageId());
205                 mid.setBrokerSequenceId(m.getId());
206                 listener.recoverMessageReference(mid);
207
208                 lastMessageId.set(m.getId());
209                 count++;
210                 if( count >= maxReturned ) {
211                     return;
212                 }
213             }
214         } catch (Throwable JavaDoc e) {
215             adapter.rollbackEntityManager(null,manager);
216             throw IOExceptionSupport.create(e);
217         }
218         adapter.commitEntityManager(null,manager);
219     }
220
221     public void recoverSubscription(String JavaDoc clientId, String JavaDoc subscriptionName, MessageRecoveryListener listener) throws Exception JavaDoc {
222         EntityManager manager = adapter.beginEntityManager(null);
223         try {
224     
225             StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
226             
227             Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
228             query.setParameter(1, destinationName);
229             query.setParameter(2, ss.getLastAckedId());
230             for (StoredMessageReference m : (List JavaDoc<StoredMessageReference>)query.getResultList()) {
231                 MessageId mid = new MessageId(m.getMessageId());
232                 mid.setBrokerSequenceId(m.getId());
233                 listener.recoverMessageReference(mid);
234             }
235         } catch (Throwable JavaDoc e) {
236             adapter.rollbackEntityManager(null,manager);
237             throw IOExceptionSupport.create(e);
238         }
239         adapter.commitEntityManager(null,manager);
240     }
241
242     public void resetBatching(String JavaDoc clientId, String JavaDoc subscriptionName) {
243         SubscriptionId id = new SubscriptionId();
244         id.setClientId(clientId);
245         id.setSubscriptionName(subscriptionName);
246         id.setDestination(destinationName);
247         subscriberLastMessageMap.remove(id);
248     }
249
250 }
251
Popular Tags