package org.jgroups.protocols;

import org.jgroups.blocks.ConnectionTableNIO;
import org.jgroups.blocks.BasicConnectionTable;
import org.jgroups.Address;
import org.jgroups.stack.IpAddress;

import java.net.InetAddress;
import java.util.Properties;
import java.util.Collection;

/**
 * TCP transport that delegates connection handling to a {@link ConnectionTableNIO}.
 * <p>
 * The connection table is created and started in {@link #start()} and stopped in
 * {@link #stop()}; until {@link #start()} has assigned it, the table reference is
 * <code>null</code>.
 * <p>
 * Recognised properties (see {@link #setProperties(Properties)}):
 * <code>reader_threads</code>, <code>writer_threads</code>, <code>processor_threads</code>,
 * <code>processor_minThreads</code>, <code>processor_maxThreads</code>,
 * <code>processor_queueSize</code> and <code>processor_keepAliveTime</code>.
 */
public class TCP_NIO extends BasicTCP implements BasicConnectionTable.Receiver
{

    /**
     * Creates, configures and starts the connection table used by this protocol.
     * <p>
     * If both <code>ri</code> and <code>cet</code> are 0, the table is built with the constructor
     * that takes neither value. Otherwise whichever of the two is 0 is replaced by a default
     * (5000 for <code>ri</code>, 1000 * 60 * 5 for <code>cet</code>) and a warning is logged.
     * The processor settings of this protocol are copied to the table before it is started.
     *
     * @param ri      reaper interval (presumably milliseconds - confirm against ConnectionTableNIO)
     * @param cet     connection expiry time (presumably milliseconds - confirm against ConnectionTableNIO)
     * @param b_addr  first address handed to the connection table (called with bind_addr by start())
     * @param bc_addr second address handed to the connection table (called with external_addr by start())
     * @param s_port  first port handed to the connection table (called with start_port by start())
     * @param e_port  second port handed to the connection table (called with end_port by start())
     * @return the started connection table, never <code>null</code>
     * @throws Exception if constructing or starting the connection table fails
     */
    protected ConnectionTableNIO getConnectionTable(long ri, long cet,
                                                    InetAddress b_addr, InetAddress bc_addr, int s_port, int e_port) throws Exception {
        ConnectionTableNIO retval=null;
        if(ri == 0 && cet == 0) {
            // NOTE(review): the trailing 'false' presumably defers start-up until the explicit
            // start() call below - confirm against the ConnectionTableNIO constructor.
            retval=new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port, false);
        }
        else {
            if(ri == 0) {
                ri=5000;
                if(log.isWarnEnabled()) log.warn("reaper_interval was 0, set it to " + ri);
            }
            if(cet == 0) {
                cet=1000 * 60 * 5;
                if(log.isWarnEnabled()) log.warn("conn_expire_time was 0, set it to " + cet);
            }
            retval=new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port, ri, cet, false);
        }

        // The processor settings are applied before start() is called on the table.
        retval.setProcessorMaxThreads(getProcessorMaxThreads());
        retval.setProcessorQueueSize(getProcessorQueueSize());
        retval.setProcessorMinThreads(getProcessorMinThreads());
        retval.setProcessorKeepAliveTime(getProcessorKeepAliveTime());
        retval.setProcessorThreads(getProcessorThreads());
        retval.start();
        return retval;
    }

    /**
     * Returns the string form of the connection table.
     *
     * @return <code>ct.toString()</code>, or the string <code>"null"</code> if the connection
     *         table has not been created yet (previously this case threw a NullPointerException)
     */
    public String printConnections() {
        return String.valueOf(ct);
    }

    /**
     * Sends a range of <code>data</code> to <code>dest</code> through the connection table.
     *
     * @param dest   the destination address
     * @param data   buffer holding the bytes to send
     * @param offset offset into <code>data</code>
     * @param length number of bytes
     * @throws Exception if the connection table fails to send
     */
    public void send(Address dest, byte[] data, int offset, int length) throws Exception {
        ct.send(dest, data, offset, length);
    }

    /**
     * Creates and starts the connection table, applies the socket-related settings to it,
     * takes the table's local address as this protocol's local address (attaching
     * <code>additional_data</code> when set and the address is an {@link IpAddress}) and
     * finally starts the superclass.
     *
     * @throws Exception if the connection table or the superclass fails to start
     */
    public void start() throws Exception {
        ct=getConnectionTable(reaper_interval, conn_expire_time, bind_addr, external_addr, start_port, end_port);
        ct.setUseSendQueues(use_send_queues);
        ct.setReceiveBufferSize(recv_buf_size);
        ct.setSendBufferSize(send_buf_size);
        ct.setSocketConnectionTimeout(sock_conn_timeout);
        ct.setTcpNodelay(tcp_nodelay);
        ct.setLinger(linger);
        local_addr=ct.getLocalAddress();
        if(additional_data != null && local_addr instanceof IpAddress)
            ((IpAddress)local_addr).setAdditionalData(additional_data);
        super.start();
    }

    /**
     * Forwards <code>members</code> to the connection table's <code>retainAll</code>.
     *
     * @param members the collection passed through to the connection table
     */
    public void retainAll(Collection members) {
        ct.retainAll(members);
    }

    /**
     * Stops the connection table (if one was created) and then the superclass.
     * The null check ensures <code>super.stop()</code> still runs when {@link #start()} was
     * never called or failed before the connection table was assigned.
     */
    public void stop() {
        if(ct != null)
            ct.stop();
        super.stop();
    }

    public String getName() {
        return "TCP_NIO";
    }

    // NOTE(review): reader_threads and writer_threads are parsed in setProperties() and exposed
    // here, but getConnectionTable() never forwards them to the connection table - confirm
    // whether ConnectionTableNIO offers matching setters and whether they should be applied.
    public int getReaderThreads() { return m_reader_threads; }
    public int getWriterThreads() { return m_writer_threads; }
    public int getProcessorThreads() { return m_processor_threads; }
    public int getProcessorMinThreads() { return m_processor_minThreads; }
    public int getProcessorMaxThreads() { return m_processor_maxThreads; }
    public int getProcessorQueueSize() { return m_processor_queueSize; }
    public long getProcessorKeepAliveTime() { return m_processor_keepAliveTime; }

    /**
     * Returns the number of connections reported by the connection table.
     *
     * @return <code>ct.getNumConnections()</code>, or 0 if the connection table has not been
     *         created yet (previously this case threw a NullPointerException)
     */
    public int getOpenConnections() {
        return ct != null ? ct.getNumConnections() : 0;
    }

    /**
     * Reads the properties owned by this class, removes each one that was present from
     * <code>props</code>, and passes the remainder to the superclass.
     *
     * @param props the protocol properties; recognised keys are removed from it
     * @return the result of <code>super.setProperties(props)</code>
     * @throws NumberFormatException if a recognised property is present but not a valid number
     */
    public boolean setProperties(Properties props) {
        m_reader_threads=consumeIntProperty(props, "reader_threads", m_reader_threads);
        m_writer_threads=consumeIntProperty(props, "writer_threads", m_writer_threads);
        m_processor_threads=consumeIntProperty(props, "processor_threads", m_processor_threads);
        m_processor_minThreads=consumeIntProperty(props, "processor_minThreads", m_processor_minThreads);
        m_processor_maxThreads=consumeIntProperty(props, "processor_maxThreads", m_processor_maxThreads);
        m_processor_queueSize=consumeIntProperty(props, "processor_queueSize", m_processor_queueSize);
        m_processor_keepAliveTime=consumeLongProperty(props, "processor_keepAliveTime", m_processor_keepAliveTime);

        return super.setProperties(props);
    }

    /**
     * Returns the int value of <code>key</code> and removes the key from <code>props</code>;
     * returns <code>current</code> and leaves <code>props</code> untouched when the key is absent.
     * The key is removed only after a successful parse.
     */
    private static int consumeIntProperty(Properties props, String key, int current) {
        String str=props.getProperty(key);
        if(str == null)
            return current;
        int parsed=Integer.parseInt(str);
        props.remove(key);
        return parsed;
    }

    /**
     * Returns the long value of <code>key</code> and removes the key from <code>props</code>;
     * returns <code>current</code> and leaves <code>props</code> untouched when the key is absent.
     * The key is removed only after a successful parse.
     */
    private static long consumeLongProperty(Properties props, String key, long current) {
        String str=props.getProperty(key);
        if(str == null)
            return current;
        long parsed=Long.parseLong(str);
        props.remove(key);
        return parsed;
    }

    /** Value of the <code>reader_threads</code> property; defaults to 3. */
    private int m_reader_threads = 3;

    /** Value of the <code>writer_threads</code> property; defaults to 3. */
    private int m_writer_threads = 3;

    /** Value of the <code>processor_threads</code> property; defaults to 5. */
    private int m_processor_threads = 5;

    /** Value of the <code>processor_minThreads</code> property; defaults to 5. */
    private int m_processor_minThreads = 5;

    /** Value of the <code>processor_maxThreads</code> property; defaults to 5. */
    private int m_processor_maxThreads = 5;

    /** Value of the <code>processor_queueSize</code> property; defaults to 100. */
    private int m_processor_queueSize=100;

    /** Value of the <code>processor_keepAliveTime</code> property; defaults to Long.MAX_VALUE. */
    private long m_processor_keepAliveTime = Long.MAX_VALUE;

    /** Connection table; assigned in start() and null before that. */
    private ConnectionTableNIO ct;
}