1 16 17 package org.apache.catalina.cluster.tcp; 18 19 20 import java.net.InetSocketAddress ; 21 import java.net.ServerSocket ; 22 import java.nio.channels.SelectableChannel ; 23 import java.nio.channels.SelectionKey ; 24 import java.nio.channels.Selector ; 25 import java.nio.channels.ServerSocketChannel ; 26 import java.nio.channels.SocketChannel ; 27 import java.util.Iterator ; 28 29 import org.apache.catalina.cluster.CatalinaCluster; 30 import org.apache.catalina.cluster.ClusterReceiver; 31 import org.apache.catalina.cluster.tcp.Constants; 32 import org.apache.catalina.cluster.io.ListenCallback; 33 import org.apache.catalina.cluster.io.ObjectReader; 34 import org.apache.catalina.util.StringManager; 35 42 public class ReplicationListener implements Runnable ,ClusterReceiver 43 { 44 private static org.apache.commons.logging.Log log = 45 org.apache.commons.logging.LogFactory.getLog( ReplicationListener.class ); 46 47 50 private static final String info = "ReplicationListener/1.1"; 51 52 55 protected StringManager sm = StringManager.getManager(Constants.Package); 56 57 58 private ThreadPool pool = null; 59 private boolean doListen = false; 60 private ListenCallback callback; 61 private java.net.InetAddress bind; 62 private String tcpListenAddress; 63 private int tcpThreadCount; 64 private long tcpSelectorTimeout; 65 private int tcpListenPort; 66 private boolean sendAck; 67 70 private boolean compress = true ; 71 72 private Selector selector = null; 73 74 private Object interestOpsMutex = new Object (); 75 76 public ReplicationListener() { 77 } 78 79 82 public boolean isCompress() { 83 return compress; 84 } 85 86 89 public void setCompress(boolean compressMessageData) { 90 this.compress = compressMessageData; 91 } 92 93 97 public void start() { 98 try { 99 pool = new ThreadPool(tcpThreadCount, TcpReplicationThread.class, interestOpsMutex); 100 if ( "auto".equals(tcpListenAddress) ) { 101 tcpListenAddress = java.net.InetAddress.getLocalHost(). 102 getHostAddress(); 103 } 104 if(log.isDebugEnabled()) 105 log.debug("Starting replication listener on address:"+tcpListenAddress); 106 bind = java.net.InetAddress.getByName(tcpListenAddress); 107 Thread t = new Thread (this,"ClusterReceiver"); 108 t.setDaemon(true); 109 t.start(); 110 } catch ( Exception x ) { 111 log.fatal("Unable to start cluster receiver",x); 112 } 113 114 } 115 116 public void stop() { 117 stopListening(); 118 } 119 120 121 public void run() 122 { 123 try 124 { 125 listen(); 126 } 127 catch ( Exception x ) 128 { 129 log.error("Unable to start cluster listener.",x); 130 } 131 } 132 133 138 public void listen () 139 throws Exception 140 { 141 doListen = true; 142 ServerSocketChannel serverChannel = ServerSocketChannel.open(); 144 ServerSocket serverSocket = serverChannel.socket(); 146 selector = Selector.open(); 148 serverSocket.bind (new InetSocketAddress (bind,tcpListenPort)); 150 serverChannel.configureBlocking (false); 152 serverChannel.register (selector, SelectionKey.OP_ACCEPT); 154 while (doListen) { 155 try { 158 159 int n = selector.select(tcpSelectorTimeout); 160 if (n == 0) { 161 synchronized (interestOpsMutex) { 167 } 171 continue; } 173 Iterator it = selector.selectedKeys().iterator(); 175 while (it.hasNext()) { 177 SelectionKey key = (SelectionKey ) it.next(); 178 if (key.isAcceptable()) { 180 ServerSocketChannel server = 181 (ServerSocketChannel ) key.channel(); 182 SocketChannel channel = server.accept(); 183 Object attach = attach = new ObjectReader(channel, selector, 184 callback,isCompress()) ; 185 registerChannel(selector, 186 channel, 187 SelectionKey.OP_READ, 188 attach); 189 } 190 if (key.isReadable()) { 192 readDataFromSocket(key); 193 } else { 194 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); 195 } 196 197 it.remove(); 199 } 200 } 201 catch (java.nio.channels.CancelledKeyException nx) { 202 log.warn( 203 "Replication client disconnected, error when polling key. Ignoring client."); 204 } 205 catch (Exception x) { 206 log.error("Unable to process request in ReplicationListener", x); 207 } 208 209 } 210 serverChannel.close(); 211 selector.close(); 212 } 213 214 public void stopListening(){ 215 doListen = false; 216 if ( selector != null ) { 217 try { 218 selector.close(); 219 selector = null; 220 } catch ( Exception x ) { 221 log.error("Unable to close cluster receiver selector.",x); 222 } 223 } 224 } 225 226 public void setCatalinaCluster(CatalinaCluster cluster) { 227 callback = cluster; 228 } 229 230 public CatalinaCluster getCatalinaCluster() { 231 return (CatalinaCluster)callback ; 232 } 233 234 236 240 protected void registerChannel (Selector selector, 241 SelectableChannel channel, 242 int ops, 243 Object attach) 244 throws Exception { 245 if (channel == null) return; channel.configureBlocking (false); 248 channel.register (selector, ops, attach); 250 } 251 252 254 262 protected void readDataFromSocket (SelectionKey key) 263 throws Exception 264 { 265 TcpReplicationThread worker = (TcpReplicationThread)pool.getWorker(); 266 if (worker == null) { 267 if(log.isDebugEnabled()) 272 log.debug("No TcpReplicationThread available"); 273 } else { 274 worker.serviceChannel(key, sendAck); 276 } 277 } 278 public String getTcpListenAddress() { 279 return tcpListenAddress; 280 } 281 public void setTcpListenAddress(String tcpListenAddress) { 282 this.tcpListenAddress = tcpListenAddress; 283 } 284 public int getTcpListenPort() { 285 return tcpListenPort; 286 } 287 public void setTcpListenPort(int tcpListenPort) { 288 this.tcpListenPort = tcpListenPort; 289 } 290 public long getTcpSelectorTimeout() { 291 return tcpSelectorTimeout; 292 } 293 public void setTcpSelectorTimeout(long tcpSelectorTimeout) { 294 this.tcpSelectorTimeout = tcpSelectorTimeout; 295 } 296 public int getTcpThreadCount() { 297 return tcpThreadCount; 298 } 299 public void setTcpThreadCount(int tcpThreadCount) { 300 this.tcpThreadCount = tcpThreadCount; 301 } 302 public boolean isSendAck() { 303 return sendAck; 304 } 305 public void setSendAck(boolean sendAck) { 306 this.sendAck = sendAck; 307 } 308 309 public String getHost() { 310 return getTcpListenAddress(); 311 } 312 313 public int getPort() { 314 return getTcpListenPort(); 315 } 316 public Object getInterestOpsMutex() { 317 return interestOpsMutex; 318 } 319 320 } 321 | Popular Tags |