1 46 package org.mr.kernel.services.queues; 47 48 import java.util.Collections ; 49 import java.util.HashMap ; 50 import java.util.HashSet ; 51 import java.util.Iterator ; 52 import java.util.Map ; 53 import java.util.Set ; 54 55 import org.apache.commons.logging.Log; 56 import org.apache.commons.logging.LogFactory; 57 import org.mr.MantaAgent; 58 import org.mr.MantaAgentConstants; 59 import org.mr.MantaException; 60 import org.mr.core.protocol.DeadEndRecipient; 61 import org.mr.core.protocol.MantaBusMessage; 62 import org.mr.core.protocol.MantaBusMessageConsts; 63 import org.mr.core.util.SystemTime; 64 import org.mr.core.util.byteable.ByteableList; 65 import org.mr.kernel.services.MantaService; 66 import org.mr.kernel.services.ServiceConsumer; 67 import org.mr.kernel.services.ServiceProducer; 68 import org.mr.kernel.world.WorldModeler; 69 70 71 81 public class VirtualQueuesManager { 82 83 private Map virtualQueueServicesMap ; 84 public Log log = null; 85 86 public static final String ENQUEUE_ACK_HEADER ="enqueue_ack"; 87 public static final byte ENQUEUE_OK = 1; 88 public static final byte NOT_MASTER = 2; 89 public static final byte ENQUEUE_FAIL = 3; 90 private static long enqueueWaitforCoordinator = -1; 91 92 public static final String ORIGINAL_MESSAGE_PRUDUCER = "msg_org"; 94 95 public static final String ENQUEUE_TIME = "enq_time"; 96 97 public VirtualQueuesManager(){ 98 virtualQueueServicesMap = Collections.synchronizedMap(new HashMap ()); 99 enqueueWaitforCoordinator = MantaAgent.getInstance().getSingletonRepository() 100 .getConfigManager().getLongProperty("jms.enqueueWaitForCoordinator",-1); 101 log=LogFactory.getLog("VirtualQueuesManager"); 102 103 } 105 106 107 112 public synchronized AbstractQueueService getQueueService(String queueName){ 113 AbstractQueueService result =(AbstractQueueService) virtualQueueServicesMap.get(queueName); 114 if(result == null){ 115 MantaAgent manta = MantaAgent.getInstance(); 116 result = (AbstractQueueService) manta.getSingletonRepository().getWorldModeler().getService(manta.getDomainName(),queueName,MantaService.SERVICE_TYPE_QUEUE ); 117 if(result !=null){ 118 virtualQueueServicesMap.put(queueName, result); 119 if(log.isDebugEnabled()){ 120 log.debug("Added queue "+queueName); 121 } 122 } 123 124 } 125 return result; 126 } 127 128 public Set getQueueServices(){ 129 return new HashSet (virtualQueueServicesMap.values()); 130 } 131 132 138 public boolean registerReceiverToQueue( ServiceConsumer consumer ,long numberOfReceive ){ 139 AbstractQueueService service = getQueueService(consumer.getServiceName()); 140 if(service == null){ 141 if(log.isErrorEnabled()){ 142 log.error("Got receive request on queue that this layer is not a coordinator of - queue = "+consumer.getServiceName()); 143 } 144 return false; 145 } 146 service.active(); 147 service.registerReceiverToQueue(consumer , numberOfReceive ); 148 return true; 149 } 151 157 public void unregisterReceiverToQueue( ServiceConsumer consumer ){ 158 AbstractQueueService service = getQueueService(consumer.getServiceName()); 159 if(service == null){ 160 if(log.isErrorEnabled()){ 161 log.error("Got un-register request on queue that this layer is not a coordinator of - queue = "+consumer.getServiceName()); 162 } 163 return ; 164 } 165 service.active(); 166 service.unregisterReceiverToQueue(consumer ); 167 } 169 170 public void closeQueue(String queueName) throws MantaException{ 171 AbstractQueueService queue =(AbstractQueueService) virtualQueueServicesMap.get(queueName); 172 if(queue != null){ 173 queue.close(); 174 virtualQueueServicesMap.remove(queueName); 175 WorldModeler wm=MantaAgent.getInstance().getSingletonRepository().getWorldModeler(); 176 wm.removeService(wm.getDefaultDomainName(),queueName); 177 if(log.isDebugEnabled()){ 178 log.debug("Deleted queue "+queueName); 179 } 180 } 181 } 182 183 188 public void sendQueueCopy(ServiceConsumer consumer) { 189 AbstractQueueService service = getQueueService(consumer.getServiceName()); 190 ByteableList underline; 191 if(service == null){ 192 if(log.isErrorEnabled()){ 193 log.error("Got sendQueueCopy request on queue that this layer is not a coordinator of - queue = "+consumer.getServiceName()); 194 } 195 196 underline = new ByteableList(); 197 QueueReceiver receiver = new QueueReceiver(consumer , 0); 198 199 MantaBusMessage msg = MantaBusMessage.getInstance(); 200 msg.setPayload(underline); 201 msg.setPriority(MantaAgentConstants.HIGH); 202 msg.setDeliveryMode(MantaAgentConstants.NON_PERSISTENT); 203 msg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT); 204 receiver.receive(msg); 205 }else{ 206 service.active(); 207 service.sendQueueCopy(consumer); 208 } 209 210 211 212 } 214 215 219 public int getNumberOfQueueServices(){ 220 return virtualQueueServicesMap.size(); 221 } 222 223 228 public boolean hasQueueService(String queueName){ 229 return virtualQueueServicesMap.containsKey(queueName); 230 } 231 232 236 public void addNewQueueServiceToMap( AbstractQueueService service){ 237 virtualQueueServicesMap.put(service.getServiceName(),service); 239 } 241 247 public void waitForQueueMaster(String queueName, long timeToWait) throws InterruptedException { 248 getQueueService(queueName).waitForQueueMaster(timeToWait); 249 } 250 251 256 public void setQueueMaster(String queueName,QueueMaster queueMaster){ 257 getQueueService(queueName).setQueueMaster(queueMaster); 258 } 259 260 264 public QueueMaster getQueueMaster(String queueName){ 265 return getQueueService(queueName).getQueueMaster(); 266 } 267 268 269 270 271 277 public void handleEnqueueMessageToQueue(String controlId ,ServiceProducer producer, QueueMaster master, MantaBusMessage enqueuedMessage, String responseToId) { 278 boolean ok = false; 279 boolean deley = false; 280 try{ 281 String queueName = producer.getServiceName(); 282 AbstractQueueService queue = getQueueService(queueName); 283 if(queue == null ||!queue.amIQueueMaster() ){ 284 sendEnqueueResp(controlId,producer,master ,enqueuedMessage,NOT_MASTER ,responseToId,deley ); 286 return; 287 } 288 enqueuedMessage.addHeader(ORIGINAL_MESSAGE_PRUDUCER,((ServiceProducer)enqueuedMessage.getSource()).getId() ); 290 enqueuedMessage.setSource(queue.getQueueMaster()); 291 byte deliveryMode =enqueuedMessage.getDeliveryMode(); 292 enqueuedMessage.addHeader(ENQUEUE_TIME,SystemTime.currentTimeMillis()+""); 294 boolean persistent = deliveryMode == MantaAgentConstants.PERSISTENT; 295 if (queue.getPersistentMode() == MantaAgentConstants.PERSISTENT) 297 persistent = true; 298 queue.active(); 300 301 302 303 ok = !queue.isOverflow(); 304 if(ok){ 305 queue.enqueue(enqueuedMessage, persistent); 306 if (log.isDebugEnabled()) { 307 log.debug("Message is added to queue. Message ID="+enqueuedMessage.getMessageId()+", Queue="+queue.getServiceName()); 308 } 309 }else{ 310 String overFlawMsg = "Queue overflow. Queue name="+queue.getServiceName()+", Queue size="+queue.getUnsentCount(); 311 if(queue.getOverflowStrategy() == 312 AbstractQueueService.THROW_EXCEPTION_STRATERGY 313 || queue.getOverflowStrategy() == 314 AbstractQueueService. 315 RETURN_WITHOUT_ENQUEUE_STRATERGY) { 316 if (log.isWarnEnabled()) { 317 log.warn(overFlawMsg+". Message droped."); 318 } 319 } 320 else if(queue.getOverflowStrategy() == 321 AbstractQueueService.THROTTLE_STRATERGY) { 322 queue.enqueue(enqueuedMessage, persistent); 323 if (log.isDebugEnabled()) { 324 log.debug("Message is added to queue. Message ID="+enqueuedMessage.getMessageId()+", Queue="+queue.getServiceName()); 325 log.debug(overFlawMsg+". Initiating throttling strategy."); 326 } 327 328 deley= true; 329 ok = true; 330 } 331 332 333 } 334 335 336 }catch(Throwable t){ 337 if(log.isErrorEnabled()) 338 log.error("Could not enqueue message. Message ID="+enqueuedMessage.getMessageId()+". ",t); 339 } 340 if(ok){ 341 sendEnqueueResp( controlId , producer,master, enqueuedMessage ,ENQUEUE_OK,responseToId ,deley); 342 }else{ 343 sendEnqueueResp( controlId , producer,master, enqueuedMessage ,ENQUEUE_FAIL,responseToId ,deley); 344 } 345 346 347 } 348 349 350 354 private void sendEnqueueResp(String controlId ,ServiceProducer producer,QueueMaster master, MantaBusMessage enqueuedMessage ,byte respCode, String responseToId, boolean delay) { 355 if(log.isDebugEnabled()){ 356 log.debug("Got message enqueue and responded with code "+respCode+" message was "+enqueuedMessage+"."); 357 } 358 MantaBusMessage controlMsg = MantaBusMessage.getInstance(); 359 360 controlMsg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT); 361 362 363 controlMsg.setDeliveryMode(MantaAgentConstants.NON_PERSISTENT); 364 controlMsg.setPriority(MantaAgentConstants.HIGH); 365 controlMsg.setValidUntil( MantaAgentConstants.CONTROL_MESSAGES_TTL+SystemTime.gmtCurrentTimeMillis()); 366 controlMsg.setSource(master); 367 controlMsg.setRecipient(DeadEndRecipient.createDeadEndRecipient(producer.getAgentName(),producer.getDomainName()) ); 368 controlMsg.addHeader(MantaBusMessageConsts.HEADER_NAME_ACK_RESPONSE_REFERENCE , responseToId); 369 controlMsg.addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION, producer.getServiceName()+controlId); 370 controlMsg.addHeader(MantaBusMessageConsts.ENQUEUE_STATUS,""+respCode); 372 if(!delay){ 373 MantaAgent.getInstance().send(controlMsg,master ); 374 }else{ 375 MantaAgent.getInstance().getSingletonRepository() 376 .getDelayedMessageSender().send(controlMsg, QueueService.throttleDelay); 377 } 378 379 } 380 381 public QueueSubscriberManager getSubscriberManager(String queueName) { 382 return getQueueService(queueName).getSubscriberManager(); 383 } 384 388 public void active(String serviceName) { 389 getQueueService(serviceName).active(); 390 391 } 392 393 public boolean amIQueueMaster(String serviceName) { 394 return getQueueService(serviceName).amIQueueMaster(); 395 } 396 397 398 public boolean isTempQueue(String serviceName){ 399 return getQueueService(serviceName).isTempQueue(); 400 } 401 402 403 404 405 public static long getEnqueueWaitforCoordinator() { 406 return enqueueWaitforCoordinator; 407 } 408 } 410 | Popular Tags |