1 18 package org.apache.activemq.store.jpa; 19 20 import java.io.IOException ; 21 import java.util.List ; 22 import java.util.concurrent.atomic.AtomicLong ; 23 24 import javax.persistence.EntityManager; 25 import javax.persistence.Query; 26 27 import org.apache.activemq.broker.ConnectionContext; 28 import org.apache.activemq.command.ActiveMQDestination; 29 import org.apache.activemq.command.Message; 30 import org.apache.activemq.command.MessageAck; 31 import org.apache.activemq.command.MessageId; 32 import org.apache.activemq.memory.UsageManager; 33 import org.apache.activemq.store.MessageRecoveryListener; 34 import org.apache.activemq.store.ReferenceStore; 35 import org.apache.activemq.store.jpa.model.StoredMessageReference; 36 import org.apache.activemq.util.IOExceptionSupport; 37 import org.apache.activemq.wireformat.WireFormat; 38 39 public class JPAReferenceStore implements ReferenceStore { 40 41 protected final JPAPersistenceAdapter adapter; 42 protected final WireFormat wireFormat; 43 protected final ActiveMQDestination destination; 44 protected final String destinationName; 45 protected AtomicLong lastMessageId = new AtomicLong (-1); 46 47 public JPAReferenceStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) { 48 this.adapter = adapter; 49 this.destination = destination; 50 this.destinationName = destination.getQualifiedName(); 51 this.wireFormat = this.adapter.getWireFormat(); 52 } 53 54 public ActiveMQDestination getDestination() { 55 return destination; 56 } 57 58 public void addMessage(ConnectionContext context, Message message) throws IOException { 59 throw new RuntimeException ("Use addMessageReference instead"); 60 } 61 62 public Message getMessage(MessageId identity) throws IOException { 63 throw new RuntimeException ("Use addMessageReference instead"); 64 } 65 66 public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException { 67 EntityManager manager = adapter.beginEntityManager(context); 68 try { 69 70 StoredMessageReference sm = new StoredMessageReference(); 71 sm.setDestination(destinationName); 72 sm.setId(messageId.getBrokerSequenceId()); 73 sm.setMessageId(messageId.toString()); 74 sm.setExiration(data.getExpiration()); 75 sm.setFileId(data.getFileId()); 76 sm.setOffset(data.getOffset()); 77 78 manager.persist(sm); 79 80 } catch (Throwable e) { 81 adapter.rollbackEntityManager(context,manager); 82 throw IOExceptionSupport.create(e); 83 } 84 adapter.commitEntityManager(context,manager); 85 } 86 87 public ReferenceData getMessageReference(MessageId identity) throws IOException { 88 ReferenceData rc=null; 89 EntityManager manager = adapter.beginEntityManager(null); 90 try { 91 StoredMessageReference message=null; 92 if( identity.getBrokerSequenceId()!= 0 ) { 93 message = manager.find(StoredMessageReference.class, identity.getBrokerSequenceId()); 94 } else { 95 Query query = manager.createQuery("select m from StoredMessageReference m where m.messageId=?1"); 96 query.setParameter(1, identity.toString()); 97 message = (StoredMessageReference) query.getSingleResult(); 98 } 99 if( message !=null ) { 100 rc = new ReferenceData(); 101 rc.setExpiration(message.getExiration()); 102 rc.setFileId(message.getFileId()); 103 rc.setOffset(message.getOffset()); 104 } 105 } catch (Throwable e) { 106 adapter.rollbackEntityManager(null,manager); 107 throw IOExceptionSupport.create(e); 108 } 109 adapter.commitEntityManager(null,manager); 110 return rc; 111 } 112 113 public int getMessageCount() throws IOException { 114 Long rc; 115 EntityManager manager = adapter.beginEntityManager(null); 116 try { 117 Query query = manager.createQuery("select count(m) from StoredMessageReference m"); 118 rc = (Long ) query.getSingleResult(); 119 } catch (Throwable e) { 120 adapter.rollbackEntityManager(null,manager); 121 throw IOExceptionSupport.create(e); 122 } 123 adapter.commitEntityManager(null,manager); 124 return rc.intValue(); 125 } 126 127 public void recover(MessageRecoveryListener container) throws Exception { 128 EntityManager manager = adapter.beginEntityManager(null); 129 try { 130 Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 order by m.id asc"); 131 query.setParameter(1, destinationName); 132 for (StoredMessageReference m : (List <StoredMessageReference>)query.getResultList()) { 133 MessageId id = new MessageId(m.getMessageId()); 134 id.setBrokerSequenceId(m.getId()); 135 container.recoverMessageReference(id); 136 } 137 } catch (Throwable e) { 138 adapter.rollbackEntityManager(null,manager); 139 throw IOExceptionSupport.create(e); 140 } 141 adapter.commitEntityManager(null,manager); 142 } 143 144 public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { 145 146 EntityManager manager = adapter.beginEntityManager(null); 147 try { 148 149 Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc"); 150 query.setParameter(1, destinationName); 151 query.setParameter(2, lastMessageId.get()); 152 query.setMaxResults(maxReturned); 153 int count = 0; 154 for (StoredMessageReference m : (List <StoredMessageReference>)query.getResultList()) { 155 MessageId id = new MessageId(m.getMessageId()); 156 id.setBrokerSequenceId(m.getId()); 157 listener.recoverMessageReference(id); 158 lastMessageId.set(m.getId()); 159 count++; 160 if( count >= maxReturned ) { 161 return; 162 } 163 } 164 165 } catch (Throwable e) { 166 adapter.rollbackEntityManager(null,manager); 167 throw IOExceptionSupport.create(e); 168 } 169 adapter.commitEntityManager(null,manager); 170 } 171 172 public void removeAllMessages(ConnectionContext context) throws IOException { 173 EntityManager manager = adapter.beginEntityManager(context); 174 try { 175 Query query = manager.createQuery("delete from StoredMessageReference m where m.destination=?1"); 176 query.setParameter(1, destinationName); 177 query.executeUpdate(); 178 } catch (Throwable e) { 179 adapter.rollbackEntityManager(context,manager); 180 throw IOExceptionSupport.create(e); 181 } 182 adapter.commitEntityManager(context,manager); 183 } 184 185 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 186 EntityManager manager = adapter.beginEntityManager(context); 187 try { 188 Query query = manager.createQuery("delete from StoredMessageReference m where m.id=?1"); 189 query.setParameter(1, ack.getLastMessageId().getBrokerSequenceId()); 190 query.executeUpdate(); 191 } catch (Throwable e) { 192 adapter.rollbackEntityManager(context,manager); 193 throw IOExceptionSupport.create(e); 194 } 195 adapter.commitEntityManager(context,manager); 196 } 197 198 public void resetBatching() { 199 lastMessageId.set(-1); 200 } 201 202 public void setUsageManager(UsageManager usageManager) { 203 } 204 205 public void start() throws Exception { 206 } 207 208 public void stop() throws Exception { 209 } 210 211 public void setBatch(MessageId startAfter){ 212 } 213 214 public boolean supportsExternalBatchControl(){ 215 return false; 216 } 217 } 218 | Popular Tags |