|                                                                                                              1
 16
 17  package org.apache.catalina.cluster.tcp;
 18
 19
 20
 21
 22  import java.net.Socket
  ; 23  import java.net.ServerSocket
  ; 24  import java.net.InetSocketAddress
  ; 25  import java.nio.channels.Selector
  ; 26
 27  import org.apache.catalina.cluster.io.ListenCallback;
 28  import org.apache.catalina.cluster.io.Jdk13ObjectReader;
 29
 30
 34  public class Jdk13ReplicationListener implements Runnable
  35  {
 36
 37      private static org.apache.commons.logging.Log log =
 38          org.apache.commons.logging.LogFactory.getLog( Jdk13ReplicationListener.class );
 39      private ThreadPool pool = null;
 40      private boolean doListen = false;
 41      private ListenCallback callback;
 42      private java.net.InetAddress
  bind; 43      private int port;
 44      private long timeout = 0;
 45      ServerSocket
  serverSocket = null; 46
 47
 50      private boolean sendAck = true ;
 51
 54      private boolean compress = true ;
 55
 56
 57      public Jdk13ReplicationListener(ListenCallback callback,
 58                                 int poolSize,
 59                                 java.net.InetAddress
  bind, 60                                 int port,
 61                                 long timeout,
 62                                 boolean sendAck)
 63      {
 64          this.sendAck=sendAck;
 65          this.callback = callback;
 66          this.bind = bind;
 67          this.port = port;
 68          this.timeout = timeout;
 69      }
 70
 71
 74      public boolean isCompress() {
 75          return compress;
 76      }
 77
 78
 81      public void setCompress(boolean compress) {
 82          this.compress = compress;
 83      }
 84      public boolean isSendAck() {
 85          return sendAck;
 86      }
 87      public void setSendAck(boolean sendAck) {
 88          this.sendAck = sendAck;
 89      }
 90
 91      public void run()
 92      {
 93          try
 94          {
 95              listen();
 96          }
 97          catch ( Exception
  x ) 98          {
 99              log.fatal("Unable to start cluster listener.",x);
 100         }
 101     }
 102
 103     public void listen ()
 104         throws Exception
  105     {
 106         doListen = true;
 107                 serverSocket = new ServerSocket
  (); 109         serverSocket.bind (new InetSocketAddress
  (bind,port)); 110         while (doListen) {
 111             Socket
  socket = serverSocket.accept(); 112             ClusterListenThread t = new ClusterListenThread(socket,new Jdk13ObjectReader(socket,callback,compress),sendAck);
 113             t.setDaemon(true);
 114             t.start();
 115         }        serverSocket.close();
 117     }
 118
 119     public void stopListening(){
 120         doListen = false;
 121         try {
 122             serverSocket.close();
 123         } catch ( Exception
  x ) { 124             log.error("Unable to stop the replication listen socket",x);
 125         }
 126     }
 127
 128     protected static class ClusterListenThread extends Thread
  { 129         private Socket
  socket; 130         private Jdk13ObjectReader reader;
 131         private boolean keepRunning = true;
 132         private boolean sendAck ;
 133         private static byte[] ACK_COMMAND = new byte[] {6,2,3};
 134         ClusterListenThread(Socket
  socket, Jdk13ObjectReader reader, boolean sendAck) { 135             this.socket = socket;
 136             this.reader = reader;
 137             this.sendAck = sendAck ;
 138         }
 139
 140         public void run() {
 141             try {
 142                 byte[] buffer = new byte[1024];
 143                 while (keepRunning) {
 144                     java.io.InputStream
  in = socket.getInputStream(); 145                     int cnt = in.read(buffer);
 146                     int ack = 0;
 147                     if ( cnt > 0 ) {
 148                         ack = reader.append(buffer, 0, cnt);
 149                     }
 150                     if(sendAck) {
 151                         while ( ack > 0 ) {
 152                             sendAck();
 153                             ack--;
 154                         }
 155                     }
 156                 }
 157             } catch ( Exception
  x ) { 158                 keepRunning = false;
 159                 log.error("Unable to read data from client, disconnecting.",x);
 160                 try { socket.close(); } catch ( Exception
  ignore ) {} 161             }
 162         }
 163
 164         private void sendAck() throws java.io.IOException
  { 165                         socket.getOutputStream().write(ACK_COMMAND);
 167         }
 168
 169     }
 170 }
 171
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |