package org.apache.activemq.store.jpa;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import javax.persistence.EntityManager;
import javax.persistence.Query;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.jpa.model.StoredMessage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;

/**
 * A {@link MessageStore} for a single destination that keeps its messages as
 * {@link StoredMessage} entities accessed through a JPA {@link EntityManager}.
 *
 * <p>Every operation follows the same shape: obtain an entity manager from the
 * {@link JPAPersistenceAdapter}, do the work inside a {@code try}, call
 * {@code rollbackEntityManager} and rethrow as an {@link IOException} on any
 * {@link Throwable}, and otherwise call {@code commitEntityManager}.
 */
public class JPAMessageStore implements MessageStore {

    protected final JPAPersistenceAdapter adapter;
    protected final WireFormat wireFormat;
    protected final ActiveMQDestination destination;
    /** Qualified name of {@link #destination}; used as the destination key in queries. */
    protected final String destinationName;
    /**
     * Id of the last message handed out by {@link #recoverNextMessages}; -1 means
     * "start from the beginning" (see {@link #resetBatching()}).
     */
    protected AtomicLong lastMessageId = new AtomicLong(-1);

    /**
     * Creates a store bound to one destination.
     *
     * @param adapter     supplies entity managers and the wire format
     * @param destination the destination whose messages this store manages
     */
    public JPAMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
        this.adapter = adapter;
        this.destination = destination;
        this.destinationName = destination.getQualifiedName();
        this.wireFormat = this.adapter.getWireFormat();
    }

    /**
     * Marshals the message with the wire format and persists it as a
     * {@link StoredMessage} keyed by the message's broker sequence id.
     *
     * @param context connection context passed through to the adapter
     * @param message the message to store
     * @throws IOException if marshalling or persisting fails
     */
    public void addMessage(ConnectionContext context, Message message) throws IOException {
        EntityManager manager = adapter.beginEntityManager(context);
        try {
            ByteSequence sequence = wireFormat.marshal(message);
            sequence.compact();

            StoredMessage sm = new StoredMessage();
            sm.setDestination(destinationName);
            sm.setId(message.getMessageId().getBrokerSequenceId());
            sm.setMessageId(message.getMessageId().toString());
            // "setExiration" is the setter name as used in SOURCE for the StoredMessage entity.
            sm.setExiration(message.getExpiration());
            sm.setData(sequence.data);

            manager.persist(sm);
        } catch (Throwable e) {
            adapter.rollbackEntityManager(context, manager);
            throw IOExceptionSupport.create(e);
        }
        adapter.commitEntityManager(context, manager);
    }

    /**
     * @return the destination this store was created for
     */
    public ActiveMQDestination getDestination() {
        return destination;
    }

    /**
     * Loads and unmarshals a single stored message.
     *
     * <p>When the identity carries a non-zero broker sequence id the entity is
     * looked up by primary key; otherwise it is looked up by the string form of
     * the message id.
     *
     * <p>NOTE(review): when no matching row exists this method throws an
     * {@link IOException} (either from dereferencing the {@code null} returned by
     * {@code find}, or from {@code getSingleResult}), it does not return
     * {@code null}. Confirm that this matches what callers of
     * {@code MessageStore.getMessage} expect.
     *
     * @param identity id of the message to load
     * @return the unmarshalled message
     * @throws IOException if the lookup or unmarshalling fails
     */
    public Message getMessage(MessageId identity) throws IOException {
        Message rc;
        EntityManager manager = adapter.beginEntityManager(null);
        try {
            StoredMessage message = null;
            if (identity.getBrokerSequenceId() != 0) {
                message = manager.find(StoredMessage.class, identity.getBrokerSequenceId());
            } else {
                Query query = manager.createQuery("select m from StoredMessage m where m.messageId=?1");
                query.setParameter(1, identity.toString());
                message = (StoredMessage) query.getSingleResult();
            }

            rc = (Message) wireFormat.unmarshal(new ByteSequence(message.getData()));
        } catch (Throwable e) {
            adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create(e);
        }
        adapter.commitEntityManager(null, manager);
        return rc;
    }

    /**
     * Counts the messages stored for this store's destination.
     *
     * @return the number of stored messages whose destination is
     *         {@link #destinationName}, narrowed to {@code int}
     * @throws IOException if the count query fails
     */
    public int getMessageCount() throws IOException {
        Long rc;
        EntityManager manager = adapter.beginEntityManager(null);
        try {
            // Scope the count to this destination, like every other query in this
            // class; an unfiltered count would include other destinations' rows.
            Query query = manager.createQuery("select count(m) from StoredMessage m where m.destination=?1");
            query.setParameter(1, destinationName);
            rc = (Long) query.getSingleResult();
        } catch (Throwable e) {
            adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create(e);
        }
        adapter.commitEntityManager(null, manager);
        return rc.intValue();
    }

    /**
     * Replays every stored message of this destination, in ascending id order,
     * to the given listener.
     *
     * @param container listener that receives each unmarshalled message
     * @throws Exception an {@link IOException} wrapping any failure, including
     *                   one thrown by the listener
     */
    @SuppressWarnings("unchecked")
    public void recover(MessageRecoveryListener container) throws Exception {
        EntityManager manager = adapter.beginEntityManager(null);
        try {
            Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 order by m.id asc");
            query.setParameter(1, destinationName);
            for (StoredMessage m : (List<StoredMessage>) query.getResultList()) {
                Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
                container.recoverMessage(message);
            }
        } catch (Throwable e) {
            adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create(e);
        }
        adapter.commitEntityManager(null, manager);
    }

    /**
     * Replays the next batch of stored messages for this destination: those with
     * an id greater than {@link #lastMessageId}, in ascending id order, at most
     * {@code maxReturned} of them. {@link #lastMessageId} is advanced after each
     * message is delivered to the listener.
     *
     * @param maxReturned upper bound on the number of messages delivered
     * @param listener    listener that receives each unmarshalled message
     * @throws Exception an {@link IOException} wrapping any failure, including
     *                   one thrown by the listener
     */
    @SuppressWarnings("unchecked")
    public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
        EntityManager manager = adapter.beginEntityManager(null);
        try {
            Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
            query.setParameter(1, destinationName);
            query.setParameter(2, lastMessageId.get());
            query.setMaxResults(maxReturned);
            int count = 0;
            for (StoredMessage m : (List<StoredMessage>) query.getResultList()) {
                Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
                listener.recoverMessage(message);
                lastMessageId.set(m.getId());
                count++;
                if (count >= maxReturned) {
                    // Leave the loop, not the method: returning here would skip
                    // commitEntityManager below and leave the entity manager
                    // neither committed nor rolled back.
                    break;
                }
            }
        } catch (Throwable e) {
            adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create(e);
        }
        adapter.commitEntityManager(null, manager);
    }

    /**
     * Deletes every stored message belonging to this store's destination.
     *
     * @param context connection context passed through to the adapter
     * @throws IOException if the delete fails
     */
    public void removeAllMessages(ConnectionContext context) throws IOException {
        EntityManager manager = adapter.beginEntityManager(context);
        try {
            Query query = manager.createQuery("delete from StoredMessage m where m.destination=?1");
            query.setParameter(1, destinationName);
            query.executeUpdate();
        } catch (Throwable e) {
            adapter.rollbackEntityManager(context, manager);
            throw IOExceptionSupport.create(e);
        }
        adapter.commitEntityManager(context, manager);
    }

    /**
     * Deletes the stored message whose id equals the broker sequence id of the
     * ack's last message id.
     *
     * @param context connection context passed through to the adapter
     * @param ack     acknowledgement identifying the message to remove
     * @throws IOException if the delete fails
     */
    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
        EntityManager manager = adapter.beginEntityManager(context);
        try {
            Query query = manager.createQuery("delete from StoredMessage m where m.id=?1");
            query.setParameter(1, ack.getLastMessageId().getBrokerSequenceId());
            query.executeUpdate();
        } catch (Throwable e) {
            adapter.rollbackEntityManager(context, manager);
            throw IOExceptionSupport.create(e);
        }
        adapter.commitEntityManager(context, manager);
    }

    /**
     * Resets the batching cursor so the next {@link #recoverNextMessages} call
     * starts again from the lowest stored id.
     */
    public void resetBatching() {
        lastMessageId.set(-1);
    }

    /** No-op: this store does not use the usage manager. */
    public void setUsageManager(UsageManager usageManager) {
    }

    /** No-op: nothing to start. */
    public void start() throws Exception {
    }

    /** No-op: nothing to stop. */
    public void stop() throws Exception {
    }

}