KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > kernel > control > ControlSignalMessageConsumer


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Amir Shevat.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 package org.mr.kernel.control;
47
48
49 import org.mr.MantaAgent;
50 import org.mr.MantaAgentConstants;
51 import org.mr.SingletonRepository;
52 import org.mr.Version;
53 import org.mr.indexing.messages.MWBMessageConsts;
54 import org.mr.kernel.services.*;
55 import org.mr.kernel.services.queues.QueueMaster;
56 import org.mr.kernel.services.queues.VirtualQueuesManager;
57 import org.mr.kernel.services.topics.VirtualTopicManager;
58 import org.mr.kernel.world.WorldModeler;
59 import org.apache.commons.logging.Log;
60 import org.apache.commons.logging.LogFactory;
61 import org.mr.core.configuration.ConfigManager;
62 import org.mr.core.net.MantaAddress;
63 import org.mr.core.protocol.DeadEndRecipient;
64 import org.mr.core.protocol.MantaBusMessage;
65 import org.mr.core.protocol.MantaBusMessageConsts;
66 import org.mr.kernel.IncomingMessageListener;
67 import org.mr.kernel.IncomingMessageListenerRegister;
68 import org.mr.core.util.SystemTime;
69 import org.mr.core.util.byteable.ByteableList;
70
71 import java.util.Iterator JavaDoc;
72 import java.util.ArrayList JavaDoc;
73
74
75 /**
76  * This class represents the entry point of all control messages ,this object gets the control
77  * message and knows what to do with it.
78  *
79  * @version 1.0
80  * @since Dec 29, 2003
81  * @author Amir Shevat
82  *
83  */

84 public class ControlSignalMessageConsumer implements IncomingMessageListener {
85
86     public static final long START_CONTROL_SIGNAL_ID = 0;
87
88
89     private Log log;
90
91     public static final String JavaDoc CONTROL_PSEUDO_SERVICE_NAME = "controlPsadoService";
92
93
94
95     public ControlSignalMessageConsumer(){
96         IncomingMessageListenerRegister.setControlRouter(this);
97         log=LogFactory.getLog("ControlSignalMessageConsumer");
98     }
99
100     /* (non-Javadoc)
101       * @see org.mr.core.protocol.MantaBusMessageListener#messageArrived(org.mr.core.protocol.MantaBusMessage)
102       */

103     public void messageArrived(MantaBusMessage msg) {
104         MantaAgent agent = MantaAgent.getInstance();
105
106         // check id this is ACK
107
String JavaDoc ref = msg.getHeader(MantaBusMessageConsts.HEADER_NAME_ACK_RESPONSE_REFERENCE);
108         if(ref != null ){
109             // we got OK ack remove memory
110
if(log.isDebugEnabled()){
111                 log.debug("Got ACK for message "+ref+" from peer "+msg.getSource()+".");
112             }
113             agent.gotAck(ref,msg.getSource() );
114
115         } else {
116             if(log.isDebugEnabled()){
117                 log.debug("Got control message. Message ID="+msg.getMessageId()+", Sender="+msg.getSource()+".");
118             }
119         }
120
121         // ack on message
122
byte ackType = msg.getRecipient().getAcknowledgeMode();
123         if(ackType==MantaAgentConstants.AUTO_ACK ){
124             agent.ack(msg);
125         }
126
127         String JavaDoc mwbType = msg.getHeader(MWBMessageConsts.MWB_TYPE);
128         if (mwbType != null) {
129             agent.getSingletonRepository().getWBManager().getWbHandler().messageArrived(msg, mwbType);
130             return;
131         }
132         // go on to control cases
133
ControlSignal control =(ControlSignal) msg.getPayload();
134
135         if(control != null){
136             byte controlOp =control.getOperation();
137             // if this is a retransmit of an old message ignore it
138
boolean redeliverd = msg.isRedelivered();
139             // new control message process it
140
if(controlOp == ControlSignal.OPERATION_TYPE_ADVERTISE){
141                 doHandleAdvertise(msg,control);
142             }else if (controlOp == ControlSignal.OPERATION_TYPE_RECALL){
143                 doHandleRecall(msg,control);
144             }else if(controlOp == ControlSignal.OPERATION_TYPE_QUEUE_REGISTER){
145                 doHandleQueueRegister(msg,control);
146             }else if(controlOp == ControlSignal.OPERATION_TYPE_QUEUE_UNREGISTER){
147                 doHandleQueueUnregister(msg,control);
148             }else if(controlOp == ControlSignal.OPERATION_TYPE_UPDATE_SERVICE_ACTOR_STATE){
149                 doHandleServiceActorUpdate(msg,control);
150             }else if(controlOp == ControlSignal.OPERATION_TYPE_GET_QUEUE_COPY){
151                 doHandleQueueCopy(msg,control);
152             }else if(controlOp == ControlSignal.OPERATION_TYPE_ENQUEUE){
153                 doHandleEnqueue(msg,control);
154             }else if(controlOp == ControlSignal.OPERATION_TYPE_GET_MANAGEMENT_PROPERTIES){
155                 doHandleGetManagementInfo(msg,control);
156             }else if(controlOp == ControlSignal.OPERATION_TYPE_UNSUBSCRIBE_DURABLE){
157                 doHandleUnsubscribeDurable(msg,control);
158             }
159         }//if(controlOp != null){
160

161     }//messageArrived
162

163
164
165
166
167
168
169     /**
170      * gets a request to send it's management parameters: weather this layer allow
171      * JMX rmi management (jmx.rmiConnector.enabled) include the jmx.rmiPort and
172      * jmx.httpPort as they are updated in the default_config file or the default ones
173      * if there are no values in the file.
174      * returns a message with the ports to requester.
175      * @param msg
176      * @param control
177      */

178     private void doHandleGetManagementInfo(MantaBusMessage msg, ControlSignal control) {
179         MantaBusMessage cbm = MantaBusMessage.getInstance();
180         cbm.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT);
181         ControlSignal info;
182
183         //creates a ControlSignal that will hold the management parameters and be sent to the Console
184
info = new ControlSignal(ControlSignal.OPERATION_TYPE_GET_MANAGEMENT_PROPERTIES);
185         //inserts the manta version of this layer into the message returned to the console
186
info.getParams().put(ControlSignal.MANTA_VERSION,Version.version);
187          //in case the JMXrmi connection is set to false at the default config file
188
//the layer is not manageable
189
ConfigManager config = MantaAgent.getInstance().getSingletonRepository().getConfigManager();
190          //inserts the layer type
191
info.getParams().put(ControlSignal.LAYER_TYPE,config.getStringProperty( "management.agentType", "AGENT"));
192         if ((config.getStringProperty("management.jmx.rmiConnector.enabled")).equalsIgnoreCase("false")){
193             info.getParams().put(ControlSignal.MANAGEABLE_LAYER,MantaBusMessageConsts.HEADER_VALUE_FALSE);
194         }
195         else{
196             info.getParams().put(ControlSignal.MANAGEABLE_LAYER,MantaBusMessageConsts.HEADER_VALUE_TRUE);
197             String JavaDoc rmiPort=config.getStringProperty("management.jmx.rmiConnector.rmiPort","1099");
198             info.getParams().put(ControlSignal.JMX_RMI_PORT,rmiPort);
199             //In case the jmx.httpAdaptor.enabled is set to false at the default config file
200
//don't add the http adaptor port to the message sent to the Console
201
if(config.getStringProperty("management.jmx.httpAdaptor.enabled")!=null){
202                 if (config.getBooleanProperty("management.jmx.httpAdaptor.enabled")){
203                     String JavaDoc httpPort=config.getStringProperty("management.jmx.httpAdaptor.httpPort","8080");
204                     info.getParams().put(ControlSignal.JMX_HTTP_PORT,httpPort);
205                 }
206             }
207         }//else
208
//the address for the sent message
209
MantaAddress reply= ((MantaAddress)msg.getRecipient());
210         cbm.setLogicalDestination("&&MANGEMENT_INFO$$"+reply.getAgentName());
211         // insert the control payload
212
cbm.setPayload(info);
213         // we want to reply to a DeadEndRecipient we do not want any ACK
214
DeadEndRecipient recipient = DeadEndRecipient.createDeadEndRecipient(msg.getSource().getAgentName(), msg.getSource().getDomainName());
215         cbm.setRecipient(recipient);
216         MantaAgent.getInstance().send(cbm,reply, MantaAgentConstants.NON_PERSISTENT , MantaAgentConstants.HIGH,MantaAgentConstants.CONTROL_MESSAGES_TTL+SystemTime.gmtCurrentTimeMillis());
217
218     }
219
220
221
222     private void doHandleServiceActorUpdate(MantaBusMessage msg , ControlSignal control){
223         // update agent-service map to the world mapper
224
SingletonRepository repo = MantaAgent.getInstance().getSingletonRepository();
225         WorldModeler world = repo.getWorldModeler();
226         ByteableList actors = (ByteableList) control.getParams().get(ControlSignal.SERVICE_ACTORS_UPDATE_KEY);
227         int size = actors.size();
228         for (int i = 0; i < size; i++) {
229             ServiceActor actor =(ServiceActor)actors.get(i);
230             MantaService service = MantaAgent.getInstance().getService(actor.getServiceName(), actor.getServiceType());
231             if(service != null){
232                 if((actor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC)&&
233                         (actor.getType()== ServiceActor.CONSUMER )){
234                         // topic consumers need special attention
235
if (log.isInfoEnabled()) {
236                             log.info("Updating state of remote service consumer "+actor);
237                         }
238                         repo.getVirtualTopicManager().addConsumer((ServiceConsumer) actor);
239                 }else if(actor.getType() == ServiceActor.CONSUMER ){
240                     if(service.getActor(actor.getId())==null){
241                         if (log.isInfoEnabled()) {
242                             log.info("Updating state of remote service consumer "+actor);
243                         }
244                         service.addConsumer((ServiceConsumer) actor);
245                     }
246                 }else if(actor.getType() == ServiceActor.PRODUCER ){
247                     if (log.isInfoEnabled()) {
248                         log.info("Updating state of remote service producer "+actor);
249                     }
250                     service.addProducer((ServiceProducer) actor);
251                 }else{
252                     QueueMaster master =(QueueMaster) actor;
253                     master.setValidUntil(Long.MAX_VALUE);
254                     QueueMaster oldMaster = repo.getVirtualQueuesManager().getQueueMaster(service.getServiceName());
255                     // we do not want to reset the same queue coordinator
256
if(oldMaster==null || !oldMaster.getId().equals(master.getId()) ){
257                         if (log.isInfoEnabled()) {
258                             log.info("Updating state of remote service coordinator "+actor);
259                         }
260                         repo.getVirtualQueuesManager().setQueueMaster(service.getServiceName(), master);
261                     }
262
263                 }
264             }//if
265
}//for
266

267
268
269     }//doHandleServiceActorUpdate
270

271
272
273
274     private void doHandleAdvertise(MantaBusMessage msg , ControlSignal control){
275         // add agent-service map to the world mapper
276
MantaAgent manta = MantaAgent.getInstance();
277
278         ServiceActor actor = (ServiceActor)msg.getSource();
279         MantaService service =manta.getService(actor.getServiceName(), actor.getServiceType());
280         VirtualQueuesManager vqm = manta.getSingletonRepository().getVirtualQueuesManager();
281         VirtualTopicManager vtm = manta.getSingletonRepository().getVirtualTopicManager();
282         if((actor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC)){
283                 // topic consumers need special attention
284
if((actor.getType()== ServiceActor.CONSUMER )){
285                 if (log.isInfoEnabled()) {
286                     if (!((ServiceConsumer)actor).isDurable()) {
287                         log.info("Discovered remote service consumer "+actor);
288                     }
289                     else {
290                         log.info("Discovered remote durable subscriber "+actor);
291                     }
292                 }
293                 vtm.addConsumer((ServiceConsumer) actor);
294             }
295             // not interested in producers - just log
296
else if ((actor.getType()== ServiceActor.PRODUCER )){
297                 if (log.isInfoEnabled()) {
298                     log.info("Discovered remote service producer "+actor);
299                 }
300             }
301         }else{
302             if(service!= null){
303                 if(actor.getType() == ServiceActor.CONSUMER) {
304                     if (log.isInfoEnabled()) {
305                         log.info("Discovered remote service consumer "+actor);
306                     }
307                     service.addConsumer((ServiceConsumer) actor);
308                 }
309                 else if(actor.getType() == ServiceActor.PRODUCER) {
310                     if (log.isInfoEnabled()) {
311                         log.info("Discovered remote service producer "+actor);
312                     }
313                     service.addProducer((ServiceProducer) actor);
314                 }
315                 else if(actor.getType() == ServiceActor.COORDINATOR) {
316                     if (log.isInfoEnabled()) {
317                         log.info("Discovered remote service coordinator "+actor);
318                     }
319                     QueueMaster master =(QueueMaster) actor;
320                     master.setValidUntil(Long.MAX_VALUE);
321                     vqm.setQueueMaster(service.getServiceName(),master);
322                 }
323             }
324         }
325             String JavaDoc update = (String JavaDoc) control.getParams().get(ControlSignal.SERVICE_UPDATE_NEEDED);
326             // update only if needed and only others
327
if(service != null && update != null && !msg.getSource().getAgentName().equalsIgnoreCase(manta.getAgentName())){
328                 ByteableList updates = new ByteableList();
329                 //Aviad if consumer is durable check that it is up before sending him in update
330
ArrayList JavaDoc consumers = service.getConsumersByAgentId(manta.getAgentName());
331                 Iterator JavaDoc itr = consumers.iterator();
332                 while(itr.hasNext()) {
333                     ServiceConsumer sc = (ServiceConsumer) itr.next();
334                     if (!sc.isDurable()) {
335                         updates.add(sc);
336                     } else {
337                         if (ServiceActorControlCenter.isConsumerUp(sc)) {
338                             updates.add(sc);
339                         }
340                     }
341                 }
342
343                 updates.addAll(service.getProducersByAgentId(manta.getAgentName()));
344                 if(service.getServiceType() == MantaService.SERVICE_TYPE_QUEUE){
345                     if(vqm.amIQueueMaster(service.getServiceName())){
346                         updates.add(vqm.getQueueMaster(service.getServiceName()));
347                     }
348                 }
349
350                 if(!updates.isEmpty()) {
351                     sendServiceActorsUpdateMessage(updates , msg.getSource() ,service);
352                 }
353             }
354
355
356     }//doAdvertise
357

358
359
360
361     private void sendServiceActorsUpdateMessage(ByteableList serviceActors, MantaAddress destination ,MantaService service){
362
363         MantaBusMessage cbm = MantaBusMessage.getInstance();
364         cbm.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CONTROL);
365         ControlSignal control;
366         // insert the control payload
367
control = new ControlSignal(ControlSignal.OPERATION_TYPE_UPDATE_SERVICE_ACTOR_STATE);
368
369         control.getParams().put(ControlSignal.SERVICE_ACTORS_UPDATE_KEY, serviceActors);
370         cbm.setPayload(control);
371
372         DeadEndRecipient resp = DeadEndRecipient.createDeadEndRecipient(destination.getAgentName(),destination.getDomainName());
373         cbm.setRecipient(resp);
374         MantaAgent.getInstance().send(cbm,(ServiceActor)serviceActors.get(0), MantaAgentConstants.NON_PERSISTENT , MantaAgentConstants.HIGH,MantaAgentConstants.CONTROL_MESSAGES_TTL+SystemTime.gmtCurrentTimeMillis());
375
376
377     }//sendServiceCorrectionMessage
378

379
380
381
382     private void doHandleRecall(MantaBusMessage msg , ControlSignal control){
383         // remove agent-service map from the world mapper
384
ServiceActor actor = (ServiceActor) msg.getSource();
385         if(actor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC) {
386             VirtualTopicManager vtm = MantaAgent.getInstance().getSingletonRepository().getVirtualTopicManager();
387             if (actor.getType() == ServiceActor.CONSUMER) {
388                 if (log.isInfoEnabled()) {
389                     if (!((ServiceConsumer)actor).isDurable()) {
390                         log.info("Removing remote service consumer "+actor);
391                     }
392                     else {
393                         log.info("Removing remote durable subscriber "+actor);
394                     }
395                 }
396                 vtm.removeConsumer((ServiceConsumer) actor);
397             }
398             // not interesting - just log
399
else if (actor.getType() == ServiceActor.PRODUCER) {
400                 if (log.isInfoEnabled()) {
401                     log.info("Removing remote service producer "+actor);
402                 }
403                 //Aviad added remove producer from vtm and world map
404
vtm.removeProducer((ServiceProducer) actor);
405             }
406         }else{
407             MantaService service = MantaAgent.getInstance().getService(actor.getServiceName(), actor.getServiceType());
408             if(service!= null){
409                 if(actor.getType() == ServiceActor.CONSUMER){
410                     if (log.isInfoEnabled()) {
411                         log.info("Removing remote service consumer "+actor);
412                     }
413                     service.removeConsumer((ServiceConsumer) actor);
414                 }
415                 else if(actor.getType() == ServiceActor.PRODUCER){
416                     if (log.isInfoEnabled()) {
417                         log.info("Removing remote service producer "+actor);
418                     }
419                     service.removeProducer((ServiceProducer) actor);
420                 }
421                 else{
422                     VirtualQueuesManager vqm = MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager();
423                     QueueMaster coordinator = vqm.getQueueMaster(service.getServiceName());
424                     if(coordinator!=null && coordinator.getId().equals(actor.getId()) ) {
425                         if (log.isInfoEnabled()) {
426                             log.info("Removing remote service coordinator "+actor);
427                         }
428                         vqm.setQueueMaster(service.getServiceName(),null);
429                     }
430                 }
431             }
432         }
433
434
435
436     }//doRecall
437

438
439     private void doHandleUnsubscribeDurable(MantaBusMessage msg, ControlSignal control) {
440 // remove agent-service map from the world mapper
441
ServiceActor actor = (ServiceActor) msg.getSource();
442         MantaService service = MantaAgent.getInstance().getService(actor.getServiceName(), actor.getServiceType());
443         if(service!= null){
444
445             if(actor.getType() == ServiceActor.CONSUMER ){
446                 VirtualTopicManager topicManager =MantaAgent.getInstance().getSingletonRepository().getVirtualTopicManager();
447                 if (log.isInfoEnabled()) {
448                     log.info("Removing remote durable subscriber "+actor);
449                 }
450                 topicManager.removeDurableConsumer(service.getServiceName(),(ServiceConsumer) actor);
451             }
452         }
453
454     }
455
456     /**
457      * sent from the consumer of the queue do the queue coordinator
458      * telling the coordinator that recieve is wanted
459      */

460     private void doHandleQueueRegister(MantaBusMessage msg , ControlSignal control){
461
462         // register agent to receive
463
VirtualQueuesManager vqm = MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager();
464
465         String JavaDoc numberOfReceives =(String JavaDoc) control.getParams().get(ControlSignal.NUMBER_OF_RECEIVE_ON_QUEUE_KEY);
466         ServiceConsumer consumer = (ServiceConsumer) msg.getSource();
467
468          boolean ok = vqm.registerReceiverToQueue( consumer, Long.parseLong(numberOfReceives) );
469          if(ok){
470              MantaAgent.getInstance().ack(msg);
471          }
472
473     }//doQueueRegister
474

475     /**
476      * sent from the consumer of the queue do the queue coordinator
477      * telling the coordinator that no recieve is wanted any more
478      */

479     private void doHandleQueueUnregister(MantaBusMessage msg , ControlSignal control){
480         VirtualQueuesManager vqm = MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager();
481         ServiceConsumer consumer = (ServiceConsumer) msg.getSource();
482         vqm.unregisterReceiverToQueue(consumer );
483     }
484
485     /**
486      * sent from the producer of the queue do the queue coordinator with a message to be enqueued
487      */

488     private void doHandleEnqueue(MantaBusMessage msg, ControlSignal control) {
489         VirtualQueuesManager vqm = MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager();
490         ServiceProducer producer = (ServiceProducer) msg.getSource();
491         QueueMaster master = (QueueMaster) msg.getRecipient();
492         MantaBusMessage enqueuedMessage =(MantaBusMessage) control.getParams().get(ControlSignal.ENQUEUED_MESSAGE);
493         String JavaDoc controlId =String.valueOf(control.getControlId()) ;
494
495         vqm.handleEnqueueMessageToQueue(controlId,producer,master , enqueuedMessage ,msg.getMessageId());
496
497     }
498
499     /**
500      * @param msg
501      * @param control
502      */

503     private void doHandleQueueCopy(MantaBusMessage msg, ControlSignal control) {
504
505         VirtualQueuesManager vqm = MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager();
506         ServiceConsumer consumer = (ServiceConsumer)msg.getSource() ;
507         vqm.sendQueueCopy(consumer );
508     }
509 }
510
Popular Tags