KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > core > groups > MutlicastGroupManager


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Amir Shevat.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34  
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 package org.mr.core.groups;
47
48 import java.net.DatagramPacket JavaDoc;
49 import java.net.InetAddress JavaDoc;
50 import java.net.MulticastSocket JavaDoc;
51 import java.net.NetworkInterface JavaDoc;
52
53 import java.nio.ByteBuffer JavaDoc;
54 import java.util.HashMap JavaDoc;
55 import java.util.Enumeration JavaDoc;
56 import java.io.IOException JavaDoc;
57
58 import org.apache.commons.logging.Log;
59 import org.apache.commons.logging.LogFactory;
60 import org.mr.core.protocol.MantaBusMessage;
61 import org.mr.core.protocol.MessageTransformer;
62 import org.mr.core.util.byteable.ByteBufferPool;
63
64 /**
65  *
66  * manages a set of multicast sockets gives and API to join, leave, send messages and regiter listeners to these sockets
67  * @author Amir Shevat
68  *
69  */

70 public class MutlicastGroupManager {
71     // keys for headers added to the messages by the group manager
72
public static final String JavaDoc GRUOP_SENDER_KEY = "%#$GRUOP_SENDER";
73     public static final String JavaDoc GRUOP_SUBJECT_KEY = "%#$GRUOP_SUBJECT";
74     // key = groupKey value = socket
75
HashMap JavaDoc multicastSockets = new HashMap JavaDoc();
76     // key = groupKey value = socket reactor
77
HashMap JavaDoc groupReactors = new HashMap JavaDoc();
78     // pool for buffers that are used in the recreating of messages from the sockets
79
private ByteBufferPool bufferPool;
80     private Log log;
81
82     /**
83      * inits the buffer pool
84      *
85      */

86     public MutlicastGroupManager(){
87         bufferPool = new ByteBufferPool(100 , 20 , 10);
88         log = LogFactory.getLog("MulticastGroupManager");
89
90     }
91
92     /**
93      * binds to a multicast socket is the socket is not binded already
94      * @param groupKey the information about the socket like ip and port
95      * @throws GroupsException if bind fail
96      */

97     public synchronized void joinGroup(GroupKey groupKey , String JavaDoc localBindAddress) throws GroupsException {
98         if(multicastSockets.get(groupKey)!= null){
99             // we are in the group
100
return;
101         }
102
103         try {
104             InetAddress JavaDoc group = InetAddress.getByName(groupKey.getGroupIP());
105             MulticastSocket JavaDoc s = new MulticastSocket JavaDoc(groupKey.getGroupPort());
106             s.setLoopbackMode(false);
107
108             s.setInterface(InetAddress.getByName(localBindAddress));
109             s.joinGroup(group);
110             multicastSockets.put(groupKey , s);
111         } catch (Exception JavaDoc e) {
112             throw new GroupsException(e.getMessage());
113         }
114
115
116     }
117
118     /**
119      * adds a listener to a multicast socket, with a multiplexing over a String subject
120      * @param groupKey the information about the socket like ip and port
121      * @param subject the multiplexoer over the multicast socket
122      * @param listener this object will be notified when message has arrived on the socket with that subject key
123      * @throws GroupsException if we are not binded to this socket
124      */

125      public synchronized void registerListenerToSubject(GroupKey groupKey, String JavaDoc subject, GroupMessageListener listener) throws GroupsException{
126
127         GroupReactor reactor = (GroupReactor) groupReactors.get(groupKey);
128         if(reactor == null){
129             MulticastSocket JavaDoc socket = (MulticastSocket JavaDoc) multicastSockets.get(groupKey);
130             if(socket == null){
131                 throw new GroupsException("Group not joined");
132             }
133             reactor = new GroupReactor(socket,groupKey);
134             reactor.start();
135             groupReactors.put(groupKey , reactor);
136         }
137         reactor.registerListenerToSubject(subject , listener);
138      }
139
140      /**
141       * sends a message on the multicast socket on a given subject
142       * @param groupKey the information about the socket like ip and port
143       * @param subject only listeners on the socket on this subject will get the message
144       * @param msg the message to be passed on the socket
145       * @throws Exception if fail to send message or is socket not binded
146       */

147      public synchronized void sendMessageToGroup(GroupKey groupKey , String JavaDoc subject , MantaBusMessage msg) throws Exception JavaDoc{
148         ByteBuffer JavaDoc buffer = null;
149         try{
150             msg.addHeader(GRUOP_SUBJECT_KEY ,subject );
151             MulticastSocket JavaDoc socket =(MulticastSocket JavaDoc) multicastSockets.get(groupKey);
152             if(socket == null){
153                 throw new GroupsException("Group not joined");
154             }
155             buffer = MessageTransformer.toBuffer(msg ,getBufferPool());
156             InetAddress JavaDoc groupAdd = InetAddress.getByName(groupKey.getGroupIP());
157             DatagramPacket JavaDoc send = new DatagramPacket JavaDoc(buffer.array(), buffer.remaining(),
158                     groupAdd, groupKey.getGroupPort());
159             socket.send(send);
160         } catch (IOException JavaDoc e) {
161             // after changing the IP address in linux, socket.send()
162
// throws an IOException each time.
163
recreateSocket(groupKey);
164         } catch(Throwable JavaDoc t){
165             t.printStackTrace();
166         }
167         finally{
168             if(buffer != null)
169                 getBufferPool().release(buffer);
170         }
171
172      }
173
174
175      /**
176          * @return Returns the bufferPool.
177          */

178         public final ByteBufferPool getBufferPool() {
179             return bufferPool;
180         }
181     public static void main(String JavaDoc[] args) throws Exception JavaDoc {
182
183          String JavaDoc msg = "Hello";
184          InetAddress JavaDoc group = InetAddress.getByName("228.5.6.7");
185          MulticastSocket JavaDoc s = new MulticastSocket JavaDoc(6789);
186          s.joinGroup(group);
187          DatagramPacket JavaDoc hi = new DatagramPacket JavaDoc(msg.getBytes(), msg.length(),
188                                      group, 6789);
189          s.send(hi);
190          // get their responses!
191
byte[] buf = new byte[1000];
192          DatagramPacket JavaDoc recv = new DatagramPacket JavaDoc(buf, buf.length);
193          s.receive(recv);
194
195          ByteBuffer JavaDoc buff = ByteBuffer.wrap(recv.getData() ,0,recv.getLength() );
196          String JavaDoc string = new String JavaDoc(buff.array());
197          System.out.println(string);
198          // OK, I'm done talking - leave the group...
199
s.leaveGroup(group);
200
201     }
202
203     private void recreateSocket(GroupKey groupKey) throws IOException JavaDoc {
204         String JavaDoc local = getValidLocalAddress();
205         if (local == null) {
206             // no local interface suitable for sending multicast traffic
207
return;
208         }
209         
210         GroupReactor reactor = (GroupReactor) groupReactors.get(groupKey);
211         MulticastSocket JavaDoc old =
212             (MulticastSocket JavaDoc) multicastSockets.remove(groupKey);
213         this.log.info("Trying to recreate multicast socket (old = " + old + ", local = " + local + ")");
214         MulticastSocket JavaDoc newSock = new MulticastSocket JavaDoc(groupKey.getGroupPort());
215         newSock.setLoopbackMode(false);
216         newSock.setInterface(InetAddress.getByName(local));
217         newSock.joinGroup(InetAddress.getByName(groupKey.getGroupIP()));
218         multicastSockets.put(groupKey, newSock);
219         reactor.setSocket(newSock);
220         old.close();
221         this.log.info("Recreated multicast socket on interface " + local);
222     }
223
224     private static synchronized String JavaDoc getValidLocalAddress() {
225         String JavaDoc validAddress = null;
226
227         try {
228             Enumeration JavaDoc ifs = NetworkInterface.getNetworkInterfaces();
229             while (ifs.hasMoreElements()) {
230                 NetworkInterface JavaDoc iface = (NetworkInterface JavaDoc)
231                     ifs.nextElement();
232                 Enumeration JavaDoc ips = iface.getInetAddresses();
233                 while (ips.hasMoreElements()) {
234                     InetAddress JavaDoc ip = (InetAddress JavaDoc) ips.nextElement();
235                     if (!ip.getHostAddress().equals("127.0.0.1")) {
236                         validAddress =ip.getHostAddress();
237                         return validAddress;
238                     }
239                 }
240             }
241         } catch (Throwable JavaDoc t) {}
242
243         return validAddress;
244     }
245     
246 }
247
Popular Tags