1 18 package org.apache.activemq.broker.region.policy; 19 20 import org.apache.activemq.ActiveMQMessageTransformation; 21 import org.apache.activemq.broker.ConnectionContext; 22 import org.apache.activemq.broker.region.Destination; 23 import org.apache.activemq.broker.region.MessageReference; 24 import org.apache.activemq.broker.region.SubscriptionRecovery; 25 import org.apache.activemq.broker.region.Topic; 26 import org.apache.activemq.command.*; 27 import org.apache.activemq.util.IdGenerator; 28 import org.apache.commons.logging.Log; 29 import org.apache.commons.logging.LogFactory; 30 31 import javax.jms.JMSException ; 32 import javax.jms.Message ; 33 import javax.jms.MessageListener ; 34 import java.util.concurrent.atomic.AtomicLong ; 35 36 44 public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy { 45 46 private static final Log log = LogFactory.getLog(QueryBasedSubscriptionRecoveryPolicy.class); 47 48 private MessageQuery query; 49 private AtomicLong messageSequence = new AtomicLong (0); 50 private IdGenerator idGenerator = new IdGenerator(); 51 private ProducerId producerId = createProducerId(); 52 53 public SubscriptionRecoveryPolicy copy() { 54 QueryBasedSubscriptionRecoveryPolicy rc = new QueryBasedSubscriptionRecoveryPolicy(); 55 rc.setQuery(query); 56 return rc; 57 } 58 59 public boolean add(ConnectionContext context, MessageReference message) throws Exception { 60 return query.validateUpdate(message.getMessage()); 61 } 62 63 public void recover(final ConnectionContext context,final Topic topic,final SubscriptionRecovery sub) 64 throws Exception { 65 if(query!=null){ 66 ActiveMQDestination destination=sub.getActiveMQDestination(); 67 query.execute(destination,new MessageListener (){ 68 69 public void onMessage(Message message){ 70 dispatchInitialMessage(message,topic,context,sub); 71 } 72 }); 73 } 74 } 75 76 public void start() throws Exception { 77 if (query == null) { 78 throw new IllegalArgumentException ("No query property configured"); 79 } 80 } 81 82 public void stop() throws Exception { 83 } 84 85 public MessageQuery getQuery() { 86 return query; 87 } 88 89 92 public void setQuery(MessageQuery query) { 93 this.query = query; 94 } 95 96 public org.apache.activemq.command.Message[] browse(ActiveMQDestination dest) throws Exception { 97 return new org.apache.activemq.command.Message[0]; 98 } 99 100 protected void dispatchInitialMessage(Message message, Destination regionDestination, ConnectionContext context, SubscriptionRecovery sub) { 101 try { 102 ActiveMQMessage activeMessage = ActiveMQMessageTransformation.transformMessage(message, null); 103 ActiveMQDestination destination = activeMessage.getDestination(); 104 if (destination == null) { 105 destination = sub.getActiveMQDestination(); 106 activeMessage.setDestination(destination); 107 } 108 activeMessage.setRegionDestination(regionDestination); 109 configure(activeMessage); 110 sub.addRecoveredMessage(context,activeMessage); 111 } 112 catch (Throwable e) { 113 log.warn("Failed to dispatch initial message: " + message + " into subscription. Reason: " + e, e); 114 } 115 } 116 117 protected void configure(ActiveMQMessage msg) throws JMSException { 118 long sequenceNumber = messageSequence.incrementAndGet(); 119 msg.setMessageId(new MessageId(producerId, sequenceNumber)); 120 msg.onSend(); 121 msg.setProducerId(producerId); 122 } 123 124 protected ProducerId createProducerId() { 125 String id = idGenerator.generateId(); 126 ConnectionId connectionId = new ConnectionId(id); 127 SessionId sessionId = new SessionId(connectionId, 1); 128 return new ProducerId(sessionId, 1); 129 } 130 } 131 | Popular Tags |