KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > indexing > WBHandler


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Uri Schneider.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34  
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46
47 package org.mr.indexing;
48
49 import java.util.Iterator JavaDoc;
50 import java.util.List JavaDoc;
51 import java.util.Set JavaDoc;
52
53 import org.apache.commons.logging.Log;
54 import org.apache.commons.logging.LogFactory;
55 import org.mr.MantaAgent;
56 import org.mr.core.net.TransportInfo;
57 import org.mr.core.protocol.MantaBusMessage;
58 import org.mr.indexing.messages.AgentTransportsChanged;
59 import org.mr.indexing.messages.MWBMessageConsts;
60 import org.mr.indexing.messages.ServiceParticipationChanged;
61 import org.mr.kernel.services.MantaService;
62 import org.mr.kernel.services.ServiceActor;
63 import org.mr.kernel.services.ServiceConsumer;
64 import org.mr.kernel.services.ServiceProducer;
65 import org.mr.kernel.services.queues.QueueMaster;
66 import org.mr.kernel.services.queues.VirtualQueuesManager;
67 import org.mr.kernel.services.topics.VirtualTopicManager;
68 import org.mr.kernel.world.WorldModeler;
69
70 /**
71  * IRSHandler.java
72  *
73  *
74  * Created: Thu Jan 06 17:07:14 2005
75  *
76  * @author Uri Schneider
77  * @version 1.0
78  */

79 public class WBHandler {
80     private Log log;
81
82     public WBHandler() {
83         this.log = LogFactory.getLog("WBHandler");
84     } // IRSHandler constructor
85

86     /**
87      * Handle a message the IRS has sent.
88      */

89     public void messageArrived(MantaBusMessage message, String JavaDoc messageType) {
90         if (this.log.isDebugEnabled()) {
91             this.log.debug("Handling " + message.getPayload().toString());
92         }
93
94         if (messageType.equals(AgentTransportsChanged.getTypeString())) {
95             AgentTransportsChanged atc =
96                 (AgentTransportsChanged) message.getPayload();
97             agentTransportsChanged(atc);
98         } else if (messageType.equals(ServiceParticipationChanged.getTypeString())) {
99             ServiceParticipationChanged spc =
100                 (ServiceParticipationChanged) message.getPayload();
101             serviceParticipationChanged(spc);
102         } else {
103             if (this.log.isWarnEnabled()) {
104                 this.log.warn("Ignoring IRS message with unknown type: " +
105                               messageType);
106             }
107         }
108     }
109
110     private void agentTransportsChanged(AgentTransportsChanged atc) {
111         MantaAgent agent = MantaAgent.getInstance();
112         WorldModeler world = agent.getSingletonRepository().getWorldModeler();
113         String JavaDoc agentName = atc.getAgentName();
114         String JavaDoc domainName = atc.getDomainName();
115         List JavaDoc added = atc.getAddedTransports();
116         List JavaDoc removed = atc.getRemovedTransports();
117         boolean saveWorld = false;
118         boolean actionPerformed = false;
119         Iterator JavaDoc i;
120
121         i = added.iterator();
122         while (i.hasNext()) {
123             TransportInfo info = (TransportInfo) i.next();
124             actionPerformed = world.addTransportInfoToAgent(domainName,
125                                                             agentName, info);
126             saveWorld = saveWorld || actionPerformed;
127             if (actionPerformed && this.log.isInfoEnabled()) {
128                 this.log.info("Added transport for " + agentName + ": " + info);
129             }
130         }
131         i = removed.iterator();
132         while (i.hasNext()) {
133             TransportInfo info = (TransportInfo) i.next();
134             actionPerformed =
135                 world.removeTransportInfoFromAgent(domainName, agentName, info);
136             saveWorld = saveWorld || actionPerformed;
137             if (actionPerformed && this.log.isInfoEnabled()) {
138                 this.log.info("Removed transport for " + agentName + ": " +
139                               info);
140             }
141         }
142
143         if (saveWorld) {
144             /*
145             try {
146                 //world.save();
147             } catch (IOException e) {
148                 if (this.log.isErrorEnabled()) {
149                     this.log.error("Error saving world map: " + e.getMessage());
150                 }
151             }
152             */

153         }
154     }
155
156     /*
157      * pretty long, but straightforward:
158      *
159      * if an actor is added, first add the new actor's transports,
160      * then add the actor itself.
161      *
162      * if an actor is removed, just remove it.
163      *
164      * there is a special case for the removal of a durable topic
165      * subscriber.
166      */

167     private void serviceParticipationChanged(ServiceParticipationChanged spc) {
168         MantaAgent agent = MantaAgent.getInstance();
169         WorldModeler world = agent.getSingletonRepository().getWorldModeler();
170         String JavaDoc domainName = spc.getDomainName();
171         String JavaDoc serviceName = spc.getServiceName();
172         String JavaDoc serviceType = spc.getServiceType();
173 // boolean isAdd = spc.isAdd();
174
byte operation = spc.getOperation();
175         MantaService service = null;
176
177         if (serviceType.equals(ServiceParticipationChanged.QUEUE)) {
178             service = agent.getService(serviceName,MantaService.SERVICE_TYPE_QUEUE );
179                 
180         } else if (serviceType.equals(ServiceParticipationChanged.TOPIC)) {
181             service = agent.getService(serviceName,MantaService.SERVICE_TYPE_TOPIC);
182         }
183
184         if (service != null) {
185             Set JavaDoc actors = spc.getActors();
186             Iterator JavaDoc i = actors.iterator();
187
188             while (i.hasNext()) {
189                 ServiceActor actor = (ServiceActor) i.next();
190                 if (operation == MWBMessageConsts.OP_ADD) {
191                     // update the remote agent's transport information
192
String JavaDoc agentName = actor.getAgentName();
193                     List JavaDoc transports = spc.getTransports(actor);
194                     Iterator JavaDoc ii = transports.iterator();
195                     boolean actionPerformed;
196                     boolean saveWorld = false;
197                     while (ii.hasNext()) {
198                         TransportInfo info = (TransportInfo) ii.next();
199                         actionPerformed =
200                             world.addTransportInfoToAgent(domainName, agentName,
201                                                           info);
202                         saveWorld = saveWorld || actionPerformed;
203                         if (actionPerformed && this.log.isInfoEnabled()) {
204                             this.log.info("Added transport for " + agentName +
205                                           ": " + info);
206                         }
207                     }
208                     if (saveWorld) {
209                         /*
210                         try {
211                             //world.save();
212                         } catch (IOException e) {
213                             if (this.log.isErrorEnabled()) {
214                                 this.log.error("Error saving world map: " +
215                                                e.getMessage());
216                             }
217                         }
218                         */

219                     }
220
221                     // add the actor to the service
222
if (this.log.isInfoEnabled()) {
223                         this.log.info("Adding actor " + actor);
224                     }
225                     
226                     if((actor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC)&&
227                             (actor.getType()== ServiceActor.CONSUMER )){
228                             // topic consumers need special attention
229
MantaAgent.getInstance().getSingletonRepository()
230                                 .getVirtualTopicManager().addConsumer((ServiceConsumer) actor);
231                     }else if (actor.getType() == ServiceActor.CONSUMER) {
232                         service.addConsumer((ServiceConsumer) actor);
233                     } else if (actor.getType() == ServiceConsumer.PRODUCER) {
234                         service.addProducer((ServiceProducer) actor);
235                     } else {
236                         QueueMaster master = (QueueMaster) actor;
237                         master.setValidUntil(Long.MAX_VALUE);
238                         VirtualQueuesManager vqm = MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager();
239                         vqm.setQueueMaster(serviceName, master);
240                     }
241                 } else if (operation == MWBMessageConsts.OP_REMOVE) {
242                     // remove the actor from the service
243
if (this.log.isInfoEnabled()) {
244                         this.log.info("Removing actor " + actor);
245                     }
246                     if (actor.getType() == ServiceActor.CONSUMER) {
247                         if(actor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC){
248                             MantaAgent.getInstance().getSingletonRepository()
249                                 .getVirtualTopicManager().removeConsumer((ServiceConsumer) actor);
250                         }else{
251                             service.removeConsumer((ServiceConsumer) actor);
252                         }
253                     } else if (actor.getType() == ServiceConsumer.PRODUCER) {
254                         service.removeProducer((ServiceProducer) actor);
255                     } else {
256                         VirtualQueuesManager vqm = MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager();
257                         vqm.setQueueMaster(serviceName,null);
258                     }
259                 } else { // if (operation == MWBMessageConsts.OP_REMOVE_DURABLE)
260
if (this.log.isInfoEnabled()) {
261                         this.log.info("Removing durable actor " + actor);
262                     }
263                     if (actor.getType() == ServiceActor.CONSUMER) {
264                         VirtualTopicManager topicManager =MantaAgent.getInstance().getSingletonRepository().getVirtualTopicManager();
265                         topicManager.
266                             removeDurableConsumer(serviceName,(ServiceConsumer) actor);
267                     }
268                 }
269             }
270         }
271     }
272 } // IRSHandler
273
Popular Tags