package org.apache.activemq.store.jpa;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import javax.persistence.EntityManager;
import javax.persistence.Query;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.jpa.model.StoredMessageReference;
import org.apache.activemq.store.jpa.model.StoredSubscription;
import org.apache.activemq.store.jpa.model.StoredSubscription.SubscriptionId;
import org.apache.activemq.util.IOExceptionSupport;

/**
 * JPA-backed {@link TopicReferenceStore}: keeps one {@link StoredSubscription}
 * row per (clientId, subscriptionName, destination) and recovers
 * {@link StoredMessageReference} rows whose id is greater than the
 * subscription's last acknowledged id.
 *
 * <p>Every public operation obtains an {@link EntityManager} from the adapter,
 * commits it through the adapter on success and rolls it back through the
 * adapter on any failure, rethrowing the failure as an {@link IOException}.
 */
public class JPATopicReferenceStore extends JPAReferenceStore implements TopicReferenceStore {

    /**
     * Batching cursor per subscription: the id of the last message reference
     * handed out by {@link #recoverNextMessages}. Cleared by
     * {@link #resetBatching}.
     */
    private final Map<SubscriptionId, AtomicLong> subscriberLastMessageMap =
        new ConcurrentHashMap<SubscriptionId, AtomicLong>();

    public JPATopicReferenceStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
        super(adapter, destination);
    }

    /**
     * Records {@code messageId}'s broker sequence id as the last acknowledged
     * id of the given subscription.
     *
     * @throws IOException if the subscription does not exist or the update fails
     */
    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
        EntityManager manager = adapter.beginEntityManager(context);
        try {
            StoredSubscription ss = requireStoredSubscription(manager, clientId, subscriptionName);
            ss.setLastAckedId(messageId.getBrokerSequenceId());
        } catch (Throwable e) {
            adapter.rollbackEntityManager(context, manager);
            throw IOExceptionSupport.create(e);
        }
        adapter.commitEntityManager(context, manager);
    }

    /**
     * Persists a new subscription for this destination.
     *
     * @param retroactive when {@code false} the subscription starts after the
     *        highest stored message reference id (if any); when {@code true}
     *        it starts at -1, i.e. before every stored reference
     * @throws IOException if the subscription cannot be stored
     */
    public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
        EntityManager manager = adapter.beginEntityManager(null);
        try {
            StoredSubscription ss = new StoredSubscription();
            ss.setClientId(clientId);
            ss.setSubscriptionName(subscriptionName);
            ss.setDestination(destinationName);
            ss.setSelector(selector);
            ss.setLastAckedId(-1);

            if (!retroactive) {
                Query query = manager.createQuery("select max(m.id) from StoredMessageReference m");
                Long rc = (Long) query.getSingleResult();
                // max() yields null when no message reference is stored yet.
                if (rc != null) {
                    ss.setLastAckedId(rc);
                }
            }

            manager.persist(ss);
        } catch (Throwable e) {
            adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create(e);
        }
        adapter.commitEntityManager(null, manager);
    }

    /**
     * Removes the stored subscription for this destination.
     *
     * @throws IOException if the removal fails
     */
    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
        EntityManager manager = adapter.beginEntityManager(null);
        try {
            // NOTE(review): ss may be null when the subscription is unknown; what
            // remove(null) does is left to the JPA provider, as in the original.
            StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
            manager.remove(ss);
        } catch (Throwable e) {
            adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create(e);
        }
        adapter.commitEntityManager(null, manager);
    }

    /**
     * Looks up the subscription row for this destination.
     *
     * @return the first matching subscription, or {@code null} if none exists
     */
    private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) {
        Query query = manager.createQuery(
            "select ss from StoredSubscription ss "
            + "where ss.clientId=?1 "
            + "and ss.subscriptionName=?2 "
            + "and ss.destination=?3");
        query.setParameter(1, clientId);
        query.setParameter(2, subscriptionName);
        query.setParameter(3, destinationName);
        @SuppressWarnings("unchecked") // the query selects StoredSubscription entities only
        List<StoredSubscription> resultList = query.getResultList();
        if (resultList.isEmpty()) {
            return null;
        }
        return resultList.get(0);
    }

    /**
     * Same as {@link #findStoredSubscription} but fails with a descriptive
     * {@link IOException} instead of returning {@code null}, so callers that
     * need the row do not trip over a message-less NullPointerException.
     */
    private StoredSubscription requireStoredSubscription(EntityManager manager, String clientId, String subscriptionName) throws IOException {
        StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
        if (ss == null) {
            throw new IOException("No subscription found for clientId=" + clientId
                + ", subscriptionName=" + subscriptionName
                + ", destination=" + destinationName);
        }
        return ss;
    }

    /** Copies the stored subscription's client id, selector and name into a new {@link SubscriptionInfo} for this destination. */
    private SubscriptionInfo toSubscriptionInfo(StoredSubscription ss) {
        SubscriptionInfo info = new SubscriptionInfo();
        info.setClientId(ss.getClientId());
        info.setDestination(destination);
        info.setSelector(ss.getSelector());
        info.setSubcriptionName(ss.getSubscriptionName());
        return info;
    }

    /**
     * @return one {@link SubscriptionInfo} per subscription stored for this
     *         destination; empty array if there are none
     * @throws IOException if the query fails
     */
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        SubscriptionInfo[] rc;
        EntityManager manager = adapter.beginEntityManager(null);
        try {
            Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1");
            query.setParameter(1, destinationName);
            @SuppressWarnings("unchecked") // the query selects StoredSubscription entities only
            List<StoredSubscription> stored = query.getResultList();

            List<SubscriptionInfo> infos = new ArrayList<SubscriptionInfo>(stored.size());
            for (StoredSubscription ss : stored) {
                infos.add(toSubscriptionInfo(ss));
            }
            rc = infos.toArray(new SubscriptionInfo[infos.size()]);
        } catch (Throwable e) {
            adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create(e);
        }
        adapter.commitEntityManager(null, manager);
        return rc;
    }

    /**
     * Counts the message references of this destination whose id is greater
     * than the subscription's last acknowledged id.
     *
     * @throws IOException if the query fails
     */
    public int getMessageCount(String clientId, String subscriptionName) throws IOException {
        Long rc;
        EntityManager manager = adapter.beginEntityManager(null);
        try {
            Query query = manager.createQuery(
                "select count(m) FROM StoredMessageReference m, StoredSubscription ss "
                + "where ss.clientId=?1 "
                + "and ss.subscriptionName=?2 "
                + "and ss.destination=?3 "
                + "and m.destination=ss.destination and m.id > ss.lastAckedId");
            query.setParameter(1, clientId);
            query.setParameter(2, subscriptionName);
            query.setParameter(3, destinationName);
            rc = (Long) query.getSingleResult();
        } catch (Throwable e) {
            adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create(e);
        }
        adapter.commitEntityManager(null, manager);
        return rc.intValue();
    }

    /**
     * @return the subscription's info, or {@code null} if no such subscription
     *         is stored for this destination
     * @throws IOException if the lookup fails
     */
    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
        SubscriptionInfo rc = null;
        EntityManager manager = adapter.beginEntityManager(null);
        try {
            StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
            if (ss != null) {
                rc = toSubscriptionInfo(ss);
            }
        } catch (Throwable e) {
            adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create(e);
        }
        adapter.commitEntityManager(null, manager);
        return rc;
    }

    /**
     * Hands the listener up to {@code maxReturned} message references that
     * follow the subscription's batching cursor, in ascending id order, and
     * advances the cursor past each reference delivered. The cursor starts at
     * the subscription's last acknowledged id the first time (or after
     * {@link #resetBatching}).
     *
     * @throws Exception an {@link IOException} wrapping any failure, including
     *         an unknown subscription or a failure raised by the listener
     */
    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
        EntityManager manager = adapter.beginEntityManager(null);
        try {
            SubscriptionId id = new SubscriptionId();
            id.setClientId(clientId);
            id.setSubscriptionName(subscriptionName);
            id.setDestination(destinationName);

            AtomicLong last = subscriberLastMessageMap.get(id);
            if (last == null) {
                StoredSubscription ss = requireStoredSubscription(manager, clientId, subscriptionName);
                last = new AtomicLong(ss.getLastAckedId());
                subscriberLastMessageMap.put(id, last);
            }
            final AtomicLong lastMessageId = last;

            Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
            query.setParameter(1, destinationName);
            query.setParameter(2, lastMessageId.get());
            query.setMaxResults(maxReturned);
            @SuppressWarnings("unchecked") // the query selects StoredMessageReference entities only
            List<StoredMessageReference> references = query.getResultList();

            int count = 0;
            for (StoredMessageReference m : references) {
                MessageId mid = new MessageId(m.getMessageId());
                mid.setBrokerSequenceId(m.getId());
                listener.recoverMessageReference(mid);

                lastMessageId.set(m.getId());
                count++;
                if (count >= maxReturned) {
                    // Leave the loop, not the method: returning here would skip
                    // the commit below and leave the entity manager neither
                    // committed nor rolled back.
                    break;
                }
            }
        } catch (Throwable e) {
            adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create(e);
        }
        adapter.commitEntityManager(null, manager);
    }

    /**
     * Hands the listener every message reference of this destination whose id
     * is greater than the subscription's last acknowledged id, in ascending
     * id order.
     *
     * @throws Exception an {@link IOException} wrapping any failure, including
     *         an unknown subscription or a failure raised by the listener
     */
    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
        EntityManager manager = adapter.beginEntityManager(null);
        try {
            StoredSubscription ss = requireStoredSubscription(manager, clientId, subscriptionName);

            Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
            query.setParameter(1, destinationName);
            query.setParameter(2, ss.getLastAckedId());
            @SuppressWarnings("unchecked") // the query selects StoredMessageReference entities only
            List<StoredMessageReference> references = query.getResultList();

            for (StoredMessageReference m : references) {
                MessageId mid = new MessageId(m.getMessageId());
                mid.setBrokerSequenceId(m.getId());
                listener.recoverMessageReference(mid);
            }
        } catch (Throwable e) {
            adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create(e);
        }
        adapter.commitEntityManager(null, manager);
    }

    /**
     * Forgets the batching cursor of the given subscription so that the next
     * {@link #recoverNextMessages} call starts again from the subscription's
     * last acknowledged id.
     */
    public void resetBatching(String clientId, String subscriptionName) {
        SubscriptionId id = new SubscriptionId();
        id.setClientId(clientId);
        id.setSubscriptionName(subscriptionName);
        id.setDestination(destinationName);
        subscriberLastMessageMap.remove(id);
    }

}