// KickJava   Java API By Example, From Geeks To Geeks.
//
// Java > Open Source Codes > org > jgroups > protocols > TCP_NIO


package org.jgroups.protocols;

import java.net.InetAddress;
import java.util.Collection;
import java.util.Properties;
import org.jgroups.Address;
import org.jgroups.blocks.BasicConnectionTable;
import org.jgroups.blocks.ConnectionTableNIO;
import org.jgroups.stack.IpAddress;

12 /**
13  * Transport using NIO
14  * @author Scott Marlow
15  * @author Alex Fu
16  * @author Bela Ban
17  * @version $Id: TCP_NIO.java,v 1.14 2007/04/27 07:59:20 belaban Exp $
18  */

19 public class TCP_NIO extends BasicTCP implements BasicConnectionTable.Receiver
20 {
21
22    /*
23    * (non-Javadoc)
24    *
25    * @see org.jgroups.protocols.TCP#getConnectionTable(long, long)
26    */

27    protected ConnectionTableNIO getConnectionTable(long ri, long cet,
28                                                    InetAddress JavaDoc b_addr, InetAddress JavaDoc bc_addr, int s_port, int e_port) throws Exception JavaDoc {
29        ConnectionTableNIO retval=null;
30        if (ri == 0 && cet == 0) {
31            retval = new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port, false );
32        }
33        else {
34            if (ri == 0) {
35                ri = 5000;
36                if(log.isWarnEnabled()) log.warn("reaper_interval was 0, set it to " + ri);
37            }
38            if (cet == 0) {
39                cet = 1000 * 60 * 5;
40                if(log.isWarnEnabled()) log.warn("conn_expire_time was 0, set it to " + cet);
41            }
42            retval = new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port, ri, cet, false);
43        }
44
45        retval.setProcessorMaxThreads(getProcessorMaxThreads());
46        retval.setProcessorQueueSize(getProcessorQueueSize());
47        retval.setProcessorMinThreads(getProcessorMinThreads());
48        retval.setProcessorKeepAliveTime(getProcessorKeepAliveTime());
49        retval.setProcessorThreads(getProcessorThreads());
50        retval.start();
51        return retval;
52    }
53
54     public String JavaDoc printConnections() {return ct.toString();}
55
56    public void send(Address dest, byte[] data, int offset, int length) throws Exception JavaDoc {
57       ct.send(dest, data, offset, length);
58    }
59
60    public void start() throws Exception JavaDoc {
61        ct=getConnectionTable(reaper_interval,conn_expire_time,bind_addr,external_addr,start_port,end_port);
62        ct.setUseSendQueues(use_send_queues);
63        // ct.addConnectionListener(this);
64
ct.setReceiveBufferSize(recv_buf_size);
65        ct.setSendBufferSize(send_buf_size);
66        ct.setSocketConnectionTimeout(sock_conn_timeout);
67        ct.setTcpNodelay(tcp_nodelay);
68        ct.setLinger(linger);
69        local_addr=ct.getLocalAddress();
70        if(additional_data != null && local_addr instanceof IpAddress)
71            ((IpAddress)local_addr).setAdditionalData(additional_data);
72        super.start();
73    }
74
75    public void retainAll(Collection JavaDoc members) {
76       ct.retainAll(members);
77    }
78
79    public void stop() {
80        ct.stop();
81        super.stop();
82    }
83
84    public String JavaDoc getName() {
85         return "TCP_NIO";
86     }
87
88    public int getReaderThreads() { return m_reader_threads; }
89    public int getWriterThreads() { return m_writer_threads; }
90    public int getProcessorThreads() { return m_processor_threads; }
91    public int getProcessorMinThreads() { return m_processor_minThreads;}
92    public int getProcessorMaxThreads() { return m_processor_maxThreads;}
93    public int getProcessorQueueSize() { return m_processor_queueSize; }
94    public long getProcessorKeepAliveTime() { return m_processor_keepAliveTime; }
95    public int getOpenConnections() {return ct.getNumConnections();}
96
97     
98
99    /** Setup the Protocol instance acording to the configuration string */
100    public boolean setProperties(Properties JavaDoc props) {
101        String JavaDoc str;
102
103        str=props.getProperty("reader_threads");
104        if(str != null) {
105           m_reader_threads=Integer.parseInt(str);
106           props.remove("reader_threads");
107        }
108
109        str=props.getProperty("writer_threads");
110        if(str != null) {
111           m_writer_threads=Integer.parseInt(str);
112           props.remove("writer_threads");
113        }
114
115        str=props.getProperty("processor_threads");
116        if(str != null) {
117           m_processor_threads=Integer.parseInt(str);
118           props.remove("processor_threads");
119        }
120
121       str=props.getProperty("processor_minThreads");
122       if(str != null) {
123          m_processor_minThreads=Integer.parseInt(str);
124          props.remove("processor_minThreads");
125       }
126
127       str=props.getProperty("processor_maxThreads");
128       if(str != null) {
129          m_processor_maxThreads =Integer.parseInt(str);
130          props.remove("processor_maxThreads");
131       }
132
133       str=props.getProperty("processor_queueSize");
134       if(str != null) {
135          m_processor_queueSize=Integer.parseInt(str);
136          props.remove("processor_queueSize");
137       }
138
139       str=props.getProperty("processor_keepAliveTime");
140       if(str != null) {
141          m_processor_keepAliveTime=Long.parseLong(str);
142          props.remove("processor_keepAliveTime");
143       }
144
145       return super.setProperties(props);
146    }
147
148    private int m_reader_threads = 3;
149
150    private int m_writer_threads = 3;
151
152    private int m_processor_threads = 5; // PooledExecutor.createThreads()
153
private int m_processor_minThreads = 5; // PooledExecutor.setMinimumPoolSize()
154
private int m_processor_maxThreads = 5; // PooledExecutor.setMaxThreads()
155
private int m_processor_queueSize=100; // Number of queued requests that can be pending waiting
156
// for a background thread to run the request.
157
private long m_processor_keepAliveTime = Long.MAX_VALUE; // PooledExecutor.setKeepAliveTime( milliseconds);
158
// negative value used to mean (before 2.5 release) to wait forever,
159
// instead set to Long.MAX_VALUE to keep alive forever
160
private ConnectionTableNIO ct;
161 }
// Popular Tags