1 18 package org.apache.activemq.store.jpa; 19 20 import java.io.File ; 21 import java.io.IOException ; 22 import java.util.HashSet ; 23 import java.util.List ; 24 import java.util.Properties ; 25 import java.util.Set ; 26 27 import javax.persistence.EntityManager; 28 import javax.persistence.EntityManagerFactory; 29 import javax.persistence.Persistence; 30 import javax.persistence.Query; 31 32 import org.apache.activemq.broker.ConnectionContext; 33 import org.apache.activemq.command.ActiveMQDestination; 34 import org.apache.activemq.command.ActiveMQQueue; 35 import org.apache.activemq.command.ActiveMQTopic; 36 import org.apache.activemq.memory.UsageManager; 37 import org.apache.activemq.openwire.OpenWireFormatFactory; 38 import org.apache.activemq.store.MessageStore; 39 import org.apache.activemq.store.PersistenceAdapter; 40 import org.apache.activemq.store.TopicMessageStore; 41 import org.apache.activemq.store.TransactionStore; 42 import org.apache.activemq.store.memory.MemoryTransactionStore; 43 import org.apache.activemq.util.IOExceptionSupport; 44 import org.apache.activemq.wireformat.WireFormat; 45 import org.apache.commons.logging.Log; 46 import org.apache.commons.logging.LogFactory; 47 48 56 public class JPAPersistenceAdapter implements PersistenceAdapter { 57 58 private static final Log log = LogFactory.getLog(JPAPersistenceAdapter.class); 59 String entityManagerName = "activemq"; 60 Properties entityManagerProperties = System.getProperties(); 61 EntityManagerFactory entityManagerFactory; 62 private WireFormat wireFormat; 63 private MemoryTransactionStore transactionStore; 64 65 public void beginTransaction(ConnectionContext context) throws IOException { 66 if( context.getLongTermStoreContext()!=null ) 67 throw new IOException ("Transation already started."); 68 69 EntityManager manager = getEntityManagerFactory().createEntityManager(); 70 manager.getTransaction().begin(); 71 context.setLongTermStoreContext(manager); 72 } 73 74 public void commitTransaction(ConnectionContext context) throws IOException { 75 EntityManager manager = (EntityManager) context.getLongTermStoreContext(); 76 if( manager==null ) 77 throw new IOException ("Transation not started."); 78 context.setLongTermStoreContext(null); 79 manager.getTransaction().commit(); 80 manager.close(); 81 } 82 83 public void rollbackTransaction(ConnectionContext context) throws IOException { 84 EntityManager manager = (EntityManager) context.getLongTermStoreContext(); 85 if( manager==null ) 86 throw new IOException ("Transation not started."); 87 context.setLongTermStoreContext(null); 88 manager.getTransaction().rollback(); 89 manager.close(); 90 } 91 92 public EntityManager beginEntityManager(ConnectionContext context) { 93 if( context==null || context.getLongTermStoreContext()==null ) { 94 EntityManager manager = getEntityManagerFactory().createEntityManager(); 95 manager.getTransaction().begin(); 96 return manager; 97 } else { 98 return (EntityManager) context.getLongTermStoreContext(); 99 } 100 } 101 102 public void commitEntityManager(ConnectionContext context, EntityManager manager) { 103 if( context==null || context.getLongTermStoreContext()==null ) { 104 manager.getTransaction().commit(); 105 manager.close(); 106 } 107 } 108 109 public void rollbackEntityManager(ConnectionContext context, EntityManager manager) { 110 if( context==null || context.getLongTermStoreContext()==null ) { 111 manager.getTransaction().rollback(); 112 manager.close(); 113 } 114 } 115 116 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 117 MessageStore rc = new JPAMessageStore(this, destination); 118 if (transactionStore != null) { 119 rc = transactionStore.proxy(rc); 120 } 121 return rc; 122 } 123 124 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 125 TopicMessageStore rc = new JPATopicMessageStore(this, destination); 126 if (transactionStore != null) { 127 rc = transactionStore.proxy(rc); 128 } 129 return rc; 130 } 131 132 public TransactionStore createTransactionStore() throws IOException { 133 if (transactionStore == null) { 134 transactionStore = new MemoryTransactionStore(); 135 } 136 return this.transactionStore; 137 } 138 139 public void deleteAllMessages() throws IOException { 140 EntityManager manager = beginEntityManager(null); 141 try { 142 Query query = manager.createQuery("delete from StoredMessage m"); 143 query.executeUpdate(); 144 query = manager.createQuery("delete from StoredSubscription ss"); 145 query.executeUpdate(); 146 } catch (Throwable e) { 147 rollbackEntityManager(null,manager); 148 throw IOExceptionSupport.create(e); 149 } 150 commitEntityManager(null,manager); 151 } 152 153 public Set <ActiveMQDestination> getDestinations() { 154 HashSet <ActiveMQDestination> rc = new HashSet <ActiveMQDestination>(); 155 156 EntityManager manager = beginEntityManager(null); 157 try { 158 Query query = manager.createQuery("select distinct m.destination from StoredMessage m"); 159 for (String dest : (List <String >)query.getResultList()) { 160 rc.add(ActiveMQDestination.createDestination(dest,ActiveMQDestination.QUEUE_TYPE)); 161 } 162 } catch (RuntimeException e) { 163 rollbackEntityManager(null,manager); 164 throw e; 165 } 166 commitEntityManager(null,manager); 167 return rc; 168 } 169 170 public long getLastMessageBrokerSequenceId() throws IOException { 171 long rc=0; 172 EntityManager manager = beginEntityManager(null); 173 try { 174 Query query = manager.createQuery("select max(m.id) from StoredMessage m"); 175 Long t = (Long ) query.getSingleResult(); 176 if( t != null ) { 177 rc = t; 178 } 179 } catch (Throwable e) { 180 rollbackEntityManager(null,manager); 181 throw IOExceptionSupport.create(e); 182 } 183 commitEntityManager(null,manager); 184 return rc; 185 } 186 187 public boolean isUseExternalMessageReferences() { 188 return false; 189 } 190 191 public void setUsageManager(UsageManager usageManager) { 192 } 193 194 public void start() throws Exception { 195 } 196 197 public void stop() throws Exception { 198 if( entityManagerFactory !=null ) { 199 entityManagerFactory.close(); 200 } 201 } 202 203 public EntityManagerFactory getEntityManagerFactory() { 204 if( entityManagerFactory == null ) { 205 entityManagerFactory = createEntityManagerFactory(); 206 } 207 return entityManagerFactory; 208 } 209 protected EntityManagerFactory createEntityManagerFactory() { 210 return Persistence.createEntityManagerFactory(getEntityManagerName(), getEntityManagerProperties()); 211 } 212 213 public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) { 214 this.entityManagerFactory = entityManagerFactory; 215 } 216 217 public Properties getEntityManagerProperties() { 218 return entityManagerProperties; 219 } 220 public void setEntityManagerProperties( 221 Properties entityManagerProperties) { 222 this.entityManagerProperties = entityManagerProperties; 223 } 224 225 public String getEntityManagerName() { 226 return entityManagerName; 227 } 228 public void setEntityManagerName(String entityManager) { 229 this.entityManagerName = entityManager; 230 } 231 232 public WireFormat getWireFormat() { 233 if(wireFormat==null) { 234 wireFormat = createWireFormat(); 235 } 236 return wireFormat; 237 } 238 239 private WireFormat createWireFormat() { 240 OpenWireFormatFactory wff = new OpenWireFormatFactory(); 241 return wff.createWireFormat(); 242 } 243 244 public void setWireFormat(WireFormat wireFormat) { 245 this.wireFormat = wireFormat; 246 } 247 248 public void checkpoint(boolean sync) throws IOException { 249 } 250 251 public void setBrokerName(String brokerName){ 252 } 253 254 public void setDirectory(File dir){ 255 } 256 257 } 258 | Popular Tags |