1 46 package org.mr.api.blocks; 47 48 import java.io.Serializable ; 49 import java.util.ArrayList ; 50 import java.util.Iterator ; 51 import java.util.List ; 52 53 import javax.jms.JMSException ; 54 import javax.jms.Message ; 55 import javax.jms.MessageConsumer ; 56 import javax.jms.MessageListener ; 57 import javax.jms.MessageProducer ; 58 import javax.jms.ObjectMessage ; 59 import javax.jms.Session ; 60 import javax.jms.TopicConnection ; 61 import javax.jms.TopicConnectionFactory ; 62 import javax.jms.TopicSession ; 63 64 import org.apache.commons.logging.LogFactory; 65 import org.mr.api.jms.MantaTopicConnectionFactory; 66 import org.mr.core.util.Stage; 67 import org.mr.core.util.StageHandler; 68 import org.mr.core.util.StageParams; 69 70 75 public class ScalableDispatcher implements StageHandler, MessageListener { 76 80 private boolean distributed; 81 84 private Stage stage = null; 85 88 private TopicSession sendSession = null; 89 private TopicSession receiveSession = null; 90 private MessageConsumer consumer = null; 91 private MessageProducer producer = null; 92 95 private List handlers = new ArrayList (); 96 99 private Object handlerSync = new Object (); 100 103 private String dispatcherName; 104 105 110 ScalableDispatcher(String name,boolean distributed) { 111 this.distributed = distributed; 112 dispatcherName = name; 113 if(!distributed){ 114 StageParams params = new StageParams(); 115 params.setBlocking(false); 116 params.setHandler(this); 117 params.setMaxNumberOfThreads(1); 118 params.setNumberOfStartThreads(1); 119 params.setStageName(name); 120 params.setPersistent(false); 121 this.stage = new Stage(params); 122 }else{ 123 124 try { 125 TopicConnectionFactory conFactory = (TopicConnectionFactory ) new MantaTopicConnectionFactory(); 126 TopicConnection con = conFactory.createTopicConnection(); 127 con.start(); 128 sendSession = con.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 129 receiveSession = con.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 130 131 } catch (JMSException e) { 132 if(LogFactory.getLog("ScalableDispatcher").isErrorEnabled()){ 133 LogFactory.getLog("ScalableDispatcher").error("Problem while init distributed Dispatcher.",e); 134 } 135 } 136 } 137 138 } 140 144 public synchronized void dispatch(Serializable event){ 145 146 if(distributed){ 147 try { 148 if(producer == null){ 149 producer = sendSession.createProducer(sendSession.createTopic(dispatcherName)); 150 } 151 ObjectMessage msg = sendSession.createObjectMessage(); 152 153 msg.setObject(event); 154 producer.send( msg, 155 javax.jms.DeliveryMode.NON_PERSISTENT, 156 javax.jms.Message.DEFAULT_PRIORITY, 157 100000); 158 } catch (JMSException e) { 159 if(LogFactory.getLog("ScalableDispatcher").isErrorEnabled()){ 160 LogFactory.getLog("ScalableDispatcher").error("Problem while dispatching distributed event.",e); 161 } 162 } 163 }else{ 164 stage.enqueue(event); 165 } 166 167 } 168 169 172 public boolean handle(Object event) { 173 synchronized(handlerSync){ 174 if(handlers.size() == 0){ 175 try { 176 handlerSync.wait(); 177 } catch (InterruptedException e) { 178 if(LogFactory.getLog("ScalableDispatcher").isErrorEnabled()){ 179 LogFactory.getLog("ScalableDispatcher").error("Problem while waiting for handlers.",e); 180 } 181 } 182 } 183 Iterator handlersIter = handlers.iterator(); 184 while(handlersIter.hasNext()){ 185 ((ScalableHandler)handlersIter.next()).handle(event); 186 } 187 188 } 189 return true; 190 } 191 192 196 public List getHandlers() { 197 return handlers; 198 } 199 200 201 202 206 public void addHandler(ScalableHandler handler) { 207 if(handler == null) 208 return; 209 synchronized(handlerSync){ 210 if(distributed){ 211 if(handlers.size() == 0 ){ 212 try { 213 if(consumer==null) 214 consumer =receiveSession.createConsumer(receiveSession.createTopic(dispatcherName)); 215 consumer.setMessageListener(this); 216 217 } catch (JMSException e) { 218 if(LogFactory.getLog("ScalableDispatcher").isErrorEnabled()){ 219 LogFactory.getLog("ScalableDispatcher").error("Problem creating the JMS objects.",e); 220 } 221 } } } handlers.add(handler); 225 handlerSync.notifyAll(); 226 } 227 } 229 230 234 public void removeHandler(StageHandler handler){ 235 synchronized(handlerSync){ 236 handlers.remove(handler); 237 if(handlers.size() == 0) { 238 try { 239 consumer.setMessageListener(null); 240 } catch (JMSException e) { 241 if(LogFactory.getLog("ScalableDispatcher").isErrorEnabled()){ 242 LogFactory.getLog("ScalableDispatcher").error("Problem removing JMS listener.",e); 243 } 244 } 245 } 246 } 247 } 248 249 252 public void onMessage(Message msg) { 253 ObjectMessage o = (ObjectMessage )msg; 254 try { 255 handle(o.getObject()); 256 } catch (JMSException e) { 257 if(LogFactory.getLog("ScalableDispatcher").isErrorEnabled()){ 258 LogFactory.getLog("ScalableDispatcher").error("Problem handeling new message.",e); 259 } 260 } 261 262 } 263 } 264 | Popular Tags |