1 46 package org.mr.core.groups; 47 48 import java.io.IOException ; 49 import java.net.DatagramPacket ; 50 import java.net.MulticastSocket ; 51 import java.nio.ByteBuffer ; 52 import java.util.ArrayList ; 53 import java.util.HashMap ; 54 import java.util.List ; 55 56 import org.apache.commons.logging.Log; 57 import org.apache.commons.logging.LogFactory; 58 import org.mr.MantaAgent; 59 import org.mr.core.protocol.MantaBusMessage; 60 import org.mr.core.protocol.MessageTransformer; 61 62 67 public class GroupReactor extends Thread { 68 HashMap subjectListenerMap = new HashMap (); 70 MulticastSocket socket ; 72 73 boolean go = true; 74 boolean logEveryMessage = false; 75 byte[] buf = new byte[100000]; 76 GroupKey key; 78 79 private Log log; 80 81 GroupReactor(MulticastSocket s, GroupKey key){ 82 log=LogFactory.getLog("GroupReactor"); 83 this.socket = s; 84 this.key = key; 85 this.setName("GroupReactor"); 86 logEveryMessage = MantaAgent.getInstance().getSingletonRepository().getConfigManager().getBooleanProperty("multicast.log", false); 87 88 } 89 90 public void setSocket(MulticastSocket s) { 91 this.socket = s; 92 } 93 94 public void run(){ 95 while(go){ 96 DatagramPacket recv = new DatagramPacket (buf, buf.length); 97 try { 98 socket.receive(recv); 99 ByteBuffer buff = ByteBuffer.wrap(recv.getData() ,0,recv.getLength() ); 100 101 MantaBusMessage message = MessageTransformer.fromBuffer(buff); 102 if(log.isDebugEnabled() && logEveryMessage){ 103 log.debug( "Got group message "+message); 104 } 105 String sender = null; 106 String subject = null; 107 List listeners =null; 108 if(message != null && (subject=(String )message.getHeader(MutlicastGroupManager.GRUOP_SUBJECT_KEY )) != null) { 109 listeners=(List )subjectListenerMap.get(subject); 110 if(listeners == null) { 111 if(log.isDebugEnabled()){ 112 log.debug( "Received a messages tagged with subject=" + subject + ", but there is no registration for that subject. Will drop message."); 113 } 114 continue; 115 } 116 sender = message.getHeader(MutlicastGroupManager.GRUOP_SENDER_KEY); 117 for (int i = 0; i < listeners.size(); i++) { 118 ((GroupMessageListener)listeners.get(i)).onMessage(key, subject, message); 119 } 120 } 121 122 123 124 } catch (IOException e) { 125 log.error("Problem while getting group message",e); 126 } 127 128 } 129 } 131 136 public synchronized void registerListenerToSubject(String subject, GroupMessageListener mlistener) { 137 List listeners =(List ) subjectListenerMap.get(subject); 138 if(listeners == null){ 139 listeners = new ArrayList (); 140 subjectListenerMap.put(subject ,listeners ); 141 } 142 listeners.add(mlistener); 143 144 } 145 146 151 public synchronized void unregisterListenerToSubject(String subject, GroupMessageListener mlistener) { 152 List listeners =(List ) subjectListenerMap.get(subject); 153 if(listeners != null) 154 listeners.remove(mlistener); 155 156 157 } 158 159 160 } 161 | Popular Tags |