1 46 51 package org.mr.kernel; 52 53 54 import org.apache.commons.logging.Log; 55 import org.apache.commons.logging.LogFactory; 56 import org.mr.core.protocol.MantaBusMessage; 57 import org.mr.core.util.Stage; 58 import org.mr.core.util.StageHandler; 59 import org.mr.core.util.StageParams; 60 61 67 public class IncomingMessageManager implements StageHandler { 68 Stage stage = null; 69 private Log log = null; 70 71 72 public IncomingMessageManager(){ 73 StageParams params = new StageParams(); 74 75 params.setPersistent(false); 76 params.setBlocking(false); 77 78 79 params.setStageName("Incoming messages"); 80 params.setHandler(this); 81 stage = new Stage(params); 82 83 log = LogFactory.getLog("IncomingMessageManager"); 84 } 85 86 87 88 89 93 public void messageArrived(MantaBusMessage msg){ 94 stage.enqueue(msg); 95 if (log.isDebugEnabled()) { 96 log.debug("Received a new message. Message ID="+msg.getMessageId()+". Message enqueued."); 97 } 98 } 99 100 103 public boolean handle(Object event) { 104 IncomingMessageListenerRegister.notifyListeners((MantaBusMessage)event); 105 return true; 106 } 107 } 108 | Popular Tags |