KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > jpa > JPAPersistenceAdapter


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.store.jpa;
19
20 import java.io.File JavaDoc;
21 import java.io.IOException JavaDoc;
22 import java.util.HashSet JavaDoc;
23 import java.util.List JavaDoc;
24 import java.util.Properties JavaDoc;
25 import java.util.Set JavaDoc;
26
27 import javax.persistence.EntityManager;
28 import javax.persistence.EntityManagerFactory;
29 import javax.persistence.Persistence;
30 import javax.persistence.Query;
31
32 import org.apache.activemq.broker.ConnectionContext;
33 import org.apache.activemq.command.ActiveMQDestination;
34 import org.apache.activemq.command.ActiveMQQueue;
35 import org.apache.activemq.command.ActiveMQTopic;
36 import org.apache.activemq.memory.UsageManager;
37 import org.apache.activemq.openwire.OpenWireFormatFactory;
38 import org.apache.activemq.store.MessageStore;
39 import org.apache.activemq.store.PersistenceAdapter;
40 import org.apache.activemq.store.TopicMessageStore;
41 import org.apache.activemq.store.TransactionStore;
42 import org.apache.activemq.store.memory.MemoryTransactionStore;
43 import org.apache.activemq.util.IOExceptionSupport;
44 import org.apache.activemq.wireformat.WireFormat;
45 import org.apache.commons.logging.Log;
46 import org.apache.commons.logging.LogFactory;
47
48 /**
49  * An implementation of {@link PersistenceAdapter} that uses JPA to
50  * store it's messages.
51  *
52  * @org.apache.xbean.XBean element="jpaPersistenceAdapter"
53  *
54  * @version $Revision: 1.17 $
55  */

56 public class JPAPersistenceAdapter implements PersistenceAdapter {
57
58     private static final Log log = LogFactory.getLog(JPAPersistenceAdapter.class);
59     String JavaDoc entityManagerName = "activemq";
60     Properties JavaDoc entityManagerProperties = System.getProperties();
61     EntityManagerFactory entityManagerFactory;
62     private WireFormat wireFormat;
63     private MemoryTransactionStore transactionStore;
64     
65     public void beginTransaction(ConnectionContext context) throws IOException JavaDoc {
66         if( context.getLongTermStoreContext()!=null )
67             throw new IOException JavaDoc("Transation already started.");
68         
69         EntityManager manager = getEntityManagerFactory().createEntityManager();
70         manager.getTransaction().begin();
71         context.setLongTermStoreContext(manager);
72     }
73
74     public void commitTransaction(ConnectionContext context) throws IOException JavaDoc {
75         EntityManager manager = (EntityManager) context.getLongTermStoreContext();
76         if( manager==null )
77             throw new IOException JavaDoc("Transation not started.");
78         context.setLongTermStoreContext(null);
79         manager.getTransaction().commit();
80         manager.close();
81     }
82     
83     public void rollbackTransaction(ConnectionContext context) throws IOException JavaDoc {
84         EntityManager manager = (EntityManager) context.getLongTermStoreContext();
85         if( manager==null )
86             throw new IOException JavaDoc("Transation not started.");
87         context.setLongTermStoreContext(null);
88         manager.getTransaction().rollback();
89         manager.close();
90     }
91     
92     public EntityManager beginEntityManager(ConnectionContext context) {
93         if( context==null || context.getLongTermStoreContext()==null ) {
94             EntityManager manager = getEntityManagerFactory().createEntityManager();
95             manager.getTransaction().begin();
96             return manager;
97         } else {
98             return (EntityManager) context.getLongTermStoreContext();
99         }
100     }
101     
102     public void commitEntityManager(ConnectionContext context, EntityManager manager) {
103         if( context==null || context.getLongTermStoreContext()==null ) {
104             manager.getTransaction().commit();
105             manager.close();
106         }
107     }
108     
109     public void rollbackEntityManager(ConnectionContext context, EntityManager manager) {
110         if( context==null || context.getLongTermStoreContext()==null ) {
111             manager.getTransaction().rollback();
112             manager.close();
113         }
114     }
115
116     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException JavaDoc {
117         MessageStore rc = new JPAMessageStore(this, destination);
118         if (transactionStore != null) {
119             rc = transactionStore.proxy(rc);
120         }
121         return rc;
122     }
123
124     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException JavaDoc {
125         TopicMessageStore rc = new JPATopicMessageStore(this, destination);
126         if (transactionStore != null) {
127             rc = transactionStore.proxy(rc);
128         }
129         return rc;
130     }
131
132     public TransactionStore createTransactionStore() throws IOException JavaDoc {
133         if (transactionStore == null) {
134             transactionStore = new MemoryTransactionStore();
135         }
136         return this.transactionStore;
137     }
138
139     public void deleteAllMessages() throws IOException JavaDoc {
140         EntityManager manager = beginEntityManager(null);
141         try {
142             Query query = manager.createQuery("delete from StoredMessage m");
143             query.executeUpdate();
144             query = manager.createQuery("delete from StoredSubscription ss");
145             query.executeUpdate();
146         } catch (Throwable JavaDoc e) {
147             rollbackEntityManager(null,manager);
148             throw IOExceptionSupport.create(e);
149         }
150         commitEntityManager(null,manager);
151     }
152
153     public Set JavaDoc<ActiveMQDestination> getDestinations() {
154         HashSet JavaDoc<ActiveMQDestination> rc = new HashSet JavaDoc<ActiveMQDestination>();
155         
156         EntityManager manager = beginEntityManager(null);
157         try {
158             Query query = manager.createQuery("select distinct m.destination from StoredMessage m");
159             for (String JavaDoc dest : (List JavaDoc<String JavaDoc>)query.getResultList()) {
160                 rc.add(ActiveMQDestination.createDestination(dest,ActiveMQDestination.QUEUE_TYPE));
161             }
162         } catch (RuntimeException JavaDoc e) {
163             rollbackEntityManager(null,manager);
164             throw e;
165         }
166         commitEntityManager(null,manager);
167         return rc;
168     }
169
170     public long getLastMessageBrokerSequenceId() throws IOException JavaDoc {
171         long rc=0;
172         EntityManager manager = beginEntityManager(null);
173         try {
174             Query query = manager.createQuery("select max(m.id) from StoredMessage m");
175             Long JavaDoc t = (Long JavaDoc) query.getSingleResult();
176             if( t != null ) {
177                 rc = t;
178             }
179         } catch (Throwable JavaDoc e) {
180             rollbackEntityManager(null,manager);
181             throw IOExceptionSupport.create(e);
182         }
183         commitEntityManager(null,manager);
184         return rc;
185     }
186
187     public boolean isUseExternalMessageReferences() {
188         return false;
189     }
190
191     public void setUsageManager(UsageManager usageManager) {
192     }
193
194     public void start() throws Exception JavaDoc {
195     }
196
197     public void stop() throws Exception JavaDoc {
198         if( entityManagerFactory !=null ) {
199             entityManagerFactory.close();
200         }
201     }
202
203     public EntityManagerFactory getEntityManagerFactory() {
204         if( entityManagerFactory == null ) {
205             entityManagerFactory = createEntityManagerFactory();
206         }
207         return entityManagerFactory;
208     }
209     protected EntityManagerFactory createEntityManagerFactory() {
210         return Persistence.createEntityManagerFactory(getEntityManagerName(), getEntityManagerProperties());
211     }
212
213     public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) {
214         this.entityManagerFactory = entityManagerFactory;
215     }
216
217     public Properties JavaDoc getEntityManagerProperties() {
218         return entityManagerProperties;
219     }
220     public void setEntityManagerProperties(
221             Properties JavaDoc entityManagerProperties) {
222         this.entityManagerProperties = entityManagerProperties;
223     }
224
225     public String JavaDoc getEntityManagerName() {
226         return entityManagerName;
227     }
228     public void setEntityManagerName(String JavaDoc entityManager) {
229         this.entityManagerName = entityManager;
230     }
231
232     public WireFormat getWireFormat() {
233         if(wireFormat==null) {
234             wireFormat = createWireFormat();
235         }
236         return wireFormat;
237     }
238
239     private WireFormat createWireFormat() {
240         OpenWireFormatFactory wff = new OpenWireFormatFactory();
241         return wff.createWireFormat();
242     }
243
244     public void setWireFormat(WireFormat wireFormat) {
245         this.wireFormat = wireFormat;
246     }
247
248     public void checkpoint(boolean sync) throws IOException JavaDoc{
249     }
250
251     public void setBrokerName(String JavaDoc brokerName){
252     }
253
254     public void setDirectory(File JavaDoc dir){
255     }
256
257 }
258
Popular Tags