1 18 package org.apache.activemq.store.jpa; 19 20 import java.io.IOException ; 21 import java.util.HashSet ; 22 import java.util.List ; 23 import java.util.Set ; 24 25 import javax.persistence.EntityManager; 26 import javax.persistence.Query; 27 28 import org.apache.activemq.command.ActiveMQDestination; 29 import org.apache.activemq.command.ActiveMQQueue; 30 import org.apache.activemq.command.ActiveMQTopic; 31 import org.apache.activemq.store.MessageStore; 32 import org.apache.activemq.store.ReferenceStore; 33 import org.apache.activemq.store.ReferenceStoreAdapter; 34 import org.apache.activemq.store.TopicMessageStore; 35 import org.apache.activemq.store.TopicReferenceStore; 36 import org.apache.activemq.util.IOExceptionSupport; 37 38 46 public class JPAReferenceStoreAdapter extends JPAPersistenceAdapter implements ReferenceStoreAdapter { 47 48 @Override 49 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 50 throw new RuntimeException ("Use createQueueReferenceStore instead."); 51 } 52 53 @Override 54 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 55 throw new RuntimeException ("Use createTopicReferenceStore instead."); 56 } 57 58 public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException { 59 JPAReferenceStore rc = new JPAReferenceStore(this, destination); 60 return rc; 61 } 62 63 public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException { 64 JPATopicReferenceStore rc = new JPATopicReferenceStore(this, destination); 65 return rc; 66 } 67 68 69 public void deleteAllMessages() throws IOException { 70 EntityManager manager = beginEntityManager(null); 71 try { 72 Query query = manager.createQuery("delete from StoredMessageReference m"); 73 query.executeUpdate(); 74 query = manager.createQuery("delete from StoredSubscription ss"); 75 query.executeUpdate(); 76 } catch (Throwable e) { 77 rollbackEntityManager(null,manager); 78 throw IOExceptionSupport.create(e); 79 } 80 commitEntityManager(null,manager); 81 } 82 83 public Set <ActiveMQDestination> getDestinations() { 84 HashSet <ActiveMQDestination> rc = new HashSet <ActiveMQDestination>(); 85 86 EntityManager manager = beginEntityManager(null); 87 try { 88 Query query = manager.createQuery("select distinct m.destination from StoredMessageReference m"); 89 for (String dest : (List <String >)query.getResultList()) { 90 rc.add(ActiveMQDestination.createDestination(dest,ActiveMQDestination.QUEUE_TYPE)); 91 } 92 } catch (RuntimeException e) { 93 rollbackEntityManager(null,manager); 94 throw e; 95 } 96 commitEntityManager(null,manager); 97 return rc; 98 } 99 100 public long getLastMessageBrokerSequenceId() throws IOException { 101 long rc=0; 102 EntityManager manager = beginEntityManager(null); 103 try { 104 Query query = manager.createQuery("select max(m.id) from StoredMessageReference m"); 105 Long t = (Long ) query.getSingleResult(); 106 if( t != null ) { 107 rc = t; 108 } 109 } catch (Throwable e) { 110 rollbackEntityManager(null,manager); 111 throw IOExceptionSupport.create(e); 112 } 113 commitEntityManager(null,manager); 114 return rc; 115 } 116 117 public Set <Integer > getReferenceFileIdsInUse() throws IOException { 118 HashSet <Integer > rc=null; 119 EntityManager manager = beginEntityManager(null); 120 try { 121 Query query = manager.createQuery("select distinct m.fileId from StoredMessageReference m"); 122 rc=new HashSet <Integer >((List <Integer >)query.getResultList()); 123 } catch (Throwable e) { 124 rollbackEntityManager(null,manager); 125 throw IOExceptionSupport.create(e); 126 } 127 commitEntityManager(null,manager); 128 return rc; 129 } 130 131 135 public boolean isStoreValid(){ 136 return true; 137 } 138 139 } 140 | Popular Tags |