KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > sapia > ubik > mcast > EventChannel


package org.sapia.ubik.mcast;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.sapia.ubik.net.ServerAddress;
import org.sapia.ubik.rmi.Consts;
import org.sapia.ubik.rmi.server.Log;
11
12
13 /**
14  * An instance of this class represents a node in a given logical event channel. Instances of this
15  * class are logically grouped on a per-domain basis. Remote events are sent/dispatched to other
16  * instances of this class through the network.
17  * <p>
18  * A given <code>EventChannel</code> instance will only send/received events to/from other instances
19  * of the same domain.
20  *
21  * @see org.sapia.ubik.mcast.DomainName
22  * @see org.sapia.ubik.mcast.RemoteEvent
23  *
24  * @author Yanick Duchesne
25  * <dl>
26  * <dt><b>Copyright:</b><dd>Copyright &#169; 2002-2003 <a HREF="http://www.sapia-oss.org">Sapia Open Source Software</a>. All Rights Reserved.</dd></dt>
27  * <dt><b>License:</b><dd>Read the license.txt file of the jar or visit the
28  * <a HREF="http://www.sapia-oss.org/license.html">license page</a> at the Sapia OSS web site</dd></dt>
29  * </dl>
30  */

31 public class EventChannel {
32   static final String JavaDoc DISCOVER_EVT = "ubik/mcast/discover";
33   static final String JavaDoc PUBLISH_EVT = "ubik/mcast/publish";
34   static final String JavaDoc HEARTBEAT_EVT = "ubik/mcast/heartbeat";
35   BroadcastDispatcher _broadcast;
36   UnicastDispatcher _unicast;
37   EventConsumer _consumer;
38   ChannelEventListener _listener;
39   View _view = new View(30000);
40   ServerAddress _address;
41   List JavaDoc _discoListeners = new ArrayList JavaDoc();
42   boolean _started;
43   boolean _closed;
44
45   /**
46    * Constructor for EventChannel. For point-to-point communication, this instance will
47    * open a UDP server on a random port.
48    *
49    * @param domain the domain name of this instance.
50    * @param mcastHost the multicast address that this instance will use to broadcast remote events.
51    * @param mcastPort the multicast port that this instance will use to broadcast remote events.
52    *
53    * @see DomainName
54    */

55   public EventChannel(String JavaDoc domain, String JavaDoc mcastHost, int mcastPort)
56     throws IOException JavaDoc {
57     _consumer = new EventConsumer(domain);
58     _broadcast = new BroadcastDispatcherImpl(_consumer, mcastHost, mcastPort);
59     _unicast = new UDPUnicastDispatcher(10000, _consumer);
60     init();
61   }
62
63   /**
64    * Constructor for EventChannel.
65    *
66    * @param domain the domain name of this instance.
67    * @param mcastHost the multicast address that this instance will use to broadcast remote events.
68    * @param mcastPort the multicast port that this instance will use to broadcast remote events.
69    * @param unicastPort the port of the UDP server that this instance encapsulates, and that is
70    * used for point-to-point communication.
71    *
72    * @see DomainName
73    */

74   public EventChannel(String JavaDoc domain, String JavaDoc mcastHost, int mcastPort,
75     int unicastPort) throws IOException JavaDoc {
76     _consumer = new EventConsumer(domain);
77     
78     String JavaDoc soTimeoutProp = System.getProperty(Consts.MCAST_HEARTBEAT_INTERVAL);
79     int soTimeout = 20000;
80     if(soTimeoutProp != null){
81       try{
82         soTimeout = Integer.parseInt(soTimeoutProp);
83       }catch(NumberFormatException JavaDoc e){
84         // use default
85
}
86     }
87     _unicast = new UDPUnicastDispatcher(soTimeout, unicastPort, _consumer);
88     _broadcast = new BroadcastDispatcherImpl(_consumer, mcastHost, mcastPort);
89     init();
90   }
91
92   /**
93    * Returns this instance's domain name.
94    *
95    * @return a <code>DomainName</code>.
96    */

97   public DomainName getDomainName() {
98     return _consumer.getDomainName();
99   }
100
101   /**
102    * Returns this instance's multicast address.
103    *
104    * @return a host address as a string.
105    */

106   public String JavaDoc getMulticastHost() {
107     return _broadcast.getMulticastAddress();
108   }
109
110   /**
111    * Returns this instance's multicast port.
112    *
113    * @return a port.
114    */

115   public int getMulticastPort() {
116     return _broadcast.getMulticastPort();
117   }
118
119   /**
120    * Starts this instances. This method should be called after instantiating this instance, prior to start
121    * receiving/sending remote events.
122    *
123    * @throws IOException if an IO problem occurs starting this instance.
124    */

125   public void start() throws IOException JavaDoc {
126     _listener = new ChannelEventListener(this);
127     _consumer.registerAsyncListener(PUBLISH_EVT, _listener);
128     _consumer.registerAsyncListener(DISCOVER_EVT, _listener);
129     _consumer.registerAsyncListener(HEARTBEAT_EVT, _listener);
130     _unicast.setSoTimeoutListener(_listener);
131     _unicast.setBufsize(2000);
132     _broadcast.start();
133     _unicast.start();
134     _address = _unicast.getAddress();
135     _broadcast.dispatch(false, PUBLISH_EVT, _address);
136     _started = true;
137   }
138
139   /**
140    * @return <code>true</code> if the <code>start()</code> method was called on this instance.
141    *
142    * @see #start()
143    */

144   public boolean isStarted() {
145     return _started;
146   }
147
148   /**
149    * Closes this instance.
150    */

151   public void close() {
152     _broadcast.close();
153     _unicast.close();
154     _closed = true;
155   }
156
157   /**
158    * @return <code>true</code> if the <code>close()</code> method was called on this instance.
159    *
160    * @see #close()
161    */

162   public boolean isClosed() {
163     return _closed;
164   }
165
166   /**
167    * @see org.sapia.ubik.mcast.BroadcastDispatcher#dispatch(boolean, String, Object)
168    */

169   public void dispatch(boolean alldomains, String JavaDoc type, Object JavaDoc data)
170     throws IOException JavaDoc {
171     _broadcast.dispatch(alldomains, type, data);
172   }
173
174   /**
175    * @see org.sapia.ubik.mcast.UnicastDispatcher#dispatch(ServerAddress, String, Object)
176    */

177   public void dispatch(ServerAddress addr, String JavaDoc type, Object JavaDoc data)
178     throws IOException JavaDoc {
179     _unicast.dispatch(addr, type, data);
180   }
181
182   /**
183    * Dispatches the given data to all nodes in this instance's domain.
184    *
185    * @see org.sapia.ubik.mcast.BroadcastDispatcher#dispatch(String, String, Object)
186    */

187   public void dispatch(String JavaDoc type, Object JavaDoc data) throws IOException JavaDoc {
188     if(Log.isDebug()){
189       Log.debug(getClass(), "Sending event " + type + " - " + data);
190     }
191     _broadcast.dispatch(_consumer.getDomainName().toString(), type, data);
192   }
193
194   /**
195    * Adds the given discovery listener to this instance.
196    *
197    * @param listener a <code>DiscoveryListener</code>.
198    */

199   public void addDiscoveryListener(DiscoveryListener listener) {
200     _discoListeners.add(listener);
201   }
202
203   /**
204    * Synchronously sends a remote event to the node corresponding to the given <code>ServerAddress</code>,
205    * and returns the corresponding response.
206    *
207    * @param addr the <code>ServerAddress</code> of the node to which to send the remote event.
208    * @param type the "logical type" of the remote event.
209    * @param data the data to encapsulate in the remote event.
210    *
211    * @return the <code>Response</code> corresponding to this call.
212    *
213    * @see RemoteEvent
214    */

215   public Response send(ServerAddress addr, String JavaDoc type, Object JavaDoc data)
216     throws IOException JavaDoc, TimeoutException {
217     return _unicast.send(addr, type, data);
218   }
219
220   /**
221    * Synchronously sends a remote event to all the nodes corresponding to the given <code>ServerAddress</code>,
222    * and returns the corresponding responses.
223    * <p>
224    *
225    * @param type the "logical type" of the remote event.
226    * @param data the data to encapsulate in the remote event.
227    *
228    * @see org.sapia.ubik.mcast.RemoteEvent
229    */

230   public RespList send(String JavaDoc type, Object JavaDoc data) throws IOException JavaDoc {
231     return _unicast.send(_view.getHosts(), type, data);
232   }
233
234   /**
235    * Registers a listener of asynchronous remote events of the given type.
236    *
237    * @param type the logical type of the remotee events to listen for.
238    * @param listener an <code>AsyncEventListener</code>.
239    */

240   public synchronized void registerAsyncListener(String JavaDoc type,
241     AsyncEventListener listener) {
242     _consumer.registerAsyncListener(type, listener);
243   }
244
245   /**
246    * Registers a listener of synchronous remote events of the given type.
247    *
248    * @param type the logical type of the remotee events to listen for.
249    * @param listener a <code>SyncEventListener</code>.
250    *
251    * @throws ListenerAlreadyRegisteredException if a listener has already been
252    * registered for the given event type.
253    */

254   public synchronized void registerSyncListener(String JavaDoc type,
255     SyncEventListener listener) throws ListenerAlreadyRegisteredException {
256     _consumer.registerSyncListener(type, listener);
257   }
258
259   /**
260    * Unregisters the given listener from this instance.
261    *
262    * @param listener a <code>ASyncEventListener</code>.
263    */

264   public synchronized void unregisterListener(AsyncEventListener listener) {
265     _consumer.unregisterListener(listener);
266   }
267
268   /**
269    * Returns this instance's "view".
270    *
271    * @return a <code>View</code>.
272    */

273   public View getView() {
274     return _view;
275   }
276
277   /**
278    * @see EventConsumer#containsAsyncListener(AsyncEventListener)
279    */

280   public synchronized boolean containsAsyncListener(AsyncEventListener listener) {
281     return _consumer.containsAsyncListener(listener);
282   }
283
284   /**
285    * @see EventConsumer#containsSyncListener(SyncEventListener)
286    */

287   public synchronized boolean containsSyncListener(SyncEventListener listener) {
288     return _consumer.containsSyncListener(listener);
289   }
290
291   /**
292    * @see BroadcastDispatcher#setBufsize(int)
293    * @see UnicastDispatcher#setBufsize(int)
294    */

295   public void setBufsize(int size) {
296     _broadcast.setBufsize(size);
297     _unicast.setBufsize(size);
298   }
299
300   /**
301    * @see BroadcastDispatcher#getNode()
302    */

303   public String JavaDoc getNode() {
304     return _broadcast.getNode();
305   }
306
307   public static class ChannelEventListener implements AsyncEventListener,
308     SocketTimeoutListener {
309     private EventChannel _owner;
310
311     ChannelEventListener(EventChannel channel) {
312       _owner = channel;
313     }
314
315     /**
316      * @see org.sapia.ubik.mcast.SocketTimeoutListener#handleSoTimeout()
317      */

318     public void handleSoTimeout() {
319       _owner._view.removeDeadHosts();
320
321       List JavaDoc siblings = _owner._view.getHosts();
322
323       if (_owner._address != null) {
324         for (int i = 0; i < siblings.size(); i++) {
325           try {
326             //System.out.println("sending heartbeat");
327
_owner.dispatch((ServerAddress) siblings.get(i), HEARTBEAT_EVT,
328               _owner._address);
329           } catch (IOException JavaDoc e) {
330             e.printStackTrace();
331
332             break;
333           }
334         }
335       }
336     }
337
338     /**
339      * @see org.sapia.ubik.mcast.AsyncEventListener#onAsyncEvent(RemoteEvent)
340      */

341     public void onAsyncEvent(RemoteEvent evt) {
342       if (evt.getType().equals(DISCOVER_EVT)) {
343         ServerAddress addr;
344
345         try {
346           addr = (ServerAddress) evt.getData();
347           if(addr == null){
348             return;
349           }
350           _owner._view.addHost(addr, evt.getNode());
351
352           List JavaDoc listeners = _owner._discoListeners;
353
354           for (int i = 0; i < listeners.size(); i++) {
355             ((DiscoveryListener) listeners.get(i)).onDiscovery(addr, evt);
356           }
357         } catch (IOException JavaDoc e) {
358           e.printStackTrace();
359         }
360       } else if (evt.getType().equals(PUBLISH_EVT)) {
361         try {
362           ServerAddress addr = (ServerAddress) evt.getData();
363           if(addr == null){
364             return;
365           }
366
367           _owner._view.addHost(addr, evt.getNode());
368           _owner.dispatch(false, DISCOVER_EVT, _owner._address);
369
370           List JavaDoc listeners = _owner._discoListeners;
371
372           for (int i = 0; i < listeners.size(); i++) {
373             ((DiscoveryListener) listeners.get(i)).onDiscovery(addr, evt);
374           }
375         } catch (IOException JavaDoc e) {
376           e.printStackTrace();
377         }
378       } else if (evt.getType().equals(HEARTBEAT_EVT)) {
379         try {
380           ServerAddress addr = (ServerAddress) evt.getData();
381
382           _owner._view.heartbeat(addr, evt.getNode());
383         } catch (IOException JavaDoc e) {
384           e.printStackTrace();
385         }
386       }
387     }
388   }
389   
390   private void init(){
391     String JavaDoc bufsizeStr = System.getProperty(Consts.MCAST_BUFSIZE_KEY);
392     if(bufsizeStr != null){
393       try{
394         int buf = Integer.parseInt(bufsizeStr);
395         if(buf > 0){
396           _broadcast.setBufsize(buf);
397           _unicast.setBufsize(buf);
398         }
399       }catch(NumberFormatException JavaDoc e){
400         // use default;
401
}
402     }
403     
404     String JavaDoc heartBeatTimeout = System.getProperty(Consts.MCAST_HEARTBEAT_TIMEOUT);
405     if(heartBeatTimeout != null){
406       try{
407         _view.setTimeout(Long.parseLong(heartBeatTimeout));
408       }catch(NumberFormatException JavaDoc e){
409         // use default;
410
}
411     }
412   }
413 }
414
Popular Tags