1 16 17 package org.apache.catalina.cluster.tcp; 18 19 20 21 22 import java.net.Socket ; 23 import java.net.ServerSocket ; 24 import java.net.InetSocketAddress ; 25 import java.nio.channels.Selector ; 26 27 import org.apache.catalina.cluster.io.ListenCallback; 28 import org.apache.catalina.cluster.io.Jdk13ObjectReader; 29 30 34 public class Jdk13ReplicationListener implements Runnable 35 { 36 37 private static org.apache.commons.logging.Log log = 38 org.apache.commons.logging.LogFactory.getLog( Jdk13ReplicationListener.class ); 39 private ThreadPool pool = null; 40 private boolean doListen = false; 41 private ListenCallback callback; 42 private java.net.InetAddress bind; 43 private int port; 44 private long timeout = 0; 45 ServerSocket serverSocket = null; 46 47 50 private boolean sendAck = true ; 51 54 private boolean compress = true ; 55 56 57 public Jdk13ReplicationListener(ListenCallback callback, 58 int poolSize, 59 java.net.InetAddress bind, 60 int port, 61 long timeout, 62 boolean sendAck) 63 { 64 this.sendAck=sendAck; 65 this.callback = callback; 66 this.bind = bind; 67 this.port = port; 68 this.timeout = timeout; 69 } 70 71 74 public boolean isCompress() { 75 return compress; 76 } 77 78 81 public void setCompress(boolean compress) { 82 this.compress = compress; 83 } 84 public boolean isSendAck() { 85 return sendAck; 86 } 87 public void setSendAck(boolean sendAck) { 88 this.sendAck = sendAck; 89 } 90 91 public void run() 92 { 93 try 94 { 95 listen(); 96 } 97 catch ( Exception x ) 98 { 99 log.fatal("Unable to start cluster listener.",x); 100 } 101 } 102 103 public void listen () 104 throws Exception 105 { 106 doListen = true; 107 serverSocket = new ServerSocket (); 109 serverSocket.bind (new InetSocketAddress (bind,port)); 110 while (doListen) { 111 Socket socket = serverSocket.accept(); 112 ClusterListenThread t = new ClusterListenThread(socket,new Jdk13ObjectReader(socket,callback,compress),sendAck); 113 t.setDaemon(true); 114 t.start(); 115 } serverSocket.close(); 117 } 118 119 public void stopListening(){ 120 doListen = false; 121 try { 122 serverSocket.close(); 123 } catch ( Exception x ) { 124 log.error("Unable to stop the replication listen socket",x); 125 } 126 } 127 128 protected static class ClusterListenThread extends Thread { 129 private Socket socket; 130 private Jdk13ObjectReader reader; 131 private boolean keepRunning = true; 132 private boolean sendAck ; 133 private static byte[] ACK_COMMAND = new byte[] {6,2,3}; 134 ClusterListenThread(Socket socket, Jdk13ObjectReader reader, boolean sendAck) { 135 this.socket = socket; 136 this.reader = reader; 137 this.sendAck = sendAck ; 138 } 139 140 public void run() { 141 try { 142 byte[] buffer = new byte[1024]; 143 while (keepRunning) { 144 java.io.InputStream in = socket.getInputStream(); 145 int cnt = in.read(buffer); 146 int ack = 0; 147 if ( cnt > 0 ) { 148 ack = reader.append(buffer, 0, cnt); 149 } 150 if(sendAck) { 151 while ( ack > 0 ) { 152 sendAck(); 153 ack--; 154 } 155 } 156 } 157 } catch ( Exception x ) { 158 keepRunning = false; 159 log.error("Unable to read data from client, disconnecting.",x); 160 try { socket.close(); } catch ( Exception ignore ) {} 161 } 162 } 163 164 private void sendAck() throws java.io.IOException { 165 socket.getOutputStream().write(ACK_COMMAND); 167 } 168 169 } 170 } 171 | Popular Tags |