KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > jonas > discovery > DiscoveryComm


1 /**
2  * JOnAS: Java(TM) Open Application Server
3  * Copyright (C) 2004 Bull S.A.
4  * Contact: jonas-team@objectweb.org
5  *
6  * This library is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU Lesser General Public License as published by the
8  * Free Software Foundation; either version 2.1 of the License, or any later
9  * version.
10  *
11  * This library is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
14  * for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with this library; if not, write to the Free Software Foundation,
18  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
19  * --------------------------------------------------------------------------
20  * $Id: DiscoveryComm.java,v 1.8 2005/05/23 07:33:44 vivekl Exp $
21  * --------------------------------------------------------------------------
22  */

23
24 package org.objectweb.jonas.discovery;
25
26 import java.io.ByteArrayInputStream JavaDoc;
27 import java.io.IOException JavaDoc;
28 import java.io.ObjectInputStream JavaDoc;
29 import java.net.DatagramPacket JavaDoc;
30 import java.net.DatagramSocket JavaDoc;
31 import java.net.InetAddress JavaDoc;
32 import java.net.MulticastSocket JavaDoc;
33 import java.net.SocketException JavaDoc;
34 import java.net.UnknownHostException JavaDoc;
35
36 import org.objectweb.jonas.common.Log;
37 import org.objectweb.jonas.common.NetUtils;
38 import org.objectweb.util.monolog.api.BasicLevel;
39 import org.objectweb.util.monolog.api.Logger;
40
41 /**
42  * @author <a HREF="mailto:Takoua.Abdellatif@inria.fr">Takoua Abdellatif </a>
43  * @version 1.0
44  */

45 public class DiscoveryComm implements Runnable JavaDoc {
46     /**
47      * Size of buffer to read incoming packets into.
48      */

49     public static final int RECEIVE_BUFFER_SIZE = 1024;
50     /**
51      * Used to multicast a discovery event on run() execution
52      */

53     protected MulticastSocket JavaDoc multicastSocket;
54     /**
55      * Uset to send a dicovery event as response to a discovey message
56      */

57     protected DatagramSocket JavaDoc unicastSocket;
58
59     /**
60      * My discovery manager
61      */

62     private DiscoveryManager dm;
63     /**
64      * My manager's listening port
65      */

66     private int port;
67     /**
68      * My manager's multicast IP address
69      */

70     private InetAddress JavaDoc destAddress;
71     /**
72      * Set to false if the thread is stoped
73      */

74     protected boolean notStopped = true;
75     /**
76      * Time to live for multicatst packets
77      */

78     private int ttl = 1; // why 1 ??
79

80     /**
81      * Name for this jonas instance.
82      */

83     protected String JavaDoc jonasName = null;
84     /**
85      * Domain name that this instance belongs to.
86      */

87     protected String JavaDoc domainName = null;
88
89     /**
90      * The server ID of the jonas instance.
91      */

92     protected String JavaDoc serverId = null;
93
94
95     /**
96      * MBean server connection URLs for this server.
97      */

98     protected String JavaDoc[] urls = null;
99
100     /**
101      * logger
102      */

103     private static Logger logger = Log.getLogger(Log.JONAS_DISCOVERY_PREFIX);
104     /**
105      * Constructs a DiscoveryComm associated to the DiscoveryManager
106      * @param dm DiscoveryManager to which this thread is associated
107      */

108     public DiscoveryComm(DiscoveryManager dm) {
109         this.port = dm.getListeningPort();
110         try {
111             this.destAddress = InetAddress.getByName(dm.getListeningIp());
112             this.ttl = dm.getTimeToLive();
113             this.jonasName = dm.getJonasName();
114             this.domainName = dm.getDomainName();
115             this.urls = dm.getUrls();
116             this.serverId = dm.getServerId();
117         } catch (UnknownHostException JavaDoc e) {
118             logger.log(BasicLevel.ERROR, "Unknown Host", e);
119         }
120         this.dm = dm;
121     }
122
123     /**
124      * Creates a MulticastSocket and joins the group of multicas host
125      * identified by the InetAddress <code>destAddress</code>
126      *
127       */

128     protected void join() {
129         try {
130             multicastSocket = new MulticastSocket JavaDoc(port);
131             multicastSocket.setTimeToLive(ttl);
132             multicastSocket.joinGroup(destAddress);
133             if (logger.isLoggable(BasicLevel.DEBUG)) {
134                 logger.log(BasicLevel.DEBUG, "multicast ip address is "
135                         + destAddress);
136                 logger.log(BasicLevel.DEBUG, "multicast port is " + port);
137             }
138         } catch (IOException JavaDoc e) {
139             logger.log(BasicLevel.ERROR, "io problem", e);
140         }
141     }
142
143     /**
144      * sends (multicasts) a Discovery Message to the group.
145      * @param msg The message to send.
146      */

147     public void sendNotif(DiscMessage msg) {
148         try {
149             //send it on the multicast address
150
//after transforming the object to a datagram
151
if (logger.isLoggable(BasicLevel.DEBUG)) {
152                 logger.log(BasicLevel.DEBUG, msg);
153             }
154             byte[] messageBytes = DiscMessage.objectToBytes(msg);
155             multicastSocket.send(new DatagramPacket JavaDoc(messageBytes,
156                     messageBytes.length, destAddress, port));
157         } catch (IOException JavaDoc e1) {
158             logger.log(BasicLevel.ERROR, "DiscoveryComm: Error to send notification", e1);
159         }
160
161     }
162 /**
163  * Send response to a DiscoveryMessage
164  * @param msg Containes a DiscoveryMessage allowing to inform about the responder
165  * (name, state, URLs)
166  * @param destAddress the destination address picked up from the request
167  * @param port the destination port picked up from the request
168  */

169     protected void sendResponse(DiscMessage msg, InetAddress JavaDoc destAddress, int port) {
170         if (logger.isLoggable(BasicLevel.DEBUG)) {
171             logger.log(BasicLevel.DEBUG, "DiscoveryComm : The message to send is "
172                 + msg + "Sending it to: " + destAddress + " and port is: " + port);
173         }
174         byte[] messageBytes = DiscMessage.objectToBytes(msg);
175         if (messageBytes != null) {
176             try {
177                 // send the unicast response to the discovery client
178
unicastSocket.send(new DatagramPacket JavaDoc(messageBytes,
179                         messageBytes.length, destAddress, port));
180             } catch (IOException JavaDoc e) {
181                 logger.log(BasicLevel.ERROR, "DiscoveryComm: Error to send response to discovery message", e);
182             }
183         }
184     }
185
186     /**
187      * Create a discovery event to notify about a state change of the
188      * event sender
189      * @param state
190      * - RUNNING if the sender notifies that it gets running
191      * - STOPPING if the sender notifies that it stops running
192      * @return a Discovery event (notification)
193      * @throws Exception
194      * is thrown if the jmx service is not reached.
195      */

196     public DiscEvent createNotifMessage(String JavaDoc state) throws Exception JavaDoc {
197         String JavaDoc theHostAddress;
198         try {
199             theHostAddress = NetUtils.getLocalAddress();
200         } catch (UnknownHostException JavaDoc e) {
201             logger.log(BasicLevel.ERROR, "Unknown host", e);
202             return null;
203         }
204
205         if (!state.equals(DiscEvent.RUNNING)) {
206             urls = null;
207         }
208         // In the case of a notification, the field port is not important since the
209
// notifier is not waiting for an acknowledfement.
210
DiscEvent resp = new DiscEvent(theHostAddress, port, jonasName, domainName, serverId, urls);
211         resp.setState(state);
212         return resp;
213     }
214
215
216     /**
217      *
218      * @see java.lang.Runnable#run()
219      */

220     public void run() {
221         // Create a DatagramPacket to receive messages
222
DatagramPacket JavaDoc datagram = new DatagramPacket JavaDoc(new byte[RECEIVE_BUFFER_SIZE],
223                 RECEIVE_BUFFER_SIZE);
224         // Object reveived in a message
225
Object JavaDoc objReceived = null;
226         ObjectInputStream JavaDoc in = null;
227
228         // Join the group in order to receive multicast messages
229
join();
230     // Create notification message containing a discovery event with state RUNNING
231
DiscEvent msg = null;
232         try {
233             msg = createNotifMessage(DiscEvent.RUNNING);
234         } catch (Exception JavaDoc e) {
235             logger.log(BasicLevel.ERROR,
236                 "DiscoveryComm: Unable to create a notification message", e);
237         }
238         if (msg != null) {
239             // Multicast the message
240
sendNotif(msg);
241         }
242         // Create the socket to be used for responding
243
try {
244             unicastSocket = new DatagramSocket JavaDoc();
245         } catch (SocketException JavaDoc e3) {
246             logger.log(BasicLevel.ERROR, "Socket exception", e3);
247             return;
248         }
249         try {
250             while (notStopped) {
251                 multicastSocket.receive(datagram);
252                 in = new ObjectInputStream JavaDoc(new ByteArrayInputStream JavaDoc(datagram.getData()));
253                 objReceived = in.readObject();
254
255                 if (objReceived != null) {
256                     // The DiscEvents are ignored
257
if (objReceived instanceof DiscMessage) {
258                         if ((objReceived instanceof DiscEvent) || (objReceived instanceof DiscGreeting) ) {
259                             if (logger.isLoggable(BasicLevel.DEBUG)) {
260                                 logger.log(BasicLevel.DEBUG,
261                                         "This discovery event is ignored " + objReceived);
262                             }
263                         } else {
264                             DiscMessage request = (DiscMessage) objReceived;
265                             if (logger.isLoggable(BasicLevel.DEBUG)) {
266                                 logger.log(BasicLevel.DEBUG,
267                                         "A dicovery message is received "
268                                         + objReceived);
269                             }
270
271                             /* do not create a new discovery event object with state RUNNING but
272                              * reuse the one created above
273                              */

274                             /*
275                             DiscEvent msg = null;
276                             try {
277                                 msg = createNotifMessage(DiscEvent.RUNNING);
278                             } catch (Exception e) {
279                                 logger.log(BasicLevel.ERROR,
280                                     "DiscoveryComm: Unable to create a notification message");
281                                 e.printStackTrace();
282                             }
283                             */

284                             if (msg != null) {
285                                 // Use the address in the datagram packet instead of trusting the message.
286
InetAddress JavaDoc destAddress = datagram.getAddress();
287                                 int destPort = request.getSourcePort();
288                                 sendResponse(msg, destAddress, destPort);
289                             }
290                         }
291                     }
292                 }
293             }
294         } catch (SocketException JavaDoc e) {
295             logger.log(BasicLevel.ERROR, "Socket closed: ", e);
296             notStopped = false;
297         } catch (IOException JavaDoc e1) {
298             logger.log(BasicLevel.ERROR, e1);
299         } catch (ClassNotFoundException JavaDoc e) {
300             logger.log(BasicLevel.ERROR, e);
301         }
302     }
303
304     /**
305      * sends a notification message to notify that the server is stopping.
306      *
307      */

308     public void stop() {
309         // send a notification message of type STOPPING
310
DiscEvent msg = null;
311         try {
312             if (logger.isLoggable(BasicLevel.DEBUG)) {
313                 logger.log(BasicLevel.DEBUG, "Sending a STOPPING DiscEvent.");
314             }
315             msg = createNotifMessage(DiscEvent.STOPPING);
316         } catch (Exception JavaDoc e) {
317             logger.log(BasicLevel.ERROR, e);
318         }
319         if (msg != null) {
320             sendNotif(msg);
321         }
322         Thread.interrupted();
323     }
324
325     /**
326      * @param jonasName The jonasName to set.
327      */

328     protected void setJonasName(String JavaDoc jonasName) {
329         this.jonasName = jonasName;
330     }
331     /**
332      * @param domainName The domainName to set.
333      */

334     protected void setDomainName(String JavaDoc domainName) {
335         this.domainName = domainName;
336     }
337     /**
338      * @param urls The urls to set.
339      */

340     protected void setUrls(String JavaDoc[] urls) {
341         this.urls = urls;
342     }
343
344     public String JavaDoc getServerId() {
345         return serverId;
346     }
347
348     public void setServerId(String JavaDoc serverId) {
349         this.serverId = serverId;
350     }
351 }
Popular Tags