1 3 package org.jgroups.protocols.ring; 4 5 import org.apache.commons.logging.Log; 6 import org.apache.commons.logging.LogFactory; 7 import org.jgroups.Address; 8 import org.jgroups.SuspectedException; 9 import org.jgroups.TimeoutException; 10 import org.jgroups.blocks.GroupRequest; 11 import org.jgroups.stack.IpAddress; 12 import org.jgroups.stack.RpcProtocol; 13 import org.jgroups.util.Util; 14 15 import java.io.IOException ; 16 import java.io.InterruptedIOException ; 17 import java.io.ObjectInputStream ; 18 import java.io.ObjectOutputStream ; 19 import java.net.ServerSocket ; 20 import java.net.Socket ; 21 import java.util.Vector ; 22 23 24 public class TcpRingNode implements RingNode 25 { 26 27 final ServerSocket tokenReceiver; 28 Socket previous,next; 29 final Address thisNode; 30 Address nextNode; 31 ObjectInputStream ios; 32 ObjectOutputStream oos; 33 final RpcProtocol rpcProtocol; 34 final boolean failedOnTokenLostException = false; 35 36 final Object socketMutex = new Object (); 37 protected final Log log=LogFactory.getLog(this.getClass()); 38 39 public TcpRingNode(RpcProtocol owner, Address memberAddress) 40 { 41 tokenReceiver = Util.createServerSocket(12000); 42 rpcProtocol = owner; 43 thisNode = memberAddress; 44 nextNode = null; 45 } 46 47 public IpAddress getTokenReceiverAddress() 48 { 49 return new IpAddress(tokenReceiver.getLocalPort()); 50 } 51 52 public Object receiveToken(int timeout) throws TokenLostException 53 { 54 RingToken token = null; 55 Address wasNextNode = nextNode; 56 try 57 { 58 if (previous == null) 59 { 60 previous = tokenReceiver.accept(); 61 ios = new ObjectInputStream ((previous.getInputStream())); 62 63 } 64 previous.setSoTimeout(timeout); 65 token = new RingToken(); 66 token.readExternal(ios); 67 } 68 catch (InterruptedIOException io) 69 { 70 throw new TokenLostException(io.getMessage(), io, wasNextNode, TokenLostException.WHILE_RECEIVING); 72 } 73 catch (ClassNotFoundException cantHappen) 74 { 75 } 76 catch (IOException ioe) 77 { 78 closeSocket(previous); 79 previous = null; 80 if (ios != null) 81 { 82 try 83 { 84 ios.close(); 85 } 86 catch (IOException ignored) 87 { 88 } 89 } 90 91 token = (RingToken) receiveToken(timeout); 92 } 93 return token; 94 } 95 96 public Object receiveToken() throws TokenLostException 97 { 98 return receiveToken(0); 99 } 100 101 public void passToken(Object token) throws TokenLostException 102 { 103 synchronized (socketMutex) 104 { 105 try 106 { 107 ((RingToken) token).writeExternal(oos); 108 oos.flush(); 109 oos.reset(); 110 } 111 catch (IOException e) 112 { 113 e.printStackTrace(); 114 throw new TokenLostException(e.getMessage(), e, nextNode, TokenLostException.WHILE_SENDING); 117 } 118 119 } 120 } 121 122 public void tokenArrived(Object token) 123 { 124 } 126 127 public void reconfigureAll(Vector newMembers) 128 { 129 130 } 131 132 133 public void reconfigure(Vector newMembers) 134 { 135 136 if (isNextNeighbourChanged(newMembers)) 137 { 138 IpAddress tokenRecieverAddress = null; 139 synchronized (socketMutex) 140 { 141 nextNode = getNextNode(newMembers); 142 143 if(log.isInfoEnabled()) log.info(" next node " + nextNode); 144 145 try 146 { 147 tokenRecieverAddress = (IpAddress) rpcProtocol.callRemoteMethod(nextNode, "getTokenReceiverAddress", GroupRequest.GET_FIRST, 0); 148 } 149 catch (TimeoutException tim) 150 { 151 if(log.isErrorEnabled()) log.error(" timeouted while doing rpc call getTokenReceiverAddress" + tim); 152 tim.printStackTrace(); 153 } 154 catch (SuspectedException sus) 155 { 156 if(log.isErrorEnabled()) log.error(" suspected node while doing rpc call getTokenReceiverAddress" + sus); 157 sus.printStackTrace(); 158 } 159 try 160 { 161 closeSocket(next); 162 next = new Socket (tokenRecieverAddress.getIpAddress(), tokenRecieverAddress.getPort()); 163 next.setTcpNoDelay(true); 164 oos = new ObjectOutputStream (next.getOutputStream()); 165 } 166 catch (IOException ioe) 167 { 168 if(log.isErrorEnabled()) log.error("could not connect to next node " + ioe); 169 ioe.printStackTrace(); 170 } 171 } 172 } 173 } 174 175 private void closeSocket(Socket socket) 176 { 177 if (socket == null) return; 178 try 179 { 180 socket.close(); 181 } 182 catch (IOException ioe) 183 { 184 ioe.printStackTrace(); 185 } 186 187 } 188 189 private boolean isNextNeighbourChanged(Vector newMembers) 190 { 191 Address oldNeighbour = nextNode; 192 Address newNeighbour = getNextNode(newMembers); 193 return !(newNeighbour.equals(oldNeighbour)); 194 } 195 196 private Address getNextNode(Vector otherNodes) 197 { 198 int myIndex = otherNodes.indexOf(thisNode); 199 return (myIndex == otherNodes.size() - 1)? 200 (Address) otherNodes.firstElement(): 201 (Address) otherNodes.elementAt(myIndex + 1); 202 203 } 204 } 205 | Popular Tags |