KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > ring > TcpRingNode


1 //$Id: TcpRingNode.java,v 1.3 2004/09/23 16:29:40 belaban Exp $
2

3 package org.jgroups.protocols.ring;
4
5 import org.apache.commons.logging.Log;
6 import org.apache.commons.logging.LogFactory;
7 import org.jgroups.Address;
8 import org.jgroups.SuspectedException;
9 import org.jgroups.TimeoutException;
10 import org.jgroups.blocks.GroupRequest;
11 import org.jgroups.stack.IpAddress;
12 import org.jgroups.stack.RpcProtocol;
13 import org.jgroups.util.Util;
14
15 import java.io.IOException JavaDoc;
16 import java.io.InterruptedIOException JavaDoc;
17 import java.io.ObjectInputStream JavaDoc;
18 import java.io.ObjectOutputStream JavaDoc;
19 import java.net.ServerSocket JavaDoc;
20 import java.net.Socket JavaDoc;
21 import java.util.Vector JavaDoc;
22
23
24 public class TcpRingNode implements RingNode
25 {
26
27    final ServerSocket JavaDoc tokenReceiver;
28    Socket JavaDoc previous,next;
29    final Address thisNode;
30      Address nextNode;
31    ObjectInputStream JavaDoc ios;
32    ObjectOutputStream JavaDoc oos;
33    final RpcProtocol rpcProtocol;
34    final boolean failedOnTokenLostException = false;
35
36    final Object JavaDoc socketMutex = new Object JavaDoc();
37     protected final Log log=LogFactory.getLog(this.getClass());
38
39    public TcpRingNode(RpcProtocol owner, Address memberAddress)
40    {
41       tokenReceiver = Util.createServerSocket(12000);
42       rpcProtocol = owner;
43       thisNode = memberAddress;
44       nextNode = null;
45    }
46
47    public IpAddress getTokenReceiverAddress()
48    {
49       return new IpAddress(tokenReceiver.getLocalPort());
50    }
51
52    public Object JavaDoc receiveToken(int timeout) throws TokenLostException
53    {
54       RingToken token = null;
55       Address wasNextNode = nextNode;
56       try
57       {
58          if (previous == null)
59          {
60             previous = tokenReceiver.accept();
61             ios = new ObjectInputStream JavaDoc((previous.getInputStream()));
62
63          }
64          previous.setSoTimeout(timeout);
65          token = new RingToken();
66          token.readExternal(ios);
67       }
68       catch (InterruptedIOException JavaDoc io)
69       {
70          //read was blocked for more than a timeout, assume token lost
71
throw new TokenLostException(io.getMessage(), io, wasNextNode, TokenLostException.WHILE_RECEIVING);
72       }
73       catch (ClassNotFoundException JavaDoc cantHappen)
74       {
75       }
76       catch (IOException JavaDoc ioe)
77       {
78          closeSocket(previous);
79          previous = null;
80          if (ios != null)
81          {
82             try
83             {
84                ios.close();
85             }
86             catch (IOException JavaDoc ignored)
87             {
88             }
89          }
90
91          token = (RingToken) receiveToken(timeout);
92       }
93       return token;
94    }
95
96    public Object JavaDoc receiveToken() throws TokenLostException
97    {
98       return receiveToken(0);
99    }
100
101    public void passToken(Object JavaDoc token) throws TokenLostException
102    {
103       synchronized (socketMutex)
104       {
105          try
106          {
107             ((RingToken) token).writeExternal(oos);
108             oos.flush();
109             oos.reset();
110          }
111          catch (IOException JavaDoc e)
112          {
113             e.printStackTrace();
114             //something went wrong with the next neighbour while it was receiving
115
//token, assume token lost
116
throw new TokenLostException(e.getMessage(), e, nextNode, TokenLostException.WHILE_SENDING);
117          }
118
119       }
120    }
121
122    public void tokenArrived(Object JavaDoc token)
123    {
124       //not needed , callback for udp ring
125
}
126
127    public void reconfigureAll(Vector JavaDoc newMembers)
128    {
129
130    }
131
132
133    public void reconfigure(Vector JavaDoc newMembers)
134    {
135
136       if (isNextNeighbourChanged(newMembers))
137       {
138          IpAddress tokenRecieverAddress = null;
139          synchronized (socketMutex)
140          {
141             nextNode = getNextNode(newMembers);
142
143                if(log.isInfoEnabled()) log.info(" next node " + nextNode);
144
145             try
146             {
147                tokenRecieverAddress = (IpAddress) rpcProtocol.callRemoteMethod(nextNode, "getTokenReceiverAddress", GroupRequest.GET_FIRST, 0);
148             }
149             catch (TimeoutException tim)
150             {
151                if(log.isErrorEnabled()) log.error(" timeouted while doing rpc call getTokenReceiverAddress" + tim);
152                tim.printStackTrace();
153             }
154             catch (SuspectedException sus)
155             {
156                if(log.isErrorEnabled()) log.error(" suspected node while doing rpc call getTokenReceiverAddress" + sus);
157                sus.printStackTrace();
158             }
159             try
160             {
161                closeSocket(next);
162                next = new Socket JavaDoc(tokenRecieverAddress.getIpAddress(), tokenRecieverAddress.getPort());
163                next.setTcpNoDelay(true);
164                oos = new ObjectOutputStream JavaDoc(next.getOutputStream());
165             }
166             catch (IOException JavaDoc ioe)
167             {
168                if(log.isErrorEnabled()) log.error("could not connect to next node " + ioe);
169                ioe.printStackTrace();
170             }
171          }
172       }
173    }
174
175    private void closeSocket(Socket JavaDoc socket)
176    {
177       if (socket == null) return;
178       try
179       {
180          socket.close();
181       }
182       catch (IOException JavaDoc ioe)
183       {
184          ioe.printStackTrace();
185       }
186
187    }
188
189    private boolean isNextNeighbourChanged(Vector JavaDoc newMembers)
190    {
191       Address oldNeighbour = nextNode;
192       Address newNeighbour = getNextNode(newMembers);
193       return !(newNeighbour.equals(oldNeighbour));
194    }
195
196    private Address getNextNode(Vector JavaDoc otherNodes)
197    {
198       int myIndex = otherNodes.indexOf(thisNode);
199       return (myIndex == otherNodes.size() - 1)?
200             (Address) otherNodes.firstElement():
201             (Address) otherNodes.elementAt(myIndex + 1);
202
203    }
204 }
205
Popular Tags