1 18 package org.apache.activemq.store.jpa; 19 20 import java.io.IOException ; 21 import java.util.ArrayList ; 22 import java.util.List ; 23 import java.util.Map ; 24 import java.util.concurrent.ConcurrentHashMap ; 25 import java.util.concurrent.atomic.AtomicLong ; 26 27 import javax.persistence.EntityManager; 28 import javax.persistence.Query; 29 30 import org.apache.activemq.broker.ConnectionContext; 31 import org.apache.activemq.command.ActiveMQDestination; 32 import org.apache.activemq.command.Message; 33 import org.apache.activemq.command.MessageId; 34 import org.apache.activemq.command.SubscriptionInfo; 35 import org.apache.activemq.store.MessageRecoveryListener; 36 import org.apache.activemq.store.TopicMessageStore; 37 import org.apache.activemq.store.jpa.model.StoredMessage; 38 import org.apache.activemq.store.jpa.model.StoredSubscription; 39 import org.apache.activemq.store.jpa.model.StoredSubscription.SubscriptionId; 40 import org.apache.activemq.util.ByteSequence; 41 import org.apache.activemq.util.IOExceptionSupport; 42 43 public class JPATopicMessageStore extends JPAMessageStore implements TopicMessageStore { 44 private Map <SubscriptionId,AtomicLong > subscriberLastMessageMap=new ConcurrentHashMap <SubscriptionId,AtomicLong >(); 45 46 public JPATopicMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) { 47 super(adapter, destination); 48 } 49 50 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { 51 EntityManager manager = adapter.beginEntityManager(context); 52 try { 53 StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); 54 ss.setLastAckedId(messageId.getBrokerSequenceId()); 55 } catch (Throwable e) { 56 adapter.rollbackEntityManager(context,manager); 57 throw IOExceptionSupport.create(e); 58 } 59 adapter.commitEntityManager(context,manager); 60 } 61 62 public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException { 63 EntityManager manager = adapter.beginEntityManager(null); 64 try { 65 StoredSubscription ss = new StoredSubscription(); 66 ss.setClientId(clientId); 67 ss.setSubscriptionName(subscriptionName); 68 ss.setDestination(destinationName); 69 ss.setSelector(selector); 70 ss.setLastAckedId(-1); 71 72 if( !retroactive ) { 73 Query query = manager.createQuery("select max(m.id) from StoredMessage m"); 74 Long rc = (Long ) query.getSingleResult(); 75 if( rc != null ) { 76 ss.setLastAckedId(rc); 77 } 78 } 79 80 manager.persist(ss); 81 } catch (Throwable e) { 82 adapter.rollbackEntityManager(null,manager); 83 throw IOExceptionSupport.create(e); 84 } 85 adapter.commitEntityManager(null,manager); 86 } 87 88 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 89 EntityManager manager = adapter.beginEntityManager(null); 90 try { 91 StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); 92 manager.remove(ss); 93 } catch (Throwable e) { 94 adapter.rollbackEntityManager(null,manager); 95 throw IOExceptionSupport.create(e); 96 } 97 adapter.commitEntityManager(null,manager); 98 } 99 100 private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) { 101 Query query = manager.createQuery( 102 "select ss from StoredSubscription ss " + 103 "where ss.clientId=?1 " + 104 "and ss.subscriptionName=?2 " + 105 "and ss.destination=?3"); 106 query.setParameter(1, clientId); 107 query.setParameter(2, subscriptionName); 108 query.setParameter(3, destinationName); 109 List <StoredSubscription> resultList = query.getResultList(); 110 if( resultList.isEmpty() ) 111 return null; 112 return resultList.get(0); 113 } 114 115 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 116 SubscriptionInfo rc[]; 117 EntityManager manager = adapter.beginEntityManager(null); 118 try { 119 ArrayList <SubscriptionInfo> l = new ArrayList <SubscriptionInfo>(); 120 121 Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1"); 122 query.setParameter(1, destinationName); 123 for (StoredSubscription ss : (List <StoredSubscription>)query.getResultList()) { 124 SubscriptionInfo info = new SubscriptionInfo(); 125 info.setClientId(ss.getClientId()); 126 info.setDestination(destination); 127 info.setSelector(ss.getSelector()); 128 info.setSubcriptionName(ss.getSubscriptionName()); 129 l.add(info); 130 } 131 132 rc = new SubscriptionInfo[l.size()]; 133 l.toArray(rc); 134 } catch (Throwable e) { 135 adapter.rollbackEntityManager(null,manager); 136 throw IOExceptionSupport.create(e); 137 } 138 adapter.commitEntityManager(null,manager); 139 return rc; 140 } 141 142 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 143 Long rc; 144 EntityManager manager = adapter.beginEntityManager(null); 145 try { 146 Query query = manager.createQuery( 147 "select count(m) FROM StoredMessage m, StoredSubscription ss " + 148 "where ss.clientId=?1 " + 149 "and ss.subscriptionName=?2 " + 150 "and ss.destination=?3 " + 151 "and m.destination=ss.destination and m.id > ss.lastAckedId"); 152 query.setParameter(1, clientId); 153 query.setParameter(2, subscriptionName); 154 query.setParameter(3, destinationName); 155 rc = (Long ) query.getSingleResult(); 156 } catch (Throwable e) { 157 adapter.rollbackEntityManager(null,manager); 158 throw IOExceptionSupport.create(e); 159 } 160 adapter.commitEntityManager(null,manager); 161 return rc.intValue(); 162 } 163 164 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 165 SubscriptionInfo rc=null; 166 EntityManager manager = adapter.beginEntityManager(null); 167 try { 168 StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); 169 if( ss != null ) { 170 rc = new SubscriptionInfo(); 171 rc.setClientId(ss.getClientId()); 172 rc.setDestination(destination); 173 rc.setSelector(ss.getSelector()); 174 rc.setSubcriptionName(ss.getSubscriptionName()); 175 } 176 } catch (Throwable e) { 177 adapter.rollbackEntityManager(null,manager); 178 throw IOExceptionSupport.create(e); 179 } 180 adapter.commitEntityManager(null,manager); 181 return rc; 182 } 183 184 public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception { 185 EntityManager manager = adapter.beginEntityManager(null); 186 try { 187 SubscriptionId id = new SubscriptionId(); 188 id.setClientId(clientId); 189 id.setSubscriptionName(subscriptionName); 190 id.setDestination(destinationName); 191 192 AtomicLong last=subscriberLastMessageMap.get(id); 193 if(last==null){ 194 StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); 195 last=new AtomicLong (ss.getLastAckedId()); 196 subscriberLastMessageMap.put(id,last); 197 } 198 final AtomicLong lastMessageId=last; 199 200 Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc"); 201 query.setParameter(1, destinationName); 202 query.setParameter(2, lastMessageId.get()); 203 query.setMaxResults(maxReturned); 204 int count = 0; 205 for (StoredMessage m : (List <StoredMessage>)query.getResultList()) { 206 Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData())); 207 listener.recoverMessage(message); 208 lastMessageId.set(m.getId()); 209 count++; 210 if( count >= maxReturned ) { 211 return; 212 } 213 } 214 } catch (Throwable e) { 215 adapter.rollbackEntityManager(null,manager); 216 throw IOExceptionSupport.create(e); 217 } 218 adapter.commitEntityManager(null,manager); 219 } 220 221 public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { 222 EntityManager manager = adapter.beginEntityManager(null); 223 try { 224 225 StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); 226 227 Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc"); 228 query.setParameter(1, destinationName); 229 query.setParameter(2, ss.getLastAckedId()); 230 for (StoredMessage m : (List <StoredMessage>)query.getResultList()) { 231 Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData())); 232 listener.recoverMessage(message); 233 } 234 } catch (Throwable e) { 235 adapter.rollbackEntityManager(null,manager); 236 throw IOExceptionSupport.create(e); 237 } 238 adapter.commitEntityManager(null,manager); 239 } 240 241 public void resetBatching(String clientId, String subscriptionName) { 242 SubscriptionId id = new SubscriptionId(); 243 id.setClientId(clientId); 244 id.setSubscriptionName(subscriptionName); 245 id.setDestination(destinationName); 246 247 subscriberLastMessageMap.remove(id); 248 } 249 250 } 251 | Popular Tags |