1 18 19 package org.apache.activemq; 20 21 import java.util.Iterator ; 22 import java.util.List ; 23 24 import javax.jms.JMSException ; 25 26 import org.apache.activemq.command.ConsumerId; 27 import org.apache.activemq.command.MessageDispatch; 28 import org.apache.activemq.thread.Task; 29 import org.apache.activemq.thread.TaskRunner; 30 import org.apache.activemq.util.JMSExceptionSupport; 31 32 38 public class ActiveMQSessionExecutor implements Task { 39 40 private ActiveMQSession session; 41 private MessageDispatchChannel messageQueue = new MessageDispatchChannel(); 42 private boolean dispatchedBySessionPool; 43 private TaskRunner taskRunner; 44 45 ActiveMQSessionExecutor(ActiveMQSession session) { 46 this.session = session; 47 } 48 49 void setDispatchedBySessionPool(boolean value) { 50 dispatchedBySessionPool = value; 51 wakeup(); 52 } 53 54 55 void execute(MessageDispatch message) throws InterruptedException { 56 if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool){ 57 dispatch(message); 58 }else { 59 messageQueue.enqueue(message); 60 wakeup(); 61 } 62 } 63 64 public void wakeup() { 65 if( !dispatchedBySessionPool ) { 66 if( session.isSessionAsyncDispatch() ) { 67 try { 68 if( taskRunner == null ) { 69 taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ Session: "+session.getSessionId()); 70 } 71 taskRunner.wakeup(); 72 } catch (InterruptedException e) { 73 Thread.currentThread().interrupt(); 74 } 75 } else { 76 while( iterate() ) 77 ; 78 } 79 } 80 } 81 82 void executeFirst(MessageDispatch message) { 83 messageQueue.enqueueFirst(message); 84 wakeup(); 85 } 86 87 public boolean hasUncomsumedMessages() { 88 return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty(); 89 } 90 91 void dispatch(MessageDispatch message){ 92 93 95 for (Iterator i = this.session.consumers.iterator(); i.hasNext();) { 96 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next(); 97 ConsumerId consumerId = message.getConsumerId(); 98 if( consumerId.equals(consumer.getConsumerId()) ) { 99 consumer.dispatch(message); 100 } 101 } 102 } 103 104 synchronized void start() { 105 if( !messageQueue.isRunning() ) { 106 messageQueue.start(); 107 if( hasUncomsumedMessages() ) 108 wakeup(); 109 } 110 } 111 112 void stop() throws JMSException { 113 try { 114 if( messageQueue.isRunning() ) { 115 messageQueue.stop(); 116 if( taskRunner!=null ) { 117 taskRunner.shutdown(); 118 taskRunner=null; 119 } 120 } 121 } catch (InterruptedException e) { 122 Thread.currentThread().interrupt(); 123 throw JMSExceptionSupport.create(e); 124 } 125 } 126 127 boolean isRunning() { 128 return messageQueue.isRunning(); 129 } 130 131 void close() { 132 messageQueue.close(); 133 } 134 135 void clear() { 136 messageQueue.clear(); 137 } 138 139 MessageDispatch dequeueNoWait() { 140 return (MessageDispatch) messageQueue.dequeueNoWait(); 141 } 142 143 protected void clearMessagesInProgress(){ 144 messageQueue.clear(); 145 } 146 147 public boolean isEmpty() { 148 return messageQueue.isEmpty(); 149 } 150 151 public boolean iterate() { 152 153 for (Iterator i = this.session.consumers.iterator(); i.hasNext();) { 155 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next(); 156 if( consumer.iterate() ) { 157 return true; 158 } 159 } 160 161 MessageDispatch message = messageQueue.dequeueNoWait(); 163 if( message==null ) { 164 return false; 165 } else { 166 dispatch(message); 167 return !messageQueue.isEmpty(); 168 } 169 } 170 171 List getUnconsumedMessages() { 172 return messageQueue.removeAll(); 173 } 174 175 } 176 | Popular Tags |