KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > TUNNEL


1 // $Id: TUNNEL.java,v 1.12 2005/04/20 20:25:47 belaban Exp $
2

3
4 package org.jgroups.protocols;
5
6
7 import org.jgroups.Address;
8 import org.jgroups.Event;
9 import org.jgroups.Message;
10 import org.jgroups.View;
11 import org.jgroups.util.TimeScheduler;
12 import org.jgroups.util.Util;
13 import org.jgroups.stack.Protocol;
14 import org.jgroups.stack.RouterStub;
15 import org.jgroups.stack.IpAddress;
16
17 import java.util.Enumeration JavaDoc;
18 import java.util.Properties JavaDoc;
19 import java.util.Vector JavaDoc;
20 import java.util.HashMap JavaDoc;
21
22
23
24
25 /**
26  * Replacement for UDP. Instead of sending packets via UDP, a TCP connection is opened to a Router
27  * (using the RouterStub client-side stub),
28  * the IP address/port of which was given using channel properties <code>router_host</code> and
29  * <code>router_port</code>. All outgoing traffic is sent via this TCP socket to the Router which
30  * distributes it to all connected TUNNELs in this group. Incoming traffic received from Router will
31  * simply be passed up the stack.
32  * <p>A TUNNEL layer can be used to penetrate a firewall, most firewalls allow creating TCP connections
33  * to the outside world, however, they do not permit outside hosts to initiate a TCP connection to a host
34  * inside the firewall. Therefore, the connection created by the inside host is reused by Router to
35  * send traffic from an outside host to a host inside the firewall.
36  * @author Bela Ban
37  */

38 public class TUNNEL extends Protocol implements Runnable JavaDoc {
39     final Properties JavaDoc properties=null;
40     String JavaDoc channel_name=null;
41     final Vector JavaDoc members=new Vector JavaDoc();
42     String JavaDoc router_host=null;
43     int router_port=0;
44     Address local_addr=null; // sock's local addr and local port
45
Thread JavaDoc receiver=null;
46     RouterStub stub=null;
47     private final Object JavaDoc stub_mutex=new Object JavaDoc();
48
49     /** If true, messages sent to self are treated specially: unicast messages are
50      * looped back immediately, multicast messages get a local copy first and -
51      * when the real copy arrives - it will be discarded. Useful for Window
52      * media (non)sense */

53     boolean loopback=true;
54
55     TimeScheduler timer=null;
56
57     Reconnector reconnector=null;
58     private final Object JavaDoc reconnector_mutex=new Object JavaDoc();
59
60     /** If set it will be added to <tt>local_addr</tt>. Used to implement
61      * for example transport independent addresses */

62     byte[] additional_data=null;
63
64
65     public TUNNEL() {
66     }
67
68
69     public String JavaDoc toString() {
70         return "Protocol TUNNEL(local_addr=" + local_addr + ')';
71     }
72
73
74
75
76     /*------------------------------ Protocol interface ------------------------------ */
77
78     public String JavaDoc getName() {
79         return "TUNNEL";
80     }
81
82     public void init() throws Exception JavaDoc {
83         super.init();
84         timer=stack.timer;
85     }
86
87     public void start() throws Exception JavaDoc {
88         createTunnel(); // will generate and pass up a SET_LOCAL_ADDRESS event
89
passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
90     }
91
92     public void stop() {
93         if(receiver != null)
94             receiver=null;
95         teardownTunnel();
96         stopReconnector();
97     }
98
99
100
101     /**
102      * DON'T REMOVE ! This prevents the up-handler thread to be created, which essentially is superfluous:
103      * messages are received from the network rather than from a layer below.
104      */

105     public void startUpHandler() {
106         ;
107     }
108
109     /** Setup the Protocol instance acording to the configuration string */
110     public boolean setProperties(Properties JavaDoc props) {
111         String JavaDoc str;
112
113         super.setProperties(props);
114         str=props.getProperty("router_host");
115         if(str != null) {
116             router_host=str;
117             props.remove("router_host");
118         }
119
120         str=props.getProperty("router_port");
121         if(str != null) {
122             router_port=Integer.parseInt(str);
123             props.remove("router_port");
124         }
125
126         if(log.isDebugEnabled()) {
127             log.debug("router_host=" + router_host + ";router_port=" + router_port);
128         }
129
130         if(router_host == null || router_port == 0) {
131             if(log.isErrorEnabled()) {
132                 log.error("both router_host and router_port have to be set !");
133                 return false;
134             }
135         }
136
137         str=props.getProperty("loopback");
138         if(str != null) {
139             loopback=Boolean.valueOf(str).booleanValue();
140             props.remove("loopback");
141         }
142
143         if(props.size() > 0) {
144             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
145             for(Enumeration JavaDoc e=props.propertyNames(); e.hasMoreElements();) {
146                 sb.append(e.nextElement().toString());
147                 if(e.hasMoreElements()) {
148                     sb.append(", ");
149                 }
150             }
151             if(log.isErrorEnabled()) log.error("The following properties are not recognized: " + sb);
152             return false;
153         }
154         return true;
155     }
156
157
158     /** Caller by the layer above this layer. We just pass it on to the router. */
159     public void down(Event evt) {
160         Message msg;
161         TunnelHeader hdr;
162         Address dest;
163
164         if(log.isDebugEnabled()) {
165             log.debug(evt.toString());
166         }
167
168         if(evt.getType() != Event.MSG) {
169             handleDownEvent(evt);
170             return;
171         }
172
173         hdr=new TunnelHeader(channel_name);
174         msg=(Message)evt.getArg();
175         dest=msg.getDest();
176         msg.putHeader(getName(), hdr);
177
178         if(msg.getSrc() == null)
179             msg.setSrc(local_addr);
180
181         // Don't send if destination is local address. Instead, switch dst and src and put in up_queue.
182
// If multicast message, loopback a copy directly to us (but still multicast). Once we receive this,
183
// we will discard our own multicast message
184
if(loopback && (dest == null || dest.equals(local_addr) || dest.isMulticastAddress())) {
185             Message copy=msg.copy();
186             // copy.removeHeader(name); // we don't remove the header
187
copy.setSrc(local_addr);
188             copy.setDest(dest);
189             evt=new Event(Event.MSG, copy);
190
191             /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
192                This allows e.g. PerfObserver to get the time of reception of a message */

193             if(observer != null)
194                 observer.up(evt, up_queue.size());
195             if(log.isTraceEnabled()) log.trace("looped back local message " + copy);
196             passUp(evt);
197             if(dest != null && !dest.isMulticastAddress())
198                 return;
199         }
200
201         if(!stub.isConnected() || !stub.send(msg, channel_name)) { // if msg is not sent okay,
202
startReconnector();
203         }
204     }
205
206
207     /** Creates a TCP connection to the router */
208     void createTunnel() throws Exception JavaDoc {
209         if(router_host == null || router_port == 0)
210             throw new Exception JavaDoc("router_host and/or router_port not set correctly; tunnel cannot be created");
211
212         synchronized(stub_mutex) {
213             stub=new RouterStub(router_host, router_port);
214             local_addr=stub.connect();
215             if(additional_data != null && local_addr instanceof IpAddress)
216                 ((IpAddress)local_addr).setAdditionalData(additional_data);
217         }
218         if(local_addr == null)
219             throw new Exception JavaDoc("could not obtain local address !");
220     }
221
222
223     /** Tears the TCP connection to the router down */
224     void teardownTunnel() {
225         synchronized(stub_mutex) {
226             if(stub != null) {
227                 stub.disconnect();
228                 stub=null;
229             }
230         }
231     }
232
233     /*--------------------------- End of Protocol interface -------------------------- */
234
235
236
237
238
239
240     public void run() {
241         Message msg;
242
243         if(stub == null) {
244             if(log.isErrorEnabled()) log.error("router stub is null; cannot receive messages from router !");
245             return;
246         }
247
248         while(receiver != null) {
249             msg=stub.receive();
250             if(msg == null) {
251                 if(receiver == null) break;
252                 if(log.isErrorEnabled()) log.error("received a null message. Trying to reconnect to router");
253                 if(!stub.isConnected())
254                     startReconnector();
255                 Util.sleep(5000);
256                 continue;
257             }
258             handleIncomingMessage(msg);
259         }
260     }
261
262
263
264
265
266     /* ------------------------------ Private methods -------------------------------- */
267
268
269
270
271     public void handleIncomingMessage(Message msg) {
272         TunnelHeader hdr=(TunnelHeader)msg.removeHeader(getName());
273
274         // discard my own multicast loopback copy
275
if(loopback) {
276             Address dst=msg.getDest();
277             Address SRC=msg.getSrc();
278
279             if(dst != null && dst.isMulticastAddress() && src != null && local_addr.equals(src)) {
280                 if(log.isTraceEnabled())
281                     log.trace("discarded own loopback multicast packet");
282                 return;
283             }
284         }
285
286          if(log.isDebugEnabled()) {
287              log.debug("received message " + msg);
288          }
289
290
291         /* Discard all messages destined for a channel with a different name */
292
293         String JavaDoc ch_name=hdr != null? hdr.channel_name : null;
294         if(ch_name != null && !channel_name.equals(ch_name))
295             return;
296
297         passUp(new Event(Event.MSG, msg));
298     }
299
300
301     void handleDownEvent(Event evt) {
302
303         switch(evt.getType()) {
304
305         case Event.TMP_VIEW:
306         case Event.VIEW_CHANGE:
307             synchronized(members) {
308                 members.removeAllElements();
309                 Vector JavaDoc tmpvec=((View)evt.getArg()).getMembers();
310                 for(int i=0; i < tmpvec.size(); i++)
311                     members.addElement(tmpvec.elementAt(i));
312             }
313             break;
314
315         case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local)
316
passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
317             break;
318
319         case Event.SET_LOCAL_ADDRESS:
320             local_addr=(Address)evt.getArg();
321             if(local_addr instanceof IpAddress && additional_data != null)
322                 ((IpAddress)local_addr).setAdditionalData(additional_data);
323             break;
324
325         case Event.CONNECT:
326             channel_name=(String JavaDoc)evt.getArg();
327             if(stub == null) {
328                 if(log.isErrorEnabled()) log.error("CONNECT: router stub is null!");
329             }
330             else {
331                 stub.register(channel_name);
332             }
333
334             receiver=new Thread JavaDoc(this, "TUNNEL receiver thread");
335             receiver.setDaemon(true);
336             receiver.start();
337
338             passUp(new Event(Event.CONNECT_OK));
339             break;
340
341         case Event.DISCONNECT:
342             if(receiver != null) {
343                 receiver=null;
344                 if(stub != null)
345                     stub.disconnect();
346             }
347             teardownTunnel();
348             passUp(new Event(Event.DISCONNECT_OK));
349             passUp(new Event(Event.SET_LOCAL_ADDRESS, null));
350             break;
351
352         case Event.CONFIG:
353             if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
354             handleConfigEvent((HashMap JavaDoc)evt.getArg());
355             break;
356         }
357     }
358
359     private void startReconnector() {
360         synchronized(reconnector_mutex) {
361             if(reconnector == null || reconnector.cancelled()) {
362                 reconnector=new Reconnector();
363                 timer.add(reconnector);
364             }
365         }
366     }
367
368     private void stopReconnector() {
369         synchronized(reconnector_mutex) {
370             if(reconnector != null) {
371                 reconnector.stop();
372                 reconnector=null;
373             }
374         }
375     }
376
377     void handleConfigEvent(HashMap JavaDoc map) {
378         if(map == null) return;
379         if(map.containsKey("additional_data"))
380             additional_data=(byte[])map.get("additional_data");
381     }
382
383     /* ------------------------------------------------------------------------------- */
384
385
386     private class Reconnector implements TimeScheduler.Task {
387         boolean cancelled=false;
388
389
390         public void stop() {
391             cancelled=true;
392         }
393
394         public boolean cancelled() {
395             return cancelled;
396         }
397
398         public long nextInterval() {
399             return 5000;
400         }
401
402         public void run() {
403             if(stub.reconnect()) {
404                 stub.register(channel_name);
405                 if(log.isDebugEnabled()) log.debug("reconnected");
406                 stop();
407             }
408         }
409     }
410
411
412 }
413
Popular Tags