1 18 package org.apache.activemq.store.jdbc; 19 20 import java.io.IOException ; 21 import java.sql.SQLException ; 22 import java.util.Map ; 23 import java.util.concurrent.ConcurrentHashMap ; 24 import java.util.concurrent.atomic.AtomicLong ; 25 import org.apache.activemq.broker.ConnectionContext; 26 import org.apache.activemq.command.ActiveMQTopic; 27 import org.apache.activemq.command.Message; 28 import org.apache.activemq.command.MessageId; 29 import org.apache.activemq.command.SubscriptionInfo; 30 import org.apache.activemq.store.MessageRecoveryListener; 31 import org.apache.activemq.store.TopicMessageStore; 32 import org.apache.activemq.util.ByteSequence; 33 import org.apache.activemq.util.IOExceptionSupport; 34 import org.apache.activemq.wireformat.WireFormat; 35 36 37 40 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore { 41 42 private Map subscriberLastMessageMap=new ConcurrentHashMap (); 43 public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, 44 ActiveMQTopic topic) { 45 super(persistenceAdapter, adapter, wireFormat, topic); 46 } 47 48 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) 49 throws IOException { 50 long seq = messageId.getBrokerSequenceId(); 51 TransactionContext c = persistenceAdapter.getTransactionContext(context); 53 try { 54 adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq); 55 } catch (SQLException e) { 56 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 57 throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " 58 + messageId + " in container: " + e, e); 59 } finally { 60 c.close(); 61 } 62 } 63 64 68 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) 69 throws Exception { 70 71 TransactionContext c = persistenceAdapter.getTransactionContext(); 72 try { 73 adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, 74 new JDBCMessageRecoveryListener() { 75 public void recoverMessage(long sequenceId, byte[] data) throws Exception { 76 Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); 77 msg.getMessageId().setBrokerSequenceId(sequenceId); 78 listener.recoverMessage(msg); 79 } 80 public void recoverMessageReference(String reference) throws Exception { 81 listener.recoverMessageReference(new MessageId(reference)); 82 } 83 84 public void finished(){ 85 listener.finished(); 86 } 87 }); 88 } catch (SQLException e) { 89 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 90 throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e); 91 } finally { 92 c.close(); 93 } 94 } 95 96 public synchronized void recoverNextMessages(final String clientId,final String subscriptionName, 97 final int maxReturned,final MessageRecoveryListener listener) throws Exception { 98 TransactionContext c=persistenceAdapter.getTransactionContext(); 99 String subcriberId=getSubscriptionKey(clientId,subscriptionName); 100 AtomicLong last=(AtomicLong )subscriberLastMessageMap.get(subcriberId); 101 if(last==null){ 102 long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c,destination,clientId,subscriptionName); 103 last=new AtomicLong (lastAcked); 104 subscriberLastMessageMap.put(subcriberId,last); 105 } 106 final AtomicLong finalLast=last; 107 try{ 108 adapter.doRecoverNextMessages(c,destination,clientId,subscriptionName,last.get(),maxReturned, 109 new JDBCMessageRecoveryListener(){ 110 111 public void recoverMessage(long sequenceId,byte[] data) throws Exception { 112 if(listener.hasSpace()){ 113 Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data)); 114 msg.getMessageId().setBrokerSequenceId(sequenceId); 115 listener.recoverMessage(msg); 116 finalLast.set(sequenceId); 117 } 118 } 119 120 public void recoverMessageReference(String reference) throws Exception { 121 listener.recoverMessageReference(new MessageId(reference)); 122 } 123 124 public void finished(){ 125 listener.finished(); 126 } 127 }); 128 }catch(SQLException e){ 129 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 130 }finally{ 131 c.close(); 132 last.set(finalLast.get()); 133 } 134 } 135 136 public void resetBatching(String clientId,String subscriptionName) { 137 String subcriberId=getSubscriptionKey(clientId,subscriptionName); 138 subscriberLastMessageMap.remove(subcriberId); 139 } 140 141 145 public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) 146 throws IOException { 147 TransactionContext c = persistenceAdapter.getTransactionContext(); 148 try { 149 c = persistenceAdapter.getTransactionContext(); 150 adapter.doSetSubscriberEntry(c, destination, clientId, subscriptionName, selector, retroactive); 151 } catch (SQLException e) { 152 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 153 throw IOExceptionSupport 154 .create("Failed to lookup subscription for info: " + clientId + ". Reason: " + e, e); 155 } finally { 156 c.close(); 157 } 158 } 159 160 164 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 165 TransactionContext c = persistenceAdapter.getTransactionContext(); 166 try { 167 return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName); 168 } catch (SQLException e) { 169 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 170 throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e); 171 } finally { 172 c.close(); 173 } 174 } 175 176 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 177 TransactionContext c = persistenceAdapter.getTransactionContext(); 178 try { 179 adapter.doDeleteSubscription(c, destination, clientId, subscriptionName); 180 } catch (SQLException e) { 181 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 182 throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e); 183 } finally { 184 c.close(); 185 resetBatching(clientId,subscriptionName); 186 } 187 } 188 189 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 190 TransactionContext c = persistenceAdapter.getTransactionContext(); 191 try { 192 return adapter.doGetAllSubscriptions(c, destination); 193 } catch (SQLException e) { 194 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 195 throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e); 196 } finally { 197 c.close(); 198 } 199 } 200 201 202 203 204 205 public int getMessageCount(String clientId,String subscriberName) throws IOException { 206 int result = 0; 207 TransactionContext c = persistenceAdapter.getTransactionContext(); 208 try { 209 result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName); 210 211 } catch (SQLException e) { 212 JDBCPersistenceAdapter.log("JDBC Failure: ",e); 213 throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e); 214 } finally { 215 c.close(); 216 } 217 return result; 218 } 219 220 protected String getSubscriptionKey(String clientId,String subscriberName){ 221 String result=clientId+":"; 222 result+=subscriberName!=null?subscriberName:"NOT_SET"; 223 return result; 224 } 225 226 227 228 229 230 } 231 | Popular Tags |