KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > jpa > JPAMessageStore


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18 package org.apache.activemq.store.jpa;
19
20 import java.io.IOException JavaDoc;
21 import java.util.List JavaDoc;
22 import java.util.concurrent.atomic.AtomicLong JavaDoc;
23
24 import javax.persistence.EntityManager;
25 import javax.persistence.Query;
26
27 import org.apache.activemq.broker.ConnectionContext;
28 import org.apache.activemq.command.ActiveMQDestination;
29 import org.apache.activemq.command.Message;
30 import org.apache.activemq.command.MessageAck;
31 import org.apache.activemq.command.MessageId;
32 import org.apache.activemq.memory.UsageManager;
33 import org.apache.activemq.store.MessageRecoveryListener;
34 import org.apache.activemq.store.MessageStore;
35 import org.apache.activemq.store.jpa.model.StoredMessage;
36 import org.apache.activemq.util.ByteSequence;
37 import org.apache.activemq.util.IOExceptionSupport;
38 import org.apache.activemq.wireformat.WireFormat;
39
40 public class JPAMessageStore implements MessageStore {
41
42     protected final JPAPersistenceAdapter adapter;
43     protected final WireFormat wireFormat;
44     protected final ActiveMQDestination destination;
45     protected final String JavaDoc destinationName;
46     protected AtomicLong JavaDoc lastMessageId = new AtomicLong JavaDoc(-1);
47
48     public JPAMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
49         this.adapter = adapter;
50         this.destination = destination;
51         this.destinationName = destination.getQualifiedName();
52         this.wireFormat = this.adapter.getWireFormat();
53     }
54
55     public void addMessage(ConnectionContext context, Message message) throws IOException JavaDoc {
56         
57         EntityManager manager = adapter.beginEntityManager(context);
58         try {
59             
60             ByteSequence sequence = wireFormat.marshal(message);
61             sequence.compact();
62             
63             StoredMessage sm = new StoredMessage();
64             sm.setDestination(destinationName);
65             sm.setId(message.getMessageId().getBrokerSequenceId());
66             sm.setMessageId(message.getMessageId().toString());
67             sm.setExiration(message.getExpiration());
68             sm.setData(sequence.data);
69         
70             manager.persist(sm);
71             
72         } catch (Throwable JavaDoc e) {
73             adapter.rollbackEntityManager(context,manager);
74             throw IOExceptionSupport.create(e);
75         }
76         adapter.commitEntityManager(context,manager);
77     }
78
79     public ActiveMQDestination getDestination() {
80         return destination;
81     }
82
83     public Message getMessage(MessageId identity) throws IOException JavaDoc {
84         Message rc;
85         EntityManager manager = adapter.beginEntityManager(null);
86         try {
87             StoredMessage message=null;
88             if( identity.getBrokerSequenceId()!= 0 ) {
89                 message = manager.find(StoredMessage.class, identity.getBrokerSequenceId());
90             } else {
91                 Query query = manager.createQuery("select m from StoredMessage m where m.messageId=?1");
92                 query.setParameter(1, identity.toString());
93                 message = (StoredMessage) query.getSingleResult();
94             }
95             
96             rc = (Message) wireFormat.unmarshal(new ByteSequence(message.getData()));
97         } catch (Throwable JavaDoc e) {
98             adapter.rollbackEntityManager(null,manager);
99             throw IOExceptionSupport.create(e);
100         }
101         adapter.commitEntityManager(null,manager);
102         return rc;
103     }
104
105     public int getMessageCount() throws IOException JavaDoc {
106         Long JavaDoc rc;
107         EntityManager manager = adapter.beginEntityManager(null);
108         try {
109             Query query = manager.createQuery("select count(m) from StoredMessage m");
110             rc = (Long JavaDoc) query.getSingleResult();
111         } catch (Throwable JavaDoc e) {
112             adapter.rollbackEntityManager(null,manager);
113             throw IOExceptionSupport.create(e);
114         }
115         adapter.commitEntityManager(null,manager);
116         return rc.intValue();
117     }
118
119     @SuppressWarnings JavaDoc("unchecked")
120     public void recover(MessageRecoveryListener container) throws Exception JavaDoc {
121         EntityManager manager = adapter.beginEntityManager(null);
122         try {
123             Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 order by m.id asc");
124             query.setParameter(1, destinationName);
125             for (StoredMessage m : (List JavaDoc<StoredMessage>)query.getResultList()) {
126                 Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
127                 container.recoverMessage(message);
128             }
129         } catch (Throwable JavaDoc e) {
130             adapter.rollbackEntityManager(null,manager);
131             throw IOExceptionSupport.create(e);
132         }
133         adapter.commitEntityManager(null,manager);
134     }
135
136     @SuppressWarnings JavaDoc("unchecked")
137     public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception JavaDoc {
138         
139         EntityManager manager = adapter.beginEntityManager(null);
140         try {
141             
142             Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
143             query.setParameter(1, destinationName);
144             query.setParameter(2, lastMessageId.get());
145             query.setMaxResults(maxReturned);
146             int count = 0;
147             for (StoredMessage m : (List JavaDoc<StoredMessage>)query.getResultList()) {
148                 Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
149                 listener.recoverMessage(message);
150                 lastMessageId.set(m.getId());
151                 count++;
152                 if( count >= maxReturned ) {
153                     return;
154                 }
155             }
156
157         } catch (Throwable JavaDoc e) {
158             adapter.rollbackEntityManager(null,manager);
159             throw IOExceptionSupport.create(e);
160         }
161         adapter.commitEntityManager(null,manager);
162     }
163
164     public void removeAllMessages(ConnectionContext context) throws IOException JavaDoc {
165         EntityManager manager = adapter.beginEntityManager(context);
166         try {
167             Query query = manager.createQuery("delete from StoredMessage m where m.destination=?1");
168             query.setParameter(1, destinationName);
169             query.executeUpdate();
170         } catch (Throwable JavaDoc e) {
171             adapter.rollbackEntityManager(context,manager);
172             throw IOExceptionSupport.create(e);
173         }
174         adapter.commitEntityManager(context,manager);
175     }
176
177     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException JavaDoc {
178         EntityManager manager = adapter.beginEntityManager(context);
179         try {
180             Query query = manager.createQuery("delete from StoredMessage m where m.id=?1");
181             query.setParameter(1, ack.getLastMessageId().getBrokerSequenceId());
182             query.executeUpdate();
183         } catch (Throwable JavaDoc e) {
184             adapter.rollbackEntityManager(context,manager);
185             throw IOExceptionSupport.create(e);
186         }
187         adapter.commitEntityManager(context,manager);
188     }
189
190     public void resetBatching() {
191         lastMessageId.set(-1);
192     }
193
194     public void setUsageManager(UsageManager usageManager) {
195     }
196
197     public void start() throws Exception JavaDoc {
198     }
199
200     public void stop() throws Exception JavaDoc {
201     }
202
203 }
204
Popular Tags