KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > jonas > discovery > DiscoveryClientListener


1 /**
2  * JOnAS: Java(TM) Open Application Server
3  * Copyright (C) 2004 Bull S.A.
4  * Contact: jonas-team@objectweb.org
5  *
6  * This library is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU Lesser General Public License as published by the
8  * Free Software Foundation; either version 2.1 of the License, or any later
9  * version.
10  *
11  * This library is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
14  * for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with this library; if not, write to the Free Software Foundation,
18  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
19  *
20  */

21
22 package org.objectweb.jonas.discovery;
23
24 import java.io.ByteArrayInputStream JavaDoc;
25 import java.io.IOException JavaDoc;
26 import java.io.ObjectInputStream JavaDoc;
27 import java.net.DatagramPacket JavaDoc;
28 import java.net.DatagramSocket JavaDoc;
29 import java.net.InetAddress JavaDoc;
30 import java.net.MulticastSocket JavaDoc;
31 import java.net.SocketException JavaDoc;
32 import java.net.UnknownHostException JavaDoc;
33
34 import javax.management.Notification JavaDoc;
35
36 import org.objectweb.jonas.common.Log;
37 import org.objectweb.jonas.service.ServiceException;
38 import org.objectweb.util.monolog.api.BasicLevel;
39 import org.objectweb.util.monolog.api.Logger;
40
41 /**
42  * @author <a HREF="mailto:Takoua.Abdellatif@inria.fr">Takoua Abdellatif </a>
43  * @version 1.0
44  */

45 public class DiscoveryClientListener implements Runnable JavaDoc {
46
47     /**
48      * Management notification type for <i>discovery</i> events
49      */

50     public static final String JavaDoc DISCOVERY_TYPE = "jonas.management.discovery";
51
52     private static int RECEIVE_BUFFER_SIZE = 1024;
53
54     /**
55      * My discovery client
56      */

57     private DiscoveryClient discoveryClient;
58
59     /**
60      * Used to multicast a discovery message on run() execution
61      */

62     private MulticastSocket JavaDoc multicastSocket;
63
64     /**
65      * Port associated to the multicast socket
66      */

67     private int port;
68
69     /**
70      * IP address for the multicast socket
71      */

72     private InetAddress JavaDoc destAddress;
73
74     /**
75      * Time to live for multicatst packets
76      */

77     private int ttl;
78
79     /**
80      * Uset to receive a dicovery event as response to the sent discovey message
81      */

82     private DatagramSocket JavaDoc unicastSocket;
83
84     /**
85      * Ip address which identify the sender of the discovery message
86      */

87     private String JavaDoc sourceIp;
88
89     /**
90      * Port which identify by the sender of the discovery message
91      */

92     private int sourcePort;
93
94     /**
95      * State information
96      */

97     private boolean notStopped = true;
98
99     /**
100      * Time to wait for a response
101      */

102     private long timeout = 1000;
103
104     /**
105      * Used to construct JMX notifications
106      */

107     private long sequenceNumber = 0;
108
109     /**
110      * Logger
111      */

112     private static Logger logger = Log.getLogger(Log.JONAS_DISCOVERY_PREFIX);
113
114     /**
115      * Constructs a DiscoveryClientListener associated with a DiscoveryClient
116      * @param discoveryClient DiscoveryClient to which this thread is associated
117      */

118     public DiscoveryClientListener(DiscoveryClient discoveryClient) {
119         this.port = discoveryClient.getListeningPort();
120         try {
121             this.destAddress = InetAddress.getByName(discoveryClient.getListeningIp());
122             this.ttl = discoveryClient.getTimeToLive();
123         } catch (UnknownHostException JavaDoc e) {
124             logger.log(BasicLevel.ERROR, "Invalid host", e);
125         }
126         this.timeout = discoveryClient.getTimeout();
127         this.sourcePort = discoveryClient.getSourcePort();
128         this.sourceIp = discoveryClient.getSourceIp();
129         this.discoveryClient = discoveryClient;
130         
131         // Create a unicast socket to receive responses
132
try {
133             unicastSocket = new DatagramSocket JavaDoc(sourcePort);
134         } catch (SocketException JavaDoc e2) {
135             logger.log(BasicLevel.ERROR, "DiscoveryClient : Unable to create a Datagram socket", e2);
136             // Could not create datagram socket, so throw a ServiceException.
137
throw new ServiceException(
138                     "Could not create socket to listen for discovery "
139                             + "messages at port: " + sourcePort
140                             + ". The port might be in use.");
141         }
142     }
143
144     /**
145      * Sends a discovery message to the server group.
146      */

147     public void sendDiscoveryMessage(DiscMessage msg) {
148         if (logger.isLoggable(BasicLevel.DEBUG)) {
149             logger.log(BasicLevel.DEBUG, "DiscoveryClient : The message to send is " + msg);
150         }
151         // send the message on the multicast socket
152
// after packing it into a datagram
153
byte[] messageBytes = DiscMessage.objectToBytes(msg);
154         if (messageBytes != null) {
155             try {
156                 multicastSocket.send(new DatagramPacket JavaDoc(messageBytes, messageBytes.length, destAddress, port));
157             } catch (IOException JavaDoc e1) {
158                 logger.log(BasicLevel.ERROR, "DiscoveryClient : Error to send discovery message", e1);
159             }
160         }
161     }
162
163     /**
164      * @see java.lang.Runnable#run()
165      */

166     public void run() {
167         // Create a DatagramPacket to receive messages
168
DatagramPacket JavaDoc datagram = new DatagramPacket JavaDoc(new byte[RECEIVE_BUFFER_SIZE], RECEIVE_BUFFER_SIZE);
169         // Object reveived in a message
170
Object JavaDoc objReceived = null;
171         ObjectInputStream JavaDoc in = null;
172
173         // Create a multicast socket
174
try {
175             multicastSocket = new MulticastSocket JavaDoc(port);
176             multicastSocket.setTimeToLive(ttl);
177             /*
178              * Not necessary as the DiscoveryClient does not waits for multicast
179              * events
180              */

181             // multicastSocket.joinGroup(destAddress);
182
} catch (IOException JavaDoc e) {
183             // TODO Auto-generated catch block
184
e.printStackTrace();
185         }
186
187         /* Do this in constructor so the exception can be caught
188          * and acted on before a new thread is created.
189          *
190         // Create a unicast socket to receive responses
191         try {
192             unicastSocket = new DatagramSocket(sourcePort);
193         } catch (SocketException e2) {
194             logger.log(BasicLevel.ERROR, "DiscoveryClient : Unable to create a Datagram socket", e2);
195             // Could not create datagram socket, so throw a ServiceException.
196             throw new ServiceException(
197                     "Could not create socket to listen for discovery "
198                             + "messages at port: " + sourcePort
199                             + ". The port might be in use.");
200         }*/

201
202         // Prepare a discovery message
203
DiscMessage msg = new DiscMessage(sourceIp, sourcePort);
204         // First send a discovery message
205
sendDiscoveryMessage(msg);
206         if (logger.isLoggable(BasicLevel.DEBUG)) {
207             logger.log(BasicLevel.DEBUG, " DiscoveryClient: Sent Message is" + msg);
208         }
209
210         // wait for responses during a Timeout period on the unicast socket.
211
long lastTime = timeout + System.currentTimeMillis();
212         DiscEvent event = null;
213         try {
214             while ((notStopped) && System.currentTimeMillis() <= lastTime) {
215                 unicastSocket.receive(datagram);
216                 in = new ObjectInputStream JavaDoc(new ByteArrayInputStream JavaDoc(datagram.getData()));
217                 objReceived = in.readObject();
218                 if (objReceived != null) {
219                     if (objReceived instanceof DiscEvent) {
220                         event = (DiscEvent) objReceived;
221                         // Trust the datagram packet instead of the encoded URL
222
event.setSourceAddress(datagram.getAddress().getHostAddress());
223                         handleReceivedMessage(event);
224                     }
225                 }
226             }
227         } catch (SocketException JavaDoc e) {
228             logger.log(BasicLevel.ERROR, "DiscoveryClient : Socket closed", e);
229             notStopped = false;
230         } catch (IOException JavaDoc e1) {
231             logger.log(BasicLevel.ERROR, "DiscoveryClient IOException", e1);
232         } catch (ClassNotFoundException JavaDoc e) {
233             logger.log(BasicLevel.ERROR, "DiscoveryClient ClassNotFoundException ", e);
234         }
235     }
236
237     /**
238      * @param msg a discovery message response received.
239      */

240     private void handleReceivedMessage(DiscEvent msg) {
241         if (logger.isLoggable(BasicLevel.DEBUG)) {
242             logger.log(BasicLevel.DEBUG, "discovery event received: " + msg);
243         }
244
245         // create a JMX notification for all listeners of a discovery event
246
Notification JavaDoc notif = new Notification JavaDoc(DISCOVERY_TYPE, discoveryClient, sequenceNumber++, System
247                 .currentTimeMillis(), msg.getState());
248         notif.setUserData(msg);
249         discoveryClient.sendNotification(notif);
250     }
251
252     /**
253      * Stops the current thread
254      */

255     public void stop() {
256         notStopped = false;
257         Thread.interrupted();
258     }
259 }
Popular Tags