KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > api > blocks > ScalableStage


/*
 * Copyright 2002 by
 * <a HREF="http://www.coridan.com">Coridan</a>
 * <a HREF="mailto:support@coridan.com">support@coridan.com</a>
 *
 * The contents of this file are subject to the Mozilla Public License Version
 * 1.1 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS" basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
 * for the specific language governing rights and limitations under the
 * License.
 *
 * The Original Code is "MantaRay" (TM).
 *
 * The Initial Developer of the Original Code is Amir Shevat.
 * Portions created by the Initial Developer are Copyright (C) 2006
 * Coridan Inc. All Rights Reserved.
 *
 * Contributor(s): all the names of the contributors are added in the source
 * code where applicable.
 *
 * Alternatively, the contents of this file may be used under the terms of the
 * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
 * provisions of LGPL are applicable instead of those above. If you wish to
 * allow use of your version of this file only under the terms of the LGPL
 * License and not to allow others to use your version of this file under
 * the MPL, indicate your decision by deleting the provisions above and
 * replace them with the notice and other provisions required by the LGPL.
 * If you do not delete the provisions above, a recipient may use your version
 * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
 *
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the MPL as stated above or under the terms of the GNU
 * Lesser General Public License as published by the Free Software Foundation;
 * either version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
 * License for more details.
 */

package org.mr.api.blocks;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mr.api.jms.MantaQueueConnectionFactory;
import org.mr.core.util.Stage;
import org.mr.core.util.StageHandler;
import org.mr.core.util.StageParams;

71 /**
72  * This class serves as a point-to-ponit module decuple mediator, modules queue events while
73  * others handle these events, a dispatched even will be handled by on and only one of the registered handlers
74  * @author Amir Shevat
75  */

76 public class ScalableStage implements StageHandler, MessageListener JavaDoc{
77     /**
78      * if true this stage will use an underlying JMS queue to transfer messages
79      * else a im memory Stage will be used
80      */

81     private boolean distributed;
82     /**
83      * the underlying structure in case distributed is false
84      */

85     private Stage stage = null;
86     /**
87      * a set of JMS objects that will be used in case distributed is true
88      */

89     private QueueSession JavaDoc sendSession = null;
90     private QueueSession JavaDoc receiveSession = null;
91     private MessageConsumer JavaDoc consumer = null;
92     private MessageProducer JavaDoc producer = null;
93     /**
94      * the list of all the handlers of this stage
95      */

96     private List JavaDoc handlers = new ArrayList JavaDoc();
97     /**
98      * a sync object for getting lock on handlers
99      */

100     private Object JavaDoc handlerSync = new Object JavaDoc();
101     /**
102      * a pointer to the last active handler
103      */

104     private int currentHandler = 0;
105     /**
106      * the name of the stage and the name of the JMS queue if distributed is true
107      */

108     private String JavaDoc stageName;
109
110
111     /**
112      * instance of this object should be instance with the factory method getStage(..) in ScalableFactory
113      * @param name the name of the Stage
114      * @param distributed if true use JMS if false use in memory Stage
115      */

116     ScalableStage(String JavaDoc name,boolean distributed) {
117         this.distributed = distributed;
118         stageName = name;
119         if(!distributed){
120             StageParams params = new StageParams();
121             params.setBlocking(false);
122             params.setHandler(this);
123             params.setMaxNumberOfThreads(1);
124             params.setNumberOfStartThreads(1);
125             params.setStageName(name);
126             params.setPersistent(false);
127             this.stage = new Stage(params);
128         }else{
129
130             try {
131                 QueueConnectionFactory JavaDoc conFactory = (QueueConnectionFactory JavaDoc) new MantaQueueConnectionFactory();
132                 QueueConnection JavaDoc con = conFactory.createQueueConnection();
133                 con.start();
134                 sendSession = con.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
135                 receiveSession = con.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
136
137             } catch (JMSException JavaDoc e) {
138                 if(LogFactory.getLog("ScalableStage").isErrorEnabled()){
139                     LogFactory.getLog("ScalableStage").error("Problem while init distributed stage.",e);
140                 }
141             }
142         }
143
144     }
145
146     /**
147      * modules use this method to send events to the handlers of this stage
148      * @param event any Serializable object
149      */

150     public synchronized void queue(Serializable JavaDoc event){
151
152         if(distributed){
153             try {
154                 if(producer == null){
155                     producer = sendSession.createProducer(sendSession.createQueue(stageName));
156                 }
157                 ObjectMessage JavaDoc msg = sendSession.createObjectMessage();
158
159                 msg.setObject(event);
160                 producer.send( msg,
161                         javax.jms.DeliveryMode.NON_PERSISTENT,
162                         javax.jms.Message.DEFAULT_PRIORITY,
163                         100000);
164             } catch (JMSException JavaDoc e) {
165                 if(LogFactory.getLog("ScalableStage").isErrorEnabled()){
166                     LogFactory.getLog("ScalableStage").error("Problem while queueing distributed event.",e);
167                 }
168             }
169         }else{
170             stage.enqueue(event);
171         }
172
173     }
174
175     /**
176      * internal use, not a part of the API
177      */

178     public boolean handle(Object JavaDoc event) {
179         synchronized(handlerSync){
180             if(handlers.size() == 0){
181                 try {
182                     handlerSync.wait();
183                 } catch (InterruptedException JavaDoc e) {
184                     if(LogFactory.getLog("ScalableStage").isErrorEnabled()){
185                         LogFactory.getLog("ScalableStage").error("Problem while waiting for handlers.",e);
186                     }
187                 }
188             }
189             currentHandler++;
190             if(currentHandler == handlers.size()){
191                 currentHandler=0;
192             }
193             ((ScalableHandler)handlers.get(currentHandler)).handle(event);
194         }
195         return true;
196     }
197
198     /**
199      * returns the list of all the handlers of this stage
200      * @return a list of all the handlers of this stage
201      */

202     public List JavaDoc getHandlers() {
203         return handlers;
204     }
205
206     /**
207      * returns the size of the queue if the stage is none distrebuted
208      * if the Scalble stage is distrebuted the return value will be -1
209      * @return the size of a none distrebuted ScalableStage
210      */

211     public int size(){
212         if (!distributed) {
213             return stage.size();
214         } else {
215             return -1;
216         }
217
218     }
219
220
221     /**
222      * modules that want to handle events that are enqueud to this dispatcher use this method to do so
223      * @param handler a ScalableHandler object that will receive the event
224      */

225     public void addHandler(ScalableHandler handler) {
226         if(handler == null)
227             return;
228         synchronized(handlerSync){
229             if(distributed){
230                 if(handlers.size() == 0 ){
231                     try {
232                         if(consumer==null)
233                             consumer =receiveSession.createConsumer(receiveSession.createQueue(stageName));
234                         consumer.setMessageListener(this);
235
236                     } catch (JMSException JavaDoc e) {
237                         if(LogFactory.getLog("ScalableStage").isErrorEnabled()){
238                             LogFactory.getLog("ScalableStage").error("Problem creating the JMS objects.",e);
239                         }
240                     }//try
241
}//if
242
}//if
243
handlers.add(handler);
244             handlerSync.notifyAll();
245         }
246     }//setHandler
247

248     /**
249      * removes the handler for the stage form its handlers list
250      * @param handler the object that up untill now was a handler for events
251      */

252     public void removeHandler(ScalableHandler handler){
253         synchronized(handlerSync){
254             handlers.remove(handler);
255             if(handlers.size() == 0 && distributed) {
256                 try {
257                     consumer.setMessageListener(null);
258                 } catch (JMSException JavaDoc e) {
259                     if(LogFactory.getLog("ScalableStage").isErrorEnabled()){
260                         LogFactory.getLog("ScalableStage").error("Problem removing JMS listener.",e);
261                     }
262                 }
263             }
264         }
265     }
266
267     public void onMessage(Message JavaDoc msg) {
268         ObjectMessage JavaDoc o = (ObjectMessage JavaDoc)msg;
269         try {
270             handle(o.getObject());
271         } catch (JMSException JavaDoc e) {
272             if(LogFactory.getLog("ScalableStage").isErrorEnabled()){
273                 LogFactory.getLog("ScalableStage").error("Problem handeling new message.",e);
274             }
275         }
276
277     }
278 }
279
Popular Tags