KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > jpa > JPAReferenceStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.store.jpa;
19
20 import java.io.IOException JavaDoc;
21 import java.util.List JavaDoc;
22 import java.util.concurrent.atomic.AtomicLong JavaDoc;
23
24 import javax.persistence.EntityManager;
25 import javax.persistence.Query;
26
27 import org.apache.activemq.broker.ConnectionContext;
28 import org.apache.activemq.command.ActiveMQDestination;
29 import org.apache.activemq.command.Message;
30 import org.apache.activemq.command.MessageAck;
31 import org.apache.activemq.command.MessageId;
32 import org.apache.activemq.memory.UsageManager;
33 import org.apache.activemq.store.MessageRecoveryListener;
34 import org.apache.activemq.store.ReferenceStore;
35 import org.apache.activemq.store.jpa.model.StoredMessageReference;
36 import org.apache.activemq.util.IOExceptionSupport;
37 import org.apache.activemq.wireformat.WireFormat;
38
39 public class JPAReferenceStore implements ReferenceStore {
40     
41     protected final JPAPersistenceAdapter adapter;
42     protected final WireFormat wireFormat;
43     protected final ActiveMQDestination destination;
44     protected final String JavaDoc destinationName;
45     protected AtomicLong JavaDoc lastMessageId = new AtomicLong JavaDoc(-1);
46     
47     public JPAReferenceStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
48         this.adapter = adapter;
49         this.destination = destination;
50         this.destinationName = destination.getQualifiedName();
51         this.wireFormat = this.adapter.getWireFormat();
52     }
53     
54     public ActiveMQDestination getDestination() {
55         return destination;
56     }
57
58     public void addMessage(ConnectionContext context, Message message) throws IOException JavaDoc {
59         throw new RuntimeException JavaDoc("Use addMessageReference instead");
60     }
61     
62     public Message getMessage(MessageId identity) throws IOException JavaDoc {
63         throw new RuntimeException JavaDoc("Use addMessageReference instead");
64     }
65     
66     public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException JavaDoc {
67         EntityManager manager = adapter.beginEntityManager(context);
68         try {
69             
70             StoredMessageReference sm = new StoredMessageReference();
71             sm.setDestination(destinationName);
72             sm.setId(messageId.getBrokerSequenceId());
73             sm.setMessageId(messageId.toString());
74             sm.setExiration(data.getExpiration());
75             sm.setFileId(data.getFileId());
76             sm.setOffset(data.getOffset());
77         
78             manager.persist(sm);
79             
80         } catch (Throwable JavaDoc e) {
81             adapter.rollbackEntityManager(context,manager);
82             throw IOExceptionSupport.create(e);
83         }
84         adapter.commitEntityManager(context,manager);
85     }
86
87     public ReferenceData getMessageReference(MessageId identity) throws IOException JavaDoc {
88         ReferenceData rc=null;
89         EntityManager manager = adapter.beginEntityManager(null);
90         try {
91             StoredMessageReference message=null;
92             if( identity.getBrokerSequenceId()!= 0 ) {
93                 message = manager.find(StoredMessageReference.class, identity.getBrokerSequenceId());
94             } else {
95                 Query query = manager.createQuery("select m from StoredMessageReference m where m.messageId=?1");
96                 query.setParameter(1, identity.toString());
97                 message = (StoredMessageReference) query.getSingleResult();
98             }
99             if( message !=null ) {
100                 rc = new ReferenceData();
101                 rc.setExpiration(message.getExiration());
102                 rc.setFileId(message.getFileId());
103                 rc.setOffset(message.getOffset());
104             }
105         } catch (Throwable JavaDoc e) {
106             adapter.rollbackEntityManager(null,manager);
107             throw IOExceptionSupport.create(e);
108         }
109         adapter.commitEntityManager(null,manager);
110         return rc;
111     }
112
113     public int getMessageCount() throws IOException JavaDoc {
114         Long JavaDoc rc;
115         EntityManager manager = adapter.beginEntityManager(null);
116         try {
117             Query query = manager.createQuery("select count(m) from StoredMessageReference m");
118             rc = (Long JavaDoc) query.getSingleResult();
119         } catch (Throwable JavaDoc e) {
120             adapter.rollbackEntityManager(null,manager);
121             throw IOExceptionSupport.create(e);
122         }
123         adapter.commitEntityManager(null,manager);
124         return rc.intValue();
125     }
126
127     public void recover(MessageRecoveryListener container) throws Exception JavaDoc {
128         EntityManager manager = adapter.beginEntityManager(null);
129         try {
130             Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 order by m.id asc");
131             query.setParameter(1, destinationName);
132             for (StoredMessageReference m : (List JavaDoc<StoredMessageReference>)query.getResultList()) {
133                 MessageId id = new MessageId(m.getMessageId());
134                 id.setBrokerSequenceId(m.getId());
135                 container.recoverMessageReference(id);
136             }
137         } catch (Throwable JavaDoc e) {
138             adapter.rollbackEntityManager(null,manager);
139             throw IOExceptionSupport.create(e);
140         }
141         adapter.commitEntityManager(null,manager);
142     }
143
144     public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception JavaDoc {
145         
146         EntityManager manager = adapter.beginEntityManager(null);
147         try {
148             
149             Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
150             query.setParameter(1, destinationName);
151             query.setParameter(2, lastMessageId.get());
152             query.setMaxResults(maxReturned);
153             int count = 0;
154             for (StoredMessageReference m : (List JavaDoc<StoredMessageReference>)query.getResultList()) {
155                 MessageId id = new MessageId(m.getMessageId());
156                 id.setBrokerSequenceId(m.getId());
157                 listener.recoverMessageReference(id);
158                 lastMessageId.set(m.getId());
159                 count++;
160                 if( count >= maxReturned ) {
161                     return;
162                 }
163             }
164
165         } catch (Throwable JavaDoc e) {
166             adapter.rollbackEntityManager(null,manager);
167             throw IOExceptionSupport.create(e);
168         }
169         adapter.commitEntityManager(null,manager);
170     }
171
172     public void removeAllMessages(ConnectionContext context) throws IOException JavaDoc {
173         EntityManager manager = adapter.beginEntityManager(context);
174         try {
175             Query query = manager.createQuery("delete from StoredMessageReference m where m.destination=?1");
176             query.setParameter(1, destinationName);
177             query.executeUpdate();
178         } catch (Throwable JavaDoc e) {
179             adapter.rollbackEntityManager(context,manager);
180             throw IOExceptionSupport.create(e);
181         }
182         adapter.commitEntityManager(context,manager);
183     }
184
185     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException JavaDoc {
186         EntityManager manager = adapter.beginEntityManager(context);
187         try {
188             Query query = manager.createQuery("delete from StoredMessageReference m where m.id=?1");
189             query.setParameter(1, ack.getLastMessageId().getBrokerSequenceId());
190             query.executeUpdate();
191         } catch (Throwable JavaDoc e) {
192             adapter.rollbackEntityManager(context,manager);
193             throw IOExceptionSupport.create(e);
194         }
195         adapter.commitEntityManager(context,manager);
196     }
197
198     public void resetBatching() {
199         lastMessageId.set(-1);
200     }
201
202     public void setUsageManager(UsageManager usageManager) {
203     }
204
205     public void start() throws Exception JavaDoc {
206     }
207
208     public void stop() throws Exception JavaDoc {
209     }
210
211     public void setBatch(MessageId startAfter){
212     }
213
214     public boolean supportsExternalBatchControl(){
215         return false;
216     }
217 }
218
Popular Tags