1 46 50 package org.mr.kernel; 51 52 import java.util.HashMap ; 53 import java.util.HashSet ; 54 import java.util.Iterator ; 55 import java.util.Set ; 56 57 import javax.jms.JMSException ; 58 59 import org.apache.commons.logging.Log; 60 import org.apache.commons.logging.LogFactory; 61 import org.mr.IMessageListener; 62 import org.mr.MantaAgent; 63 import org.mr.MantaAgentConstants; 64 import org.mr.MessageManipulator; 65 import org.mr.api.jms.MantaMessage; 66 import org.mr.core.protocol.MantaBusMessage; 67 import org.mr.core.protocol.MantaBusMessageConsts; 68 import org.mr.core.util.Stage; 69 import org.mr.core.util.StageHandler; 70 import org.mr.core.util.StageParams; 71 72 73 82 public class IncomingClientMessageRouter implements StageHandler ,IncomingMessageListener { 83 private Log log; 84 Stage stage = null; 85 HashMap listenersMap = new HashMap (); 86 87 88 public IncomingClientMessageRouter(){ 89 log=LogFactory.getLog("IncomingClientMessageRouter"); 90 IncomingMessageListenerRegister.setClientRouter(this); 91 StageParams params = new StageParams(); 92 params.setBlocking(false); 93 params.setPersistent(false); 94 params.setHandler(this); 95 params.setNumberOfStartThreads(1); 96 params.setStageName("IncomingClientMessages"); 97 stage = new Stage(params); 98 } 99 100 101 102 public void addIncommingClientMessageListener(String destenation ,IMessageListener listener){ 103 synchronized (listenersMap) { 104 Set listeners =(Set ) listenersMap.get(destenation); 105 if(listeners == null){ 106 listeners = new HashSet (); 107 listenersMap.put(destenation , listeners); 108 } 109 listeners.add(listener); 110 } 111 } 113 public void removeIncomingClientMessageListener(String destenation ,IMessageListener listener){ 114 synchronized (listenersMap) { 115 Set listeners =(Set ) listenersMap.get(destenation); 116 if(listeners != null){ 117 listeners.remove(listener); 118 if(listeners.size() == 0) 119 listenersMap.remove(destenation); 120 } 121 } 122 } 124 125 128 public boolean handle(Object event) { 129 130 MantaBusMessage msg = (MantaBusMessage)event; 131 132 MessageManipulator mm =MantaAgent.getInstance().getSingletonRepository().getMessageManipulator(); 134 if(mm!= null){ 135 msg = mm.manipulate(msg, null, MessageManipulator.INCOMING); 136 } 137 138 boolean isAck = false; 140 String ackRejectTo = msg.getHeader(MantaBusMessageConsts.HEADER_NAME_ACK_REJECT_RESPONSE_REFERENCE); 141 if (ackRejectTo != null) { 142 MantaAgent.getInstance().gotAckReject(ackRejectTo, 143 msg.getSource()); 144 isAck = true; 145 } 146 147 String ackTo = msg.getHeader(MantaBusMessageConsts.HEADER_NAME_ACK_RESPONSE_REFERENCE); 149 if(ackTo!= null){ 150 if(log.isDebugEnabled()){ 151 log.debug("Got message. Message ID="+msg.getMessageId()+". Ack for message "+ackTo); 152 } 153 MantaAgent.getInstance().gotAck(ackTo ,msg.getSource()); 154 isAck = true; 155 } 156 else { 157 if(log.isDebugEnabled()){ 158 log.debug("Got message. Message ID="+msg.getMessageId()); 159 } 160 } 161 String dest = msg.getLogicalDestination(); 162 boolean sentToListeners = false; 163 164 synchronized (listenersMap) { 165 Set listeners = (Set ) listenersMap.get(dest); 166 if (listeners != null && listeners.size() > 0) { 167 Iterator curListeners = listeners.iterator(); 168 while (curListeners.hasNext()) { 169 IMessageListener listener =(IMessageListener)curListeners.next(); 170 listener.onMessage(msg); 171 } 172 sentToListeners = true; 173 } 174 } 175 if (sentToListeners) { 176 byte ackType = msg.getRecipient().getAcknowledgeMode(); 178 if(ackType == MantaAgentConstants.AUTO_ACK || ackType == MantaAgentConstants.DUPLICATE_ACK){ 179 if(msg.getPayload()==null || !(msg.getPayload() instanceof MantaMessage)){ 180 MantaAgent.getInstance().ack(msg); 182 } 183 } 184 } 185 else { 186 if (!isAck) { 187 if (log.isInfoEnabled()) { 188 log.info("No listeners found for message, message will be discarded. Message ID="+msg.getMessageId()+", message was"+msg); 189 } 190 MantaAgent.getInstance().ackReject(msg); 191 } 192 } 193 return true; 194 } 195 196 197 200 public void messageArrived(MantaBusMessage msg) { 201 stage.enqueue(msg); 202 } 203 204 } 205 | Popular Tags |