package org.jgroups.blocks;

import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Util;
import org.jgroups.util.Util1_4;

import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;


/**
 * Variant of {@link ConnectionTable} built on java.nio: {@link #run()} multiplexes accepting
 * inbound connections and reading inbound messages over a single {@link Selector}.
 * <p>
 * Outbound connections are opened in blocking mode by {@link #getConnection(Address)} and queued
 * in <code>pendingSocksList</code>; the selector loop later switches them to non-blocking mode and
 * registers them for reads.
 * <p>
 * The code deliberately sticks to JDK 1.4 language features (raw collections, no annotations).
 */
public class ConnectionTable1_4 extends ConnectionTable implements Runnable {

    // NOTE: these fields intentionally have no "= null" initializers. Explicit initializers run
    // AFTER the superclass constructor returns, so if that constructor reaches
    // createServerSocket() (not visible from this file - TODO confirm) they would reset the
    // freshly created channel/selector/list back to null.

    /** Listening channel; set to null by {@link #stop()}, which ends the loop in {@link #run()}. */
    private ServerSocketChannel srv_sock_ch;

    /** Selector on which the server channel (OP_ACCEPT) and all connections (OP_READ) are registered. */
    private Selector selector;

    /** Outbound connections created by getConnection() that still await init(); guarded by <code>conns</code>. */
    private ArrayList pendingSocksList;


    /**
     * @param srv_port port passed through to the superclass constructor
     * @throws Exception propagated from the superclass constructor
     */
    public ConnectionTable1_4(int srv_port) throws Exception {
        super(srv_port);
    }


    /**
     * @param srv_port         passed through to the superclass constructor
     * @param reaper_interval  passed through to the superclass constructor
     * @param conn_expire_time passed through to the superclass constructor
     * @throws Exception propagated from the superclass constructor
     */
    public ConnectionTable1_4(int srv_port, long reaper_interval,
                              long conn_expire_time) throws Exception {
        super(srv_port, reaper_interval, conn_expire_time);
    }


    /**
     * @param r             passed through to the superclass constructor
     * @param bind_addr     passed through to the superclass constructor
     * @param external_addr passed through to the superclass constructor
     * @param srv_port      passed through to the superclass constructor
     * @param max_port      passed through to the superclass constructor
     * @throws Exception propagated from the superclass constructor
     */
    public ConnectionTable1_4(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port)
            throws Exception {
        super(r, bind_addr, external_addr, srv_port, max_port);
    }


    /**
     * @param r                passed through to the superclass constructor
     * @param bind_addr        passed through to the superclass constructor
     * @param external_addr    passed through to the superclass constructor
     * @param srv_port         passed through to the superclass constructor
     * @param max_port         passed through to the superclass constructor
     * @param reaper_interval  passed through to the superclass constructor
     * @param conn_expire_time passed through to the superclass constructor
     * @throws Exception propagated from the superclass constructor
     */
    public ConnectionTable1_4(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,
                              long reaper_interval, long conn_expire_time) throws Exception {
        super(r, bind_addr, external_addr, srv_port, max_port, reaper_interval, conn_expire_time);
    }


    /**
     * Returns the connection registered for <code>dest</code>, creating it if there is none.
     * <p>
     * A new connection is opened in blocking mode, the local address is sent over it, and the
     * connection is added to the table and to the pending list. The selector is then woken up so
     * that {@link #run()} can call <code>init()</code> on it.
     *
     * @param dest peer address; must be an {@link IpAddress}
     * @return the existing or newly created connection
     * @throws Exception if the socket cannot be opened or the initial handshake fails
     */
    ConnectionTable.Connection getConnection(Address dest) throws Exception {
        Connection conn=null;
        SocketChannel sock_ch;

        synchronized(conns) {
            conn=(Connection)conns.get(dest);
            if(conn == null) {
                InetSocketAddress destAddress=
                        new InetSocketAddress(((IpAddress)dest).getIpAddress(),
                                              ((IpAddress)dest).getPort());
                sock_ch=SocketChannel.open(destAddress);
                try {
                    conn=new Connection(sock_ch, dest);
                    conn.sendLocalAddress(local_addr);
                }
                catch(Exception ex) {
                    // the connection never made it into the table, so nobody else would close the channel
                    try {
                        sock_ch.close();
                    }
                    catch(IOException ignored) {
                        // best effort: the original failure is the one worth reporting
                    }
                    throw ex;
                }
                addConnection(dest, conn);
                // init() registers with the selector, so it is left to the selector thread
                pendingSocksList.add(conn);
                selector.wakeup();
                notifyConnectionOpened(dest);
                if(log.isInfoEnabled()) log.info("created socket to " + dest);
            }
            return conn;
        }
    }


    /**
     * Closes the server socket channel, wakes up the selector so a blocked {@link #run()} can
     * observe the stop condition, and then delegates to the superclass.
     */
    public void stop() {
        if(srv_sock_ch != null) {
            try {
                ServerSocketChannel temp=srv_sock_ch;
                srv_sock_ch=null; // makes the loop condition in run() false
                temp.close();
            }
            catch(Exception ex) {
                if(log.isWarnEnabled()) log.warn("failed closing server socket channel: " + ex);
            }
            // closing a channel does not by itself unblock a thread sitting in select()
            Selector sel=selector;
            if(sel != null)
                sel.wakeup();
        }
        super.stop();
    }


    /**
     * Selector loop; runs until the server socket channel has been cleared by {@link #stop()}.
     * <p>
     * Each pass: block in <code>select()</code>, initialise pending outbound connections, then
     * handle every ready key. OP_ACCEPT keys accept a client, read its peer address and register
     * the connection. OP_READ keys read from the attached connection and, once a complete message
     * buffer is available, pass the deserialised {@link Message} to <code>receive()</code>.
     */
    public void run() {
        Socket client_sock;
        Connection conn=null;
        Address peer_addr;

        while(srv_sock_ch != null) {
            try {
                // forget the connection of the previous pass: a failure in select()/accept() must
                // not destroy an unrelated, healthy connection in the SocketException handler
                conn=null;

                int numReady=selector.select();

                // Drain on every pass, not only when select() returned 0: a wakeup() from
                // getConnection() can coincide with ready keys, and the new connection would
                // otherwise stay unregistered until some later wakeup.
                initPendingConnections();

                if(numReady <= 0)
                    continue;

                Set readyKeys=selector.selectedKeys();
                for(Iterator i=readyKeys.iterator(); i.hasNext();) {
                    SelectionKey key=(SelectionKey)i.next();
                    i.remove();
                    conn=null;

                    if((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
                        ServerSocketChannel readyChannel=(ServerSocketChannel)key.channel();

                        SocketChannel client_sock_ch=readyChannel.accept();
                        if(client_sock_ch == null)
                            continue; // non-blocking accept(): nothing was actually pending
                        client_sock=client_sock_ch.socket();

                        if(log.isInfoEnabled())
                            log.info("accepted connection, client_sock=" + client_sock);

                        conn=new Connection(client_sock_ch, null);
                        peer_addr=conn.readPeerAddress(client_sock);
                        conn.setPeerAddress(peer_addr);

                        synchronized(conns) {
                            if(conns.containsKey(peer_addr)) {
                                if(log.isWarnEnabled())
                                    log.warn(peer_addr
                                             + " is already there, will terminate connection");
                                conn.destroy();
                                // only drop the redundant connection; returning here would
                                // terminate the whole selector thread
                                continue;
                            }
                            addConnection(peer_addr, conn);
                        }
                        conn.init();
                        notifyConnectionOpened(peer_addr);
                    }
                    else if((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                        conn=(Connection)key.attachment();
                        ByteBuffer buff=conn.getNIOMsgReader().readCompleteMsgBuffer();
                        if(buff != null) { // null: no complete message buffer available yet
                            receive((Message)Util.objectFromByteBuffer(buff.array()));
                            conn.getNIOMsgReader().reset();
                        }
                    }
                }
            }
            catch(SocketException sock_ex) {
                if(log.isInfoEnabled()) log.info("exception is " + sock_ex);
                if(conn != null)
                    conn.destroy();
                // srv_sock is maintained outside this class
                if(srv_sock == null)
                    break;
            }
            catch(Throwable ex) {
                if(log.isWarnEnabled()) log.warn("exception is " + ex);
            }
        }
    }


    /**
     * Calls <code>init()</code> on every connection queued by {@link #getConnection(Address)},
     * emptying the pending list. Invoked from the selector loop only.
     */
    private void initPendingConnections() {
        synchronized(conns) {
            while(!pendingSocksList.isEmpty()) {
                Connection pending=(Connection)pendingSocksList.remove(0);
                if(pending != null)
                    pending.init();
            }
        }
    }


    /**
     * Opens the selector and a non-blocking server socket channel, binds the channel to the first
     * port at or above <code>start_port</code> that does not raise a {@link BindException}, and
     * registers it for OP_ACCEPT. Also stores the chosen port in <code>srv_port</code> and creates
     * the pending-connection list.
     *
     * @param start_port first port to try
     * @return the {@link ServerSocket} view of the server channel
     * @throws Exception if the selector or channel cannot be opened, configured or registered
     */
    protected ServerSocket createServerSocket(int start_port) throws Exception {
        this.selector=Selector.open();
        srv_sock_ch=ServerSocketChannel.open();
        srv_sock_ch.configureBlocking(false);
        while(true) {
            try {
                if(bind_addr == null)
                    srv_sock_ch.socket().bind(new InetSocketAddress(start_port));
                else
                    srv_sock_ch.socket().bind(new InetSocketAddress(bind_addr, start_port), backlog);
            }
            catch(BindException bind_ex) {
                start_port++; // port taken, try the next one
                continue;
            }
            catch(IOException io_ex) {
                // NOTE(review): any other I/O failure is only logged; execution falls through and
                // treats start_port as bound. Kept as in the original - confirm this is intended.
                if(log.isErrorEnabled()) log.error("exception is " + io_ex);
            }
            srv_port=start_port;
            break;
        }
        pendingSocksList=new ArrayList();
        srv_sock_ch.register(this.selector, SelectionKey.OP_ACCEPT);
        return srv_sock_ch.socket();
    }


    /**
     * Connection backed by a {@link SocketChannel}. Messages are written as a 4-byte length
     * header followed by the serialised message; reading is delegated to an
     * {@link NBMessageForm1_4} driven from the enclosing selector loop.
     */
    class Connection extends ConnectionTable.Connection {
        private SocketChannel sock_ch=null;
        /** Size in bytes of the length header written before each message. */
        private static final int HEADER_SIZE=4;
        /** Buffer size handed to the message reader on creation. */
        private static final int DEFAULT_BUFF_SIZE=256;
        /** Reused for every send. */
        final ByteBuffer headerBuffer=ByteBuffer.allocate(HEADER_SIZE);
        NBMessageForm1_4 nioMsgReader=null;

        /**
         * @param s         channel to the peer
         * @param peer_addr address of the peer; null when not yet known (accepted connections)
         */
        Connection(SocketChannel s, Address peer_addr) {
            super(s.socket(), peer_addr);
            sock_ch=s;
        }

        /**
         * Switches the channel to non-blocking mode, creates the message reader and registers the
         * channel with the enclosing table's selector for OP_READ, with this connection as the
         * key's attachment. The inherited stream fields are cleared.
         */
        void init() {
            in=null;
            out=null;
            try {
                sock_ch.configureBlocking(false);
                nioMsgReader=new NBMessageForm1_4(DEFAULT_BUFF_SIZE, sock_ch);
                sock_ch.register(selector, SelectionKey.OP_READ, this);
            }
            catch(IOException e) {
                // previously swallowed silently; the connection will not receive anything
                if(log.isWarnEnabled())
                    log.warn("failed registering connection to " + peer_addr + " with selector: " + e);
            }

            if(log.isInfoEnabled()) log.info("connection was created to " + peer_addr);
        }

        /** Closes the channel and drops the message reader. */
        void destroy() {
            closeSocket();
            nioMsgReader=null;
        }

        /**
         * Serialises <code>msg</code> and writes it to the channel as a 4-byte length header
         * followed by the payload. Sets the message source to the local address if it is unset.
         * Returns without sending if the destination address is missing or the serialised buffer
         * is empty.
         *
         * @param msg message to send; its destination must be an {@link IpAddress}
         * @throws Exception any failure while serialising or writing; the connection for the
         *                   destination is removed from the table before rethrowing
         */
        void doSend(Message msg) throws Exception {
            IpAddress dst_addr=(IpAddress)msg.getDest();
            byte[] buffie=null;

            if(dst_addr == null || dst_addr.getIpAddress() == null) {
                if(log.isErrorEnabled()) log.error("the destination address is null; aborting send");
                return;
            }

            try {
                if(msg.getSrc() == null)
                    msg.setSrc(local_addr);

                buffie=Util.objectToByteBuffer(msg);
                if(buffie.length <= 0) {
                    if(log.isErrorEnabled()) log.error("buffer.length is 0. Will not send message");
                    return;
                }

                // length header first, then the payload
                headerBuffer.clear();
                headerBuffer.putInt(buffie.length);
                headerBuffer.flip();
                Util1_4.writeFully(headerBuffer, sock_ch);
                ByteBuffer sendBuffer=ByteBuffer.wrap(buffie);
                Util1_4.writeFully(sendBuffer, sock_ch);
            }
            catch(Exception ex) {
                if(log.isErrorEnabled())
                    log.error("to " + dst_addr + ", exception is " + ex + ", stack trace:\n" +
                              Util.printStackTrace(ex));
                remove(dst_addr);
                throw ex;
            }
        }

        /** Closes the channel (at most once) and clears the inherited socket reference. */
        void closeSocket() {
            if(sock != null) {
                try {
                    sock_ch.close();
                }
                catch(Exception ignored) {
                    // best effort: the connection is being discarded anyway
                }
                sock=null;
            }
        }

        /** @return the message reader; null before init() and after destroy() */
        NBMessageForm1_4 getNIOMsgReader() {
            return nioMsgReader;
        }
    }
}