1 18 package org.apache.activemq; 19 20 import javax.jms.JMSException ; 21 22 import org.apache.activemq.advisory.AdvisorySupport; 23 import org.apache.activemq.command.ActiveMQDestination; 24 import org.apache.activemq.command.ConsumerId; 25 import org.apache.activemq.command.ConsumerInfo; 26 import org.apache.activemq.command.DataStructure; 27 import org.apache.activemq.command.DestinationInfo; 28 import org.apache.activemq.command.MessageAck; 29 import org.apache.activemq.command.MessageDispatch; 30 31 public class AdvisoryConsumer implements ActiveMQDispatcher { 32 33 private final ActiveMQConnection connection; 34 private ConsumerInfo info; 35 private boolean closed; 36 int deliveredCounter; 37 38 public AdvisoryConsumer(ActiveMQConnection connection, ConsumerId consumerId) throws JMSException { 39 this.connection = connection; 40 info = new ConsumerInfo(consumerId); 41 info.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC); 42 info.setPrefetchSize(1000); 43 info.setNoLocal(true); 44 45 this.connection.addDispatcher(info.getConsumerId(), this); 46 this.connection.syncSendPacket(this.info); 47 } 48 49 public void dispose() { 50 if (!closed) { 51 this.connection.removeDispatcher(info.getConsumerId()); 52 closed = true; 53 } 54 } 55 56 public void dispatch(MessageDispatch md) { 57 58 deliveredCounter++; 60 if( deliveredCounter > (0.75 * info.getPrefetchSize()) ) { 61 try { 62 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter); 63 connection.asyncSendPacket(ack); 64 deliveredCounter = 0; 65 } catch (JMSException e) { 66 connection.onAsyncException(e); 67 } 68 } 69 70 DataStructure o = md.getMessage().getDataStructure(); 71 if( o!=null && o.getClass() == DestinationInfo.class ) { 72 processDestinationInfo((DestinationInfo) o); 73 } else { 74 connection.onAsyncException(new JMSException ("Unexpected message was dispatched to the AdvisoryConsumer: "+md)); 75 } 76 77 } 78 79 private void processDestinationInfo(DestinationInfo dinfo) { 80 ActiveMQDestination dest = dinfo.getDestination(); 81 if( dinfo.getOperationType() == DestinationInfo.ADD_OPERATION_TYPE ) { 82 connection.activeTempDestinations.put(dest,dest); 83 } else if( dinfo.getOperationType() == DestinationInfo.REMOVE_OPERATION_TYPE ) { 84 connection.activeTempDestinations.remove(dest); 85 } 86 } 87 88 } 89 | Popular Tags |