1 18 package org.apache.activemq.store.jdbc; 19 20 import java.io.IOException ; 21 import java.sql.SQLException ; 22 import java.util.concurrent.atomic.AtomicLong ; 23 import org.apache.activemq.broker.ConnectionContext; 24 import org.apache.activemq.command.ActiveMQDestination; 25 import org.apache.activemq.command.Message; 26 import org.apache.activemq.command.MessageAck; 27 import org.apache.activemq.command.MessageId; 28 import org.apache.activemq.memory.UsageManager; 29 import org.apache.activemq.store.MessageRecoveryListener; 30 import org.apache.activemq.store.MessageStore; 31 import org.apache.activemq.util.ByteSequence; 32 import org.apache.activemq.util.ByteSequenceData; 33 import org.apache.activemq.util.IOExceptionSupport; 34 import org.apache.activemq.wireformat.WireFormat; 35 36 37 40 public class JDBCMessageStore implements MessageStore { 41 42 protected final WireFormat wireFormat; 43 protected final ActiveMQDestination destination; 44 protected final JDBCAdapter adapter; 45 protected final JDBCPersistenceAdapter persistenceAdapter; 46 protected AtomicLong lastMessageId = new AtomicLong (-1); 47 48 public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, 49 ActiveMQDestination destination) { 50 this.persistenceAdapter = persistenceAdapter; 51 this.adapter = adapter; 52 this.wireFormat = wireFormat; 53 this.destination = destination; 54 } 55 56 public void addMessage(ConnectionContext context, Message message) throws IOException { 57 58 byte data[]; 60 try { 61 ByteSequence packet = wireFormat.marshal(message); 62 data = ByteSequenceData.toByteArray(packet); 63 } catch (IOException e) { 64 throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: " 65 + e, e); 66 } 67 68 TransactionContext c = persistenceAdapter.getTransactionContext(context); 70 try { 71 adapter.doAddMessage(c, message.getMessageId(), destination, data, message.getExpiration()); 72 } catch (SQLException e) { 73 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 74 throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: " 75 + e, e); 76 } finally { 77 c.close(); 78 } 79 } 80 81 public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { 82 TransactionContext c = persistenceAdapter.getTransactionContext(context); 84 try { 85 adapter.doAddMessageReference(c, messageId, destination, expirationTime, messageRef); 86 } catch (SQLException e) { 87 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 88 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " 89 + e, e); 90 } finally { 91 c.close(); 92 } 93 } 94 95 public Message getMessage(MessageId messageId) throws IOException { 96 97 long id = messageId.getBrokerSequenceId(); 98 99 TransactionContext c = persistenceAdapter.getTransactionContext(); 101 try { 102 byte data[] = adapter.doGetMessage(c, id); 103 if (data == null) 104 return null; 105 106 Message answer = (Message) wireFormat.unmarshal(new ByteSequence(data)); 107 return answer; 108 } catch (IOException e) { 109 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 110 } catch (SQLException e) { 111 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 112 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 113 } finally { 114 c.close(); 115 } 116 } 117 118 public String getMessageReference(MessageId messageId) throws IOException { 119 long id = messageId.getBrokerSequenceId(); 120 121 TransactionContext c = persistenceAdapter.getTransactionContext(); 123 try { 124 return adapter.doGetMessageReference(c, id); 125 } catch (IOException e) { 126 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 127 } catch (SQLException e) { 128 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 129 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 130 } finally { 131 c.close(); 132 } 133 } 134 135 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 136 long seq = ack.getLastMessageId().getBrokerSequenceId(); 137 138 TransactionContext c = persistenceAdapter.getTransactionContext(context); 140 try { 141 adapter.doRemoveMessage(c, seq); 142 } catch (SQLException e) { 143 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 144 throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e); 145 } finally { 146 c.close(); 147 } 148 } 149 150 public void recover(final MessageRecoveryListener listener) throws Exception { 151 152 TransactionContext c = persistenceAdapter.getTransactionContext(); 154 try { 155 c = persistenceAdapter.getTransactionContext(); 156 adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() { 157 public void recoverMessage(long sequenceId, byte[] data) throws Exception { 158 Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); 159 msg.getMessageId().setBrokerSequenceId(sequenceId); 160 listener.recoverMessage(msg); 161 } 162 public void recoverMessageReference(String reference) throws Exception { 163 listener.recoverMessageReference(new MessageId(reference)); 164 } 165 public void finished(){ 166 listener.finished(); 167 } 168 }); 169 } catch (SQLException e) { 170 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 171 throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e); 172 } finally { 173 c.close(); 174 } 175 } 176 177 public void start() { 178 } 179 180 public void stop() { 181 } 182 183 186 public void removeAllMessages(ConnectionContext context) throws IOException { 187 TransactionContext c = persistenceAdapter.getTransactionContext(context); 189 try { 190 adapter.doRemoveAllMessages(c, destination); 191 } catch (SQLException e) { 192 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 193 throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e); 194 } finally { 195 c.close(); 196 } 197 } 198 199 public ActiveMQDestination getDestination() { 200 return destination; 201 } 202 203 public void setUsageManager(UsageManager usageManager) { 204 } 206 207 208 public int getMessageCount() throws IOException { 209 int result = 0; 210 TransactionContext c = persistenceAdapter.getTransactionContext(); 211 try { 212 213 result = adapter.doGetMessageCount(c, destination); 214 215 } catch (SQLException e) { 216 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 217 throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e); 218 } finally { 219 c.close(); 220 } 221 return result; 222 } 223 224 230 public void recoverNextMessages(int maxReturned,final MessageRecoveryListener listener) throws Exception { 231 TransactionContext c=persistenceAdapter.getTransactionContext(); 232 233 try{ 234 adapter.doRecoverNextMessages(c,destination,lastMessageId.get(),maxReturned, 235 new JDBCMessageRecoveryListener(){ 236 237 public void recoverMessage(long sequenceId,byte[] data) throws Exception { 238 if(listener.hasSpace()){ 239 Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data)); 240 msg.getMessageId().setBrokerSequenceId(sequenceId); 241 listener.recoverMessage(msg); 242 lastMessageId.set(sequenceId); 243 } 244 } 245 246 public void recoverMessageReference(String reference) throws Exception { 247 if(listener.hasSpace()) { 248 listener.recoverMessageReference(new MessageId(reference)); 249 } 250 } 251 252 public void finished(){ 253 listener.finished(); 254 } 255 }); 256 }catch(SQLException e){ 257 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 258 }finally{ 259 c.close(); 260 } 261 262 } 263 264 268 public void resetBatching(){ 269 lastMessageId.set(-1); 270 271 } 272 273 } 274 | Popular Tags |