package org.mr.kernel.services.queues;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mr.IMessageListener;
import org.mr.MantaAgent;
import org.mr.MantaAgentConstants;
import org.mr.MantaException;
import org.mr.core.protocol.MantaBusMessage;
import org.mr.core.protocol.MantaBusMessageConsts;
import org.mr.core.util.SystemTime;
import org.mr.kernel.BlockingMessageListener;
import org.mr.kernel.IncomingClientMessageRouter;
import org.mr.kernel.control.ControlSignal;
import org.mr.kernel.services.ServiceConsumer;

/**
 * Keeps track of the consumers subscribed to one queue and sends the
 * matching register / unregister control signals to the queue master
 * (referred to in this class as the "coordinator").
 *
 * <p>For every subscribed consumer the manager remembers its message
 * listener and the requested number of receives, so that the subscription
 * can be sent again when {@link #queueCoordinatorFound(QueueMaster)} is
 * called.
 *
 * <p>All methods that read or modify the bookkeeping collections are
 * {@code synchronized} on this instance.
 */
public class QueueSubscriberManager {

    /** Subscribed consumers; kept free of duplicates by {@link #subscribeToQueue}. */
    ArrayList queueSubscribers = new ArrayList();

    /** Maps a {@link ServiceConsumer} to its {@link IMessageListener}. */
    HashMap subscribersToListeners = new HashMap();

    /** Maps a {@link ServiceConsumer} to its number of receives, stored as a {@code String}. */
    HashMap subscribersToNumberOfReceives = new HashMap();

    /** The queue service whose subscribers are managed here. */
    AbstractQueueService queue;

    /** Agent used to reach the message router and to send control messages. */
    MantaAgent layer;

    // Initialised once at class load instead of being re-assigned by every constructor call.
    private static final Log log = LogFactory.getLog("QueueSubscriberManager");

    /**
     * Creates a manager for the given queue.
     *
     * @param queue the queue service this manager handles subscriptions for
     */
    QueueSubscriberManager(AbstractQueueService queue) {
        this.queue = queue;
        layer = MantaAgent.getInstance();
    }

    /**
     * Records a subscription and, when the queue currently has a master,
     * sends a register control signal to it.
     *
     * <p>Subscribing the same consumer again replaces its stored listener
     * and number of receives; the consumer is listed only once.
     *
     * @param consumer        the subscribing consumer
     * @param listener        the listener registered with the incoming
     *                        client message router for this consumer
     * @param numberOfReceive value sent to the master under
     *                        {@code ControlSignal.NUMBER_OF_RECEIVE_ON_QUEUE_KEY}
     * @throws MantaException if sending the control message fails
     */
    public synchronized void subscribeToQueue(ServiceConsumer consumer, IMessageListener listener,
            long numberOfReceive) throws MantaException {
        // Avoid duplicate list entries: the removal methods drop a single
        // entry, so a duplicate would outlive its map entries and later be
        // re-registered with a null listener by queueCoordinatorFound().
        if (!queueSubscribers.contains(consumer)) {
            queueSubscribers.add(consumer);
        }
        subscribersToListeners.put(consumer, listener);
        subscribersToNumberOfReceives.put(consumer, String.valueOf(numberOfReceive));

        QueueMaster master = queue.getQueueMaster();
        sendSubscriptionToCoordinator(master, consumer, listener);
    }

    /**
     * Forgets a subscription, removes the listener from the incoming client
     * message router and, when the queue currently has a master, sends an
     * unregister control signal to it.
     *
     * @param consumer the consumer to unregister
     * @param listener the listener to remove from the router
     * @throws MantaException if sending the control message fails
     */
    public synchronized void unregisterFromQueue(ServiceConsumer consumer, IMessageListener listener)
            throws MantaException {
        queueSubscribers.remove(consumer);
        subscribersToListeners.remove(consumer);
        subscribersToNumberOfReceives.remove(consumer);

        IncomingClientMessageRouter router =
                layer.getSingletonRepository().getIncomingClientMessageRouter();
        router.removeIncomingClientMessageListener(listenerKey(consumer), listener);

        QueueMaster master = queue.getQueueMaster();
        if (master == null) {
            // No master to notify; the local clean-up above is all that is done.
            return;
        }

        ControlSignal control = new ControlSignal(ControlSignal.OPERATION_TYPE_QUEUE_UNREGISTER);
        sendControlSignal(control, master, consumer);
    }

    /**
     * Sends the subscription of every known consumer to the given
     * coordinator. A failure for one consumer is logged and does not stop
     * the remaining consumers from being processed.
     *
     * @param coordinator the queue master to register with
     */
    protected synchronized void queueCoordinatorFound(QueueMaster coordinator) {
        Iterator subscribers = queueSubscribers.iterator();
        while (subscribers.hasNext()) {
            ServiceConsumer consumer = (ServiceConsumer) subscribers.next();
            try {
                sendSubscriptionToCoordinator(coordinator, consumer,
                        (IMessageListener) subscribersToListeners.get(consumer));
            } catch (MantaException e) {
                if (log.isErrorEnabled()) {
                    log.error("error sending subscribe to queue coordinator", e);
                }
            }
        }
    }

    /**
     * Registers the listener with the incoming client message router and
     * sends a register control signal to the master. Does nothing when
     * {@code master} is {@code null}.
     *
     * @param master   the queue master, may be {@code null}
     * @param consumer the subscribing consumer
     * @param listener the consumer's listener
     * @throws MantaException if sending the control message fails
     */
    private void sendSubscriptionToCoordinator(QueueMaster master, ServiceConsumer consumer,
            IMessageListener listener) throws MantaException {
        if (master == null) {
            return;
        }

        ControlSignal control = new ControlSignal(ControlSignal.OPERATION_TYPE_QUEUE_REGISTER);
        control.getParams().put(ControlSignal.NUMBER_OF_RECEIVE_ON_QUEUE_KEY,
                (String) subscribersToNumberOfReceives.get(consumer));

        String listenerStr = listenerKey(consumer);
        IncomingClientMessageRouter router =
                layer.getSingletonRepository().getIncomingClientMessageRouter();
        // Remove before adding - presumably so that a repeated subscription
        // does not leave the listener registered twice under the same key.
        router.removeIncomingClientMessageListener(listenerStr, listener);
        if (listener instanceof BlockingMessageListener) {
            ((BlockingMessageListener) listener).setListenerString(listenerStr);
        }
        router.addIncommingClientMessageListener(listenerStr, listener);

        sendControlSignal(control, master, consumer);
    }

    /**
     * Removes a consumer from the local bookkeeping only; the router and
     * the queue master are not contacted.
     *
     * @param consumer the consumer to forget
     */
    public synchronized void removeSubscribeToQueue(ServiceConsumer consumer) {
        queueSubscribers.remove(consumer);
        subscribersToListeners.remove(consumer);
        subscribersToNumberOfReceives.remove(consumer);
    }

    /** Builds the router key for a consumer: the queue's service name followed by the consumer id. */
    private String listenerKey(ServiceConsumer consumer) {
        return queue.getServiceName() + consumer.getId();
    }

    /**
     * Wraps a control signal in a control-type bus message addressed to the
     * master, tags it with this queue's service name as logical destination
     * and sends it on behalf of the consumer as a non-persistent,
     * high-priority message.
     */
    private void sendControlSignal(ControlSignal control, QueueMaster master, ServiceConsumer consumer)
            throws MantaException {
        MantaBusMessage msg = MantaBusMessage.getInstance();
        msg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CONTROL);
        msg.setPayload(control);
        msg.setRecipient(master);
        msg.addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION, queue.getServiceName());

        // The last argument is the control-message TTL added to the current GMT time.
        layer.send(msg, consumer, MantaAgentConstants.NON_PERSISTENT, MantaAgentConstants.HIGH,
                MantaAgentConstants.CONTROL_MESSAGES_TTL + SystemTime.gmtCurrentTimeMillis());
    }
}