1 46 package org.mr.api.blocks; 47 48 import java.io.Serializable ; 49 import java.util.ArrayList ; 50 import java.util.List ; 51 52 import javax.jms.JMSException ; 53 import javax.jms.Message ; 54 import javax.jms.MessageConsumer ; 55 import javax.jms.MessageListener ; 56 import javax.jms.MessageProducer ; 57 import javax.jms.ObjectMessage ; 58 import javax.jms.QueueConnection ; 59 import javax.jms.QueueConnectionFactory ; 60 import javax.jms.QueueSession ; 61 import javax.jms.Session ; 62 63 import org.apache.commons.logging.LogFactory; 64 import org.mr.api.jms.MantaQueueConnectionFactory; 65 import org.mr.core.util.Stage; 66 import org.mr.core.util.StageHandler; 67 import org.mr.core.util.StageParams; 68 69 70 71 76 public class ScalableStage implements StageHandler, MessageListener { 77 81 private boolean distributed; 82 85 private Stage stage = null; 86 89 private QueueSession sendSession = null; 90 private QueueSession receiveSession = null; 91 private MessageConsumer consumer = null; 92 private MessageProducer producer = null; 93 96 private List handlers = new ArrayList (); 97 100 private Object handlerSync = new Object (); 101 104 private int currentHandler = 0; 105 108 private String stageName; 109 110 111 116 ScalableStage(String name,boolean distributed) { 117 this.distributed = distributed; 118 stageName = name; 119 if(!distributed){ 120 StageParams params = new StageParams(); 121 params.setBlocking(false); 122 params.setHandler(this); 123 params.setMaxNumberOfThreads(1); 124 params.setNumberOfStartThreads(1); 125 params.setStageName(name); 126 params.setPersistent(false); 127 this.stage = new Stage(params); 128 }else{ 129 130 try { 131 QueueConnectionFactory conFactory = (QueueConnectionFactory ) new MantaQueueConnectionFactory(); 132 QueueConnection con = conFactory.createQueueConnection(); 133 con.start(); 134 sendSession = con.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 135 receiveSession = con.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 136 137 } catch (JMSException e) { 138 if(LogFactory.getLog("ScalableStage").isErrorEnabled()){ 139 LogFactory.getLog("ScalableStage").error("Problem while init distributed stage.",e); 140 } 141 } 142 } 143 144 } 145 146 150 public synchronized void queue(Serializable event){ 151 152 if(distributed){ 153 try { 154 if(producer == null){ 155 producer = sendSession.createProducer(sendSession.createQueue(stageName)); 156 } 157 ObjectMessage msg = sendSession.createObjectMessage(); 158 159 msg.setObject(event); 160 producer.send( msg, 161 javax.jms.DeliveryMode.NON_PERSISTENT, 162 javax.jms.Message.DEFAULT_PRIORITY, 163 100000); 164 } catch (JMSException e) { 165 if(LogFactory.getLog("ScalableStage").isErrorEnabled()){ 166 LogFactory.getLog("ScalableStage").error("Problem while queueing distributed event.",e); 167 } 168 } 169 }else{ 170 stage.enqueue(event); 171 } 172 173 } 174 175 178 public boolean handle(Object event) { 179 synchronized(handlerSync){ 180 if(handlers.size() == 0){ 181 try { 182 handlerSync.wait(); 183 } catch (InterruptedException e) { 184 if(LogFactory.getLog("ScalableStage").isErrorEnabled()){ 185 LogFactory.getLog("ScalableStage").error("Problem while waiting for handlers.",e); 186 } 187 } 188 } 189 currentHandler++; 190 if(currentHandler == handlers.size()){ 191 currentHandler=0; 192 } 193 ((ScalableHandler)handlers.get(currentHandler)).handle(event); 194 } 195 return true; 196 } 197 198 202 public List getHandlers() { 203 return handlers; 204 } 205 206 211 public int size(){ 212 if (!distributed) { 213 return stage.size(); 214 } else { 215 return -1; 216 } 217 218 } 219 220 221 225 public void addHandler(ScalableHandler handler) { 226 if(handler == null) 227 return; 228 synchronized(handlerSync){ 229 if(distributed){ 230 if(handlers.size() == 0 ){ 231 try { 232 if(consumer==null) 233 consumer =receiveSession.createConsumer(receiveSession.createQueue(stageName)); 234 consumer.setMessageListener(this); 235 236 } catch (JMSException e) { 237 if(LogFactory.getLog("ScalableStage").isErrorEnabled()){ 238 LogFactory.getLog("ScalableStage").error("Problem creating the JMS objects.",e); 239 } 240 } } } handlers.add(handler); 244 handlerSync.notifyAll(); 245 } 246 } 248 252 public void removeHandler(ScalableHandler handler){ 253 synchronized(handlerSync){ 254 handlers.remove(handler); 255 if(handlers.size() == 0 && distributed) { 256 try { 257 consumer.setMessageListener(null); 258 } catch (JMSException e) { 259 if(LogFactory.getLog("ScalableStage").isErrorEnabled()){ 260 LogFactory.getLog("ScalableStage").error("Problem removing JMS listener.",e); 261 } 262 } 263 } 264 } 265 } 266 267 public void onMessage(Message msg) { 268 ObjectMessage o = (ObjectMessage )msg; 269 try { 270 handle(o.getObject()); 271 } catch (JMSException e) { 272 if(LogFactory.getLog("ScalableStage").isErrorEnabled()){ 273 LogFactory.getLog("ScalableStage").error("Problem handeling new message.",e); 274 } 275 } 276 277 } 278 } 279 | Popular Tags |