KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > jdbc > JDBCMessageStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.store.jdbc;
19
20 import java.io.IOException JavaDoc;
21 import java.sql.SQLException JavaDoc;
22 import java.util.concurrent.atomic.AtomicLong JavaDoc;
23 import org.apache.activemq.broker.ConnectionContext;
24 import org.apache.activemq.command.ActiveMQDestination;
25 import org.apache.activemq.command.Message;
26 import org.apache.activemq.command.MessageAck;
27 import org.apache.activemq.command.MessageId;
28 import org.apache.activemq.memory.UsageManager;
29 import org.apache.activemq.store.MessageRecoveryListener;
30 import org.apache.activemq.store.MessageStore;
31 import org.apache.activemq.util.ByteSequence;
32 import org.apache.activemq.util.ByteSequenceData;
33 import org.apache.activemq.util.IOExceptionSupport;
34 import org.apache.activemq.wireformat.WireFormat;
35
36
37 /**
38  * @version $Revision: 1.10 $
39  */

40 public class JDBCMessageStore implements MessageStore {
41
42     protected final WireFormat wireFormat;
43     protected final ActiveMQDestination destination;
44     protected final JDBCAdapter adapter;
45     protected final JDBCPersistenceAdapter persistenceAdapter;
46     protected AtomicLong JavaDoc lastMessageId = new AtomicLong JavaDoc(-1);
47
48     public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat,
49             ActiveMQDestination destination) {
50         this.persistenceAdapter = persistenceAdapter;
51         this.adapter = adapter;
52         this.wireFormat = wireFormat;
53         this.destination = destination;
54     }
55
56     public void addMessage(ConnectionContext context, Message message) throws IOException JavaDoc {
57         
58         // Serialize the Message..
59
byte data[];
60         try {
61             ByteSequence packet = wireFormat.marshal(message);
62             data = ByteSequenceData.toByteArray(packet);
63         } catch (IOException JavaDoc e) {
64             throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: "
65                     + e, e);
66         }
67
68         // Get a connection and insert the message into the DB.
69
TransactionContext c = persistenceAdapter.getTransactionContext(context);
70         try {
71             adapter.doAddMessage(c, message.getMessageId(), destination, data, message.getExpiration());
72         } catch (SQLException JavaDoc e) {
73             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
74             throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: "
75                     + e, e);
76         } finally {
77             c.close();
78         }
79     }
80
81     public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String JavaDoc messageRef) throws IOException JavaDoc {
82         // Get a connection and insert the message into the DB.
83
TransactionContext c = persistenceAdapter.getTransactionContext(context);
84         try {
85             adapter.doAddMessageReference(c, messageId, destination, expirationTime, messageRef);
86         } catch (SQLException JavaDoc e) {
87             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
88             throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: "
89                     + e, e);
90         } finally {
91             c.close();
92         }
93     }
94
95     public Message getMessage(MessageId messageId) throws IOException JavaDoc {
96
97         long id = messageId.getBrokerSequenceId();
98         
99         // Get a connection and pull the message out of the DB
100
TransactionContext c = persistenceAdapter.getTransactionContext();
101         try {
102             byte data[] = adapter.doGetMessage(c, id);
103             if (data == null)
104                 return null;
105
106             Message answer = (Message) wireFormat.unmarshal(new ByteSequence(data));
107             return answer;
108         } catch (IOException JavaDoc e) {
109             throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
110         } catch (SQLException JavaDoc e) {
111             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
112             throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
113         } finally {
114             c.close();
115         }
116     }
117     
118     public String JavaDoc getMessageReference(MessageId messageId) throws IOException JavaDoc {
119         long id = messageId.getBrokerSequenceId();
120         
121         // Get a connection and pull the message out of the DB
122
TransactionContext c = persistenceAdapter.getTransactionContext();
123         try {
124             return adapter.doGetMessageReference(c, id);
125         } catch (IOException JavaDoc e) {
126             throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
127         } catch (SQLException JavaDoc e) {
128             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
129             throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
130         } finally {
131             c.close();
132         }
133     }
134
135     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException JavaDoc {
136         long seq = ack.getLastMessageId().getBrokerSequenceId();
137
138         // Get a connection and remove the message from the DB
139
TransactionContext c = persistenceAdapter.getTransactionContext(context);
140         try {
141             adapter.doRemoveMessage(c, seq);
142         } catch (SQLException JavaDoc e) {
143             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
144             throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
145         } finally {
146             c.close();
147         }
148     }
149
150     public void recover(final MessageRecoveryListener listener) throws Exception JavaDoc {
151
152         // Get all the Message ids out of the database.
153
TransactionContext c = persistenceAdapter.getTransactionContext();
154         try {
155             c = persistenceAdapter.getTransactionContext();
156             adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
157                 public void recoverMessage(long sequenceId, byte[] data) throws Exception JavaDoc {
158                     Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
159                     msg.getMessageId().setBrokerSequenceId(sequenceId);
160                     listener.recoverMessage(msg);
161                 }
162                 public void recoverMessageReference(String JavaDoc reference) throws Exception JavaDoc {
163                     listener.recoverMessageReference(new MessageId(reference));
164                 }
165                 public void finished(){
166                     listener.finished();
167                 }
168             });
169         } catch (SQLException JavaDoc e) {
170             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
171             throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
172         } finally {
173             c.close();
174         }
175     }
176
177     public void start() {
178     }
179
180     public void stop() {
181     }
182
183     /**
184      * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
185      */

186     public void removeAllMessages(ConnectionContext context) throws IOException JavaDoc {
187         // Get a connection and remove the message from the DB
188
TransactionContext c = persistenceAdapter.getTransactionContext(context);
189         try {
190             adapter.doRemoveAllMessages(c, destination);
191         } catch (SQLException JavaDoc e) {
192             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
193             throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e);
194         } finally {
195             c.close();
196         }
197     }
198     
199     public ActiveMQDestination getDestination() {
200         return destination;
201     }
202
203     public void setUsageManager(UsageManager usageManager) {
204         // we can ignore since we don't buffer up messages.
205
}
206
207   
208     public int getMessageCount() throws IOException JavaDoc{
209         int result = 0;
210         TransactionContext c = persistenceAdapter.getTransactionContext();
211         try {
212             
213             result = adapter.doGetMessageCount(c, destination);
214                
215         } catch (SQLException JavaDoc e) {
216             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
217             throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
218         } finally {
219             c.close();
220         }
221         return result;
222     }
223
224     /**
225      * @param maxReturned
226      * @param listener
227      * @throws Exception
228      * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int, org.apache.activemq.store.MessageRecoveryListener)
229      */

230     public void recoverNextMessages(int maxReturned,final MessageRecoveryListener listener) throws Exception JavaDoc{
231         TransactionContext c=persistenceAdapter.getTransactionContext();
232         
233         try{
234             adapter.doRecoverNextMessages(c,destination,lastMessageId.get(),maxReturned,
235                     new JDBCMessageRecoveryListener(){
236
237                         public void recoverMessage(long sequenceId,byte[] data) throws Exception JavaDoc{
238                             if(listener.hasSpace()){
239                                 Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
240                                 msg.getMessageId().setBrokerSequenceId(sequenceId);
241                                 listener.recoverMessage(msg);
242                                 lastMessageId.set(sequenceId);
243                             }
244                         }
245
246                         public void recoverMessageReference(String JavaDoc reference) throws Exception JavaDoc{
247                             if(listener.hasSpace()) {
248                                 listener.recoverMessageReference(new MessageId(reference));
249                             }
250                         }
251
252                         public void finished(){
253                             listener.finished();
254                         }
255                     });
256         }catch(SQLException JavaDoc e){
257             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
258         }finally{
259             c.close();
260         }
261         
262     }
263
264     /**
265      *
266      * @see org.apache.activemq.store.MessageStore#resetBatching()
267      */

268     public void resetBatching(){
269         lastMessageId.set(-1);
270         
271     }
272
273 }
274
Popular Tags