1 18 19 package org.apache.activemq; 20 21 import java.util.Collections ; 22 import java.util.LinkedList ; 23 import java.util.List ; 24 25 import javax.jms.ConnectionConsumer ; 26 import javax.jms.IllegalStateException ; 27 import javax.jms.JMSException ; 28 import javax.jms.ServerSession ; 29 import javax.jms.ServerSessionPool ; 30 import javax.jms.Session ; 31 32 import org.apache.activemq.command.ConsumerInfo; 33 import org.apache.activemq.command.MessageDispatch; 34 35 57 58 public class ActiveMQConnectionConsumer implements ConnectionConsumer , ActiveMQDispatcher { 59 60 private ActiveMQConnection connection; 61 private ServerSessionPool sessionPool; 62 private ConsumerInfo consumerInfo; 63 private boolean closed; 64 65 protected final List messageQueue = Collections.synchronizedList(new LinkedList ()); 66 67 68 77 protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, 78 ServerSessionPool theSessionPool, 79 ConsumerInfo theConsumerInfo) throws JMSException { 80 this.connection = theConnection; 81 this.sessionPool = theSessionPool; 82 this.consumerInfo = theConsumerInfo; 83 84 this.connection.addConnectionConsumer(this); 85 this.connection.addDispatcher(consumerInfo.getConsumerId(), this); 86 this.connection.syncSendPacket(this.consumerInfo); 87 } 88 89 96 97 public ServerSessionPool getServerSessionPool() throws JMSException { 98 if (closed) { 99 throw new IllegalStateException ("The Connection Consumer is closed"); 100 } 101 return this.sessionPool; 102 } 103 104 115 116 public void close() throws JMSException { 117 if (!closed) { 118 dispose(); 119 this.connection.asyncSendPacket(this.consumerInfo.createRemoveCommand()); 120 } 121 122 } 123 124 public void dispose() { 125 if (!closed) { 126 this.connection.removeDispatcher(consumerInfo.getConsumerId()); 127 this.connection.removeConnectionConsumer(this); 128 closed = true; 129 } 130 } 131 132 public void dispatch(MessageDispatch messageDispatch) { 133 try { 134 messageDispatch.setConsumer(this); 135 136 ServerSession serverSession = sessionPool.getServerSession(); 137 Session s = serverSession.getSession(); 138 ActiveMQSession session = null; 139 140 141 if( s instanceof ActiveMQSession ) { 142 session = (ActiveMQSession) s; 143 } else if (s instanceof ActiveMQTopicSession) { 144 ActiveMQTopicSession topicSession = (ActiveMQTopicSession) s; 145 session = (ActiveMQSession) topicSession.getNext(); 146 } else if (s instanceof ActiveMQQueueSession) { 147 ActiveMQQueueSession queueSession = (ActiveMQQueueSession) s; 148 session = (ActiveMQSession) queueSession.getNext(); 149 } else { 150 connection.onAsyncException(new JMSException ("Session pool provided an invalid session type: "+s.getClass())); 151 return; 152 } 153 154 session.dispatch(messageDispatch); 155 serverSession.start(); 156 } catch (JMSException e) { 157 connection.onAsyncException(e); 158 } 159 } 160 161 public String toString() { 162 return "ActiveMQConnectionConsumer { value=" +consumerInfo.getConsumerId()+" }"; 163 } 164 } 165 | Popular Tags |