1 46 package org.mr.kernel.control; 47 48 49 import org.mr.MantaAgent; 50 import org.mr.MantaAgentConstants; 51 import org.mr.SingletonRepository; 52 import org.mr.Version; 53 import org.mr.indexing.messages.MWBMessageConsts; 54 import org.mr.kernel.services.*; 55 import org.mr.kernel.services.queues.QueueMaster; 56 import org.mr.kernel.services.queues.VirtualQueuesManager; 57 import org.mr.kernel.services.topics.VirtualTopicManager; 58 import org.mr.kernel.world.WorldModeler; 59 import org.apache.commons.logging.Log; 60 import org.apache.commons.logging.LogFactory; 61 import org.mr.core.configuration.ConfigManager; 62 import org.mr.core.net.MantaAddress; 63 import org.mr.core.protocol.DeadEndRecipient; 64 import org.mr.core.protocol.MantaBusMessage; 65 import org.mr.core.protocol.MantaBusMessageConsts; 66 import org.mr.kernel.IncomingMessageListener; 67 import org.mr.kernel.IncomingMessageListenerRegister; 68 import org.mr.core.util.SystemTime; 69 import org.mr.core.util.byteable.ByteableList; 70 71 import java.util.Iterator ; 72 import java.util.ArrayList ; 73 74 75 84 public class ControlSignalMessageConsumer implements IncomingMessageListener { 85 86 public static final long START_CONTROL_SIGNAL_ID = 0; 87 88 89 private Log log; 90 91 public static final String CONTROL_PSEUDO_SERVICE_NAME = "controlPsadoService"; 92 93 94 95 public ControlSignalMessageConsumer(){ 96 IncomingMessageListenerRegister.setControlRouter(this); 97 log=LogFactory.getLog("ControlSignalMessageConsumer"); 98 } 99 100 103 public void messageArrived(MantaBusMessage msg) { 104 MantaAgent agent = MantaAgent.getInstance(); 105 106 String ref = msg.getHeader(MantaBusMessageConsts.HEADER_NAME_ACK_RESPONSE_REFERENCE); 108 if(ref != null ){ 109 if(log.isDebugEnabled()){ 111 log.debug("Got ACK for message "+ref+" from peer "+msg.getSource()+"."); 112 } 113 agent.gotAck(ref,msg.getSource() ); 114 115 } else { 116 if(log.isDebugEnabled()){ 117 log.debug("Got control message. Message ID="+msg.getMessageId()+", Sender="+msg.getSource()+"."); 118 } 119 } 120 121 byte ackType = msg.getRecipient().getAcknowledgeMode(); 123 if(ackType==MantaAgentConstants.AUTO_ACK ){ 124 agent.ack(msg); 125 } 126 127 String mwbType = msg.getHeader(MWBMessageConsts.MWB_TYPE); 128 if (mwbType != null) { 129 agent.getSingletonRepository().getWBManager().getWbHandler().messageArrived(msg, mwbType); 130 return; 131 } 132 ControlSignal control =(ControlSignal) msg.getPayload(); 134 135 if(control != null){ 136 byte controlOp =control.getOperation(); 137 boolean redeliverd = msg.isRedelivered(); 139 if(controlOp == ControlSignal.OPERATION_TYPE_ADVERTISE){ 141 doHandleAdvertise(msg,control); 142 }else if (controlOp == ControlSignal.OPERATION_TYPE_RECALL){ 143 doHandleRecall(msg,control); 144 }else if(controlOp == ControlSignal.OPERATION_TYPE_QUEUE_REGISTER){ 145 doHandleQueueRegister(msg,control); 146 }else if(controlOp == ControlSignal.OPERATION_TYPE_QUEUE_UNREGISTER){ 147 doHandleQueueUnregister(msg,control); 148 }else if(controlOp == ControlSignal.OPERATION_TYPE_UPDATE_SERVICE_ACTOR_STATE){ 149 doHandleServiceActorUpdate(msg,control); 150 }else if(controlOp == ControlSignal.OPERATION_TYPE_GET_QUEUE_COPY){ 151 doHandleQueueCopy(msg,control); 152 }else if(controlOp == ControlSignal.OPERATION_TYPE_ENQUEUE){ 153 doHandleEnqueue(msg,control); 154 }else if(controlOp == ControlSignal.OPERATION_TYPE_GET_MANAGEMENT_PROPERTIES){ 155 doHandleGetManagementInfo(msg,control); 156 }else if(controlOp == ControlSignal.OPERATION_TYPE_UNSUBSCRIBE_DURABLE){ 157 doHandleUnsubscribeDurable(msg,control); 158 } 159 } 161 } 163 164 165 166 167 168 169 178 private void doHandleGetManagementInfo(MantaBusMessage msg, ControlSignal control) { 179 MantaBusMessage cbm = MantaBusMessage.getInstance(); 180 cbm.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT); 181 ControlSignal info; 182 183 info = new ControlSignal(ControlSignal.OPERATION_TYPE_GET_MANAGEMENT_PROPERTIES); 185 info.getParams().put(ControlSignal.MANTA_VERSION,Version.version); 187 ConfigManager config = MantaAgent.getInstance().getSingletonRepository().getConfigManager(); 190 info.getParams().put(ControlSignal.LAYER_TYPE,config.getStringProperty( "management.agentType", "AGENT")); 192 if ((config.getStringProperty("management.jmx.rmiConnector.enabled")).equalsIgnoreCase("false")){ 193 info.getParams().put(ControlSignal.MANAGEABLE_LAYER,MantaBusMessageConsts.HEADER_VALUE_FALSE); 194 } 195 else{ 196 info.getParams().put(ControlSignal.MANAGEABLE_LAYER,MantaBusMessageConsts.HEADER_VALUE_TRUE); 197 String rmiPort=config.getStringProperty("management.jmx.rmiConnector.rmiPort","1099"); 198 info.getParams().put(ControlSignal.JMX_RMI_PORT,rmiPort); 199 if(config.getStringProperty("management.jmx.httpAdaptor.enabled")!=null){ 202 if (config.getBooleanProperty("management.jmx.httpAdaptor.enabled")){ 203 String httpPort=config.getStringProperty("management.jmx.httpAdaptor.httpPort","8080"); 204 info.getParams().put(ControlSignal.JMX_HTTP_PORT,httpPort); 205 } 206 } 207 } MantaAddress reply= ((MantaAddress)msg.getRecipient()); 210 cbm.setLogicalDestination("&&MANGEMENT_INFO$$"+reply.getAgentName()); 211 cbm.setPayload(info); 213 DeadEndRecipient recipient = DeadEndRecipient.createDeadEndRecipient(msg.getSource().getAgentName(), msg.getSource().getDomainName()); 215 cbm.setRecipient(recipient); 216 MantaAgent.getInstance().send(cbm,reply, MantaAgentConstants.NON_PERSISTENT , MantaAgentConstants.HIGH,MantaAgentConstants.CONTROL_MESSAGES_TTL+SystemTime.gmtCurrentTimeMillis()); 217 218 } 219 220 221 222 private void doHandleServiceActorUpdate(MantaBusMessage msg , ControlSignal control){ 223 SingletonRepository repo = MantaAgent.getInstance().getSingletonRepository(); 225 WorldModeler world = repo.getWorldModeler(); 226 ByteableList actors = (ByteableList) control.getParams().get(ControlSignal.SERVICE_ACTORS_UPDATE_KEY); 227 int size = actors.size(); 228 for (int i = 0; i < size; i++) { 229 ServiceActor actor =(ServiceActor)actors.get(i); 230 MantaService service = MantaAgent.getInstance().getService(actor.getServiceName(), actor.getServiceType()); 231 if(service != null){ 232 if((actor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC)&& 233 (actor.getType()== ServiceActor.CONSUMER )){ 234 if (log.isInfoEnabled()) { 236 log.info("Updating state of remote service consumer "+actor); 237 } 238 repo.getVirtualTopicManager().addConsumer((ServiceConsumer) actor); 239 }else if(actor.getType() == ServiceActor.CONSUMER ){ 240 if(service.getActor(actor.getId())==null){ 241 if (log.isInfoEnabled()) { 242 log.info("Updating state of remote service consumer "+actor); 243 } 244 service.addConsumer((ServiceConsumer) actor); 245 } 246 }else if(actor.getType() == ServiceActor.PRODUCER ){ 247 if (log.isInfoEnabled()) { 248 log.info("Updating state of remote service producer "+actor); 249 } 250 service.addProducer((ServiceProducer) actor); 251 }else{ 252 QueueMaster master =(QueueMaster) actor; 253 master.setValidUntil(Long.MAX_VALUE); 254 QueueMaster oldMaster = repo.getVirtualQueuesManager().getQueueMaster(service.getServiceName()); 255 if(oldMaster==null || !oldMaster.getId().equals(master.getId()) ){ 257 if (log.isInfoEnabled()) { 258 log.info("Updating state of remote service coordinator "+actor); 259 } 260 repo.getVirtualQueuesManager().setQueueMaster(service.getServiceName(), master); 261 } 262 263 } 264 } } 267 268 269 } 271 272 273 274 private void doHandleAdvertise(MantaBusMessage msg , ControlSignal control){ 275 MantaAgent manta = MantaAgent.getInstance(); 277 278 ServiceActor actor = (ServiceActor)msg.getSource(); 279 MantaService service =manta.getService(actor.getServiceName(), actor.getServiceType()); 280 VirtualQueuesManager vqm = manta.getSingletonRepository().getVirtualQueuesManager(); 281 VirtualTopicManager vtm = manta.getSingletonRepository().getVirtualTopicManager(); 282 if((actor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC)){ 283 if((actor.getType()== ServiceActor.CONSUMER )){ 285 if (log.isInfoEnabled()) { 286 if (!((ServiceConsumer)actor).isDurable()) { 287 log.info("Discovered remote service consumer "+actor); 288 } 289 else { 290 log.info("Discovered remote durable subscriber "+actor); 291 } 292 } 293 vtm.addConsumer((ServiceConsumer) actor); 294 } 295 else if ((actor.getType()== ServiceActor.PRODUCER )){ 297 if (log.isInfoEnabled()) { 298 log.info("Discovered remote service producer "+actor); 299 } 300 } 301 }else{ 302 if(service!= null){ 303 if(actor.getType() == ServiceActor.CONSUMER) { 304 if (log.isInfoEnabled()) { 305 log.info("Discovered remote service consumer "+actor); 306 } 307 service.addConsumer((ServiceConsumer) actor); 308 } 309 else if(actor.getType() == ServiceActor.PRODUCER) { 310 if (log.isInfoEnabled()) { 311 log.info("Discovered remote service producer "+actor); 312 } 313 service.addProducer((ServiceProducer) actor); 314 } 315 else if(actor.getType() == ServiceActor.COORDINATOR) { 316 if (log.isInfoEnabled()) { 317 log.info("Discovered remote service coordinator "+actor); 318 } 319 QueueMaster master =(QueueMaster) actor; 320 master.setValidUntil(Long.MAX_VALUE); 321 vqm.setQueueMaster(service.getServiceName(),master); 322 } 323 } 324 } 325 String update = (String ) control.getParams().get(ControlSignal.SERVICE_UPDATE_NEEDED); 326 if(service != null && update != null && !msg.getSource().getAgentName().equalsIgnoreCase(manta.getAgentName())){ 328 ByteableList updates = new ByteableList(); 329 ArrayList consumers = service.getConsumersByAgentId(manta.getAgentName()); 331 Iterator itr = consumers.iterator(); 332 while(itr.hasNext()) { 333 ServiceConsumer sc = (ServiceConsumer) itr.next(); 334 if (!sc.isDurable()) { 335 updates.add(sc); 336 } else { 337 if (ServiceActorControlCenter.isConsumerUp(sc)) { 338 updates.add(sc); 339 } 340 } 341 } 342 343 updates.addAll(service.getProducersByAgentId(manta.getAgentName())); 344 if(service.getServiceType() == MantaService.SERVICE_TYPE_QUEUE){ 345 if(vqm.amIQueueMaster(service.getServiceName())){ 346 updates.add(vqm.getQueueMaster(service.getServiceName())); 347 } 348 } 349 350 if(!updates.isEmpty()) { 351 sendServiceActorsUpdateMessage(updates , msg.getSource() ,service); 352 } 353 } 354 355 356 } 358 359 360 361 private void sendServiceActorsUpdateMessage(ByteableList serviceActors, MantaAddress destination ,MantaService service){ 362 363 MantaBusMessage cbm = MantaBusMessage.getInstance(); 364 cbm.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CONTROL); 365 ControlSignal control; 366 control = new ControlSignal(ControlSignal.OPERATION_TYPE_UPDATE_SERVICE_ACTOR_STATE); 368 369 control.getParams().put(ControlSignal.SERVICE_ACTORS_UPDATE_KEY, serviceActors); 370 cbm.setPayload(control); 371 372 DeadEndRecipient resp = DeadEndRecipient.createDeadEndRecipient(destination.getAgentName(),destination.getDomainName()); 373 cbm.setRecipient(resp); 374 MantaAgent.getInstance().send(cbm,(ServiceActor)serviceActors.get(0), MantaAgentConstants.NON_PERSISTENT , MantaAgentConstants.HIGH,MantaAgentConstants.CONTROL_MESSAGES_TTL+SystemTime.gmtCurrentTimeMillis()); 375 376 377 } 379 380 381 382 private void doHandleRecall(MantaBusMessage msg , ControlSignal control){ 383 ServiceActor actor = (ServiceActor) msg.getSource(); 385 if(actor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC) { 386 VirtualTopicManager vtm = MantaAgent.getInstance().getSingletonRepository().getVirtualTopicManager(); 387 if (actor.getType() == ServiceActor.CONSUMER) { 388 if (log.isInfoEnabled()) { 389 if (!((ServiceConsumer)actor).isDurable()) { 390 log.info("Removing remote service consumer "+actor); 391 } 392 else { 393 log.info("Removing remote durable subscriber "+actor); 394 } 395 } 396 vtm.removeConsumer((ServiceConsumer) actor); 397 } 398 else if (actor.getType() == ServiceActor.PRODUCER) { 400 if (log.isInfoEnabled()) { 401 log.info("Removing remote service producer "+actor); 402 } 403 vtm.removeProducer((ServiceProducer) actor); 405 } 406 }else{ 407 MantaService service = MantaAgent.getInstance().getService(actor.getServiceName(), actor.getServiceType()); 408 if(service!= null){ 409 if(actor.getType() == ServiceActor.CONSUMER){ 410 if (log.isInfoEnabled()) { 411 log.info("Removing remote service consumer "+actor); 412 } 413 service.removeConsumer((ServiceConsumer) actor); 414 } 415 else if(actor.getType() == ServiceActor.PRODUCER){ 416 if (log.isInfoEnabled()) { 417 log.info("Removing remote service producer "+actor); 418 } 419 service.removeProducer((ServiceProducer) actor); 420 } 421 else{ 422 VirtualQueuesManager vqm = MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager(); 423 QueueMaster coordinator = vqm.getQueueMaster(service.getServiceName()); 424 if(coordinator!=null && coordinator.getId().equals(actor.getId()) ) { 425 if (log.isInfoEnabled()) { 426 log.info("Removing remote service coordinator "+actor); 427 } 428 vqm.setQueueMaster(service.getServiceName(),null); 429 } 430 } 431 } 432 } 433 434 435 436 } 438 439 private void doHandleUnsubscribeDurable(MantaBusMessage msg, ControlSignal control) { 440 ServiceActor actor = (ServiceActor) msg.getSource(); 442 MantaService service = MantaAgent.getInstance().getService(actor.getServiceName(), actor.getServiceType()); 443 if(service!= null){ 444 445 if(actor.getType() == ServiceActor.CONSUMER ){ 446 VirtualTopicManager topicManager =MantaAgent.getInstance().getSingletonRepository().getVirtualTopicManager(); 447 if (log.isInfoEnabled()) { 448 log.info("Removing remote durable subscriber "+actor); 449 } 450 topicManager.removeDurableConsumer(service.getServiceName(),(ServiceConsumer) actor); 451 } 452 } 453 454 } 455 456 460 private void doHandleQueueRegister(MantaBusMessage msg , ControlSignal control){ 461 462 VirtualQueuesManager vqm = MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager(); 464 465 String numberOfReceives =(String ) control.getParams().get(ControlSignal.NUMBER_OF_RECEIVE_ON_QUEUE_KEY); 466 ServiceConsumer consumer = (ServiceConsumer) msg.getSource(); 467 468 boolean ok = vqm.registerReceiverToQueue( consumer, Long.parseLong(numberOfReceives) ); 469 if(ok){ 470 MantaAgent.getInstance().ack(msg); 471 } 472 473 } 475 479 private void doHandleQueueUnregister(MantaBusMessage msg , ControlSignal control){ 480 VirtualQueuesManager vqm = MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager(); 481 ServiceConsumer consumer = (ServiceConsumer) msg.getSource(); 482 vqm.unregisterReceiverToQueue(consumer ); 483 } 484 485 488 private void doHandleEnqueue(MantaBusMessage msg, ControlSignal control) { 489 VirtualQueuesManager vqm = MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager(); 490 ServiceProducer producer = (ServiceProducer) msg.getSource(); 491 QueueMaster master = (QueueMaster) msg.getRecipient(); 492 MantaBusMessage enqueuedMessage =(MantaBusMessage) control.getParams().get(ControlSignal.ENQUEUED_MESSAGE); 493 String controlId =String.valueOf(control.getControlId()) ; 494 495 vqm.handleEnqueueMessageToQueue(controlId,producer,master , enqueuedMessage ,msg.getMessageId()); 496 497 } 498 499 503 private void doHandleQueueCopy(MantaBusMessage msg, ControlSignal control) { 504 505 VirtualQueuesManager vqm = MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager(); 506 ServiceConsumer consumer = (ServiceConsumer)msg.getSource() ; 507 vqm.sendQueueCopy(consumer ); 508 } 509 } 510 | Popular Tags |