1 46 package org.mr.core.groups; 47 48 import java.net.DatagramPacket ; 49 import java.net.InetAddress ; 50 import java.net.MulticastSocket ; 51 import java.net.NetworkInterface ; 52 53 import java.nio.ByteBuffer ; 54 import java.util.HashMap ; 55 import java.util.Enumeration ; 56 import java.io.IOException ; 57 58 import org.apache.commons.logging.Log; 59 import org.apache.commons.logging.LogFactory; 60 import org.mr.core.protocol.MantaBusMessage; 61 import org.mr.core.protocol.MessageTransformer; 62 import org.mr.core.util.byteable.ByteBufferPool; 63 64 70 public class MutlicastGroupManager { 71 public static final String GRUOP_SENDER_KEY = "%#$GRUOP_SENDER"; 73 public static final String GRUOP_SUBJECT_KEY = "%#$GRUOP_SUBJECT"; 74 HashMap multicastSockets = new HashMap (); 76 HashMap groupReactors = new HashMap (); 78 private ByteBufferPool bufferPool; 80 private Log log; 81 82 86 public MutlicastGroupManager(){ 87 bufferPool = new ByteBufferPool(100 , 20 , 10); 88 log = LogFactory.getLog("MulticastGroupManager"); 89 90 } 91 92 97 public synchronized void joinGroup(GroupKey groupKey , String localBindAddress) throws GroupsException { 98 if(multicastSockets.get(groupKey)!= null){ 99 return; 101 } 102 103 try { 104 InetAddress group = InetAddress.getByName(groupKey.getGroupIP()); 105 MulticastSocket s = new MulticastSocket (groupKey.getGroupPort()); 106 s.setLoopbackMode(false); 107 108 s.setInterface(InetAddress.getByName(localBindAddress)); 109 s.joinGroup(group); 110 multicastSockets.put(groupKey , s); 111 } catch (Exception e) { 112 throw new GroupsException(e.getMessage()); 113 } 114 115 116 } 117 118 125 public synchronized void registerListenerToSubject(GroupKey groupKey, String subject, GroupMessageListener listener) throws GroupsException{ 126 127 GroupReactor reactor = (GroupReactor) groupReactors.get(groupKey); 128 if(reactor == null){ 129 MulticastSocket socket = (MulticastSocket ) multicastSockets.get(groupKey); 130 if(socket == null){ 131 throw new GroupsException("Group not joined"); 132 } 133 reactor = new GroupReactor(socket,groupKey); 134 reactor.start(); 135 groupReactors.put(groupKey , reactor); 136 } 137 reactor.registerListenerToSubject(subject , listener); 138 } 139 140 147 public synchronized void sendMessageToGroup(GroupKey groupKey , String subject , MantaBusMessage msg) throws Exception { 148 ByteBuffer buffer = null; 149 try{ 150 msg.addHeader(GRUOP_SUBJECT_KEY ,subject ); 151 MulticastSocket socket =(MulticastSocket ) multicastSockets.get(groupKey); 152 if(socket == null){ 153 throw new GroupsException("Group not joined"); 154 } 155 buffer = MessageTransformer.toBuffer(msg ,getBufferPool()); 156 InetAddress groupAdd = InetAddress.getByName(groupKey.getGroupIP()); 157 DatagramPacket send = new DatagramPacket (buffer.array(), buffer.remaining(), 158 groupAdd, groupKey.getGroupPort()); 159 socket.send(send); 160 } catch (IOException e) { 161 recreateSocket(groupKey); 164 } catch(Throwable t){ 165 t.printStackTrace(); 166 } 167 finally{ 168 if(buffer != null) 169 getBufferPool().release(buffer); 170 } 171 172 } 173 174 175 178 public final ByteBufferPool getBufferPool() { 179 return bufferPool; 180 } 181 public static void main(String [] args) throws Exception { 182 183 String msg = "Hello"; 184 InetAddress group = InetAddress.getByName("228.5.6.7"); 185 MulticastSocket s = new MulticastSocket (6789); 186 s.joinGroup(group); 187 DatagramPacket hi = new DatagramPacket (msg.getBytes(), msg.length(), 188 group, 6789); 189 s.send(hi); 190 byte[] buf = new byte[1000]; 192 DatagramPacket recv = new DatagramPacket (buf, buf.length); 193 s.receive(recv); 194 195 ByteBuffer buff = ByteBuffer.wrap(recv.getData() ,0,recv.getLength() ); 196 String string = new String (buff.array()); 197 System.out.println(string); 198 s.leaveGroup(group); 200 201 } 202 203 private void recreateSocket(GroupKey groupKey) throws IOException { 204 String local = getValidLocalAddress(); 205 if (local == null) { 206 return; 208 } 209 210 GroupReactor reactor = (GroupReactor) groupReactors.get(groupKey); 211 MulticastSocket old = 212 (MulticastSocket ) multicastSockets.remove(groupKey); 213 this.log.info("Trying to recreate multicast socket (old = " + old + ", local = " + local + ")"); 214 MulticastSocket newSock = new MulticastSocket (groupKey.getGroupPort()); 215 newSock.setLoopbackMode(false); 216 newSock.setInterface(InetAddress.getByName(local)); 217 newSock.joinGroup(InetAddress.getByName(groupKey.getGroupIP())); 218 multicastSockets.put(groupKey, newSock); 219 reactor.setSocket(newSock); 220 old.close(); 221 this.log.info("Recreated multicast socket on interface " + local); 222 } 223 224 private static synchronized String getValidLocalAddress() { 225 String validAddress = null; 226 227 try { 228 Enumeration ifs = NetworkInterface.getNetworkInterfaces(); 229 while (ifs.hasMoreElements()) { 230 NetworkInterface iface = (NetworkInterface ) 231 ifs.nextElement(); 232 Enumeration ips = iface.getInetAddresses(); 233 while (ips.hasMoreElements()) { 234 InetAddress ip = (InetAddress ) ips.nextElement(); 235 if (!ip.getHostAddress().equals("127.0.0.1")) { 236 validAddress =ip.getHostAddress(); 237 return validAddress; 238 } 239 } 240 } 241 } catch (Throwable t) {} 242 243 return validAddress; 244 } 245 246 } 247 | Popular Tags |