1 16 17 package org.apache.catalina.cluster.tcp; 18 import java.io.IOException ; 19 import java.nio.ByteBuffer ; 20 import java.nio.channels.SelectionKey ; 21 import java.nio.channels.SocketChannel ; 22 23 import org.apache.catalina.cluster.io.ObjectReader; 24 25 38 public class TcpReplicationThread extends WorkerThread { 39 private static final byte[] ACK_COMMAND = new byte[] {6, 2, 3}; 40 private static org.apache.commons.logging.Log log = 41 org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class ); 42 private ByteBuffer buffer = ByteBuffer.allocate (1024); 43 private SelectionKey key; 44 private boolean waitForAck=true; 45 46 TcpReplicationThread () 47 { 48 } 49 50 public synchronized void run() 52 { 53 while (doRun) { 54 try { 55 this.wait(); 57 } catch (InterruptedException e) { 58 if(log.isInfoEnabled()) 59 log.info("TCP worker thread interrupted in cluster",e); 60 Thread.interrupted(); 62 } 63 if (key == null) { 64 continue; } 66 try { 67 drainChannel (key); 68 } catch (Exception e) { 69 log.error ("TCP Worker thread in cluster caught '" 70 + e + "' closing channel", e); 71 72 try { 74 key.channel().close(); 75 } catch (IOException ex) { 76 log.error("Unable to close channel.",ex); 77 } 78 key.selector().wakeup(); 79 } 80 key = null; 81 this.pool.returnWorker (this); 83 } 84 } 85 86 97 synchronized void serviceChannel (SelectionKey key, boolean waitForAck) 98 { 99 this.key = key; 100 this.waitForAck=waitForAck; 101 key.interestOps (key.interestOps() & (~SelectionKey.OP_READ)); 102 key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE)); 103 this.notify(); } 105 106 114 private void drainChannel (SelectionKey key) 115 throws Exception 116 { 117 boolean packetReceived=false; 118 SocketChannel channel = (SocketChannel ) key.channel(); 119 int count; 120 buffer.clear(); ObjectReader reader = (ObjectReader)key.attachment(); 122 while ((count = channel.read (buffer)) > 0) { 124 buffer.flip(); reader.append(buffer.array(),0,count); 126 buffer.clear(); } 128 int pkgcnt = reader.execute(); 130 if (waitForAck) { 131 while ( pkgcnt > 0 ) { 132 sendAck(key,channel); 133 pkgcnt--; 134 } 135 } 136 137 if (count < 0) { 138 channel.close(); 140 return; 141 } 142 143 Object mutex = this.getPool().getInterestOpsMutex(); 145 synchronized (mutex) { 146 key.selector().wakeup(); 148 int resumeOps = key.interestOps() | SelectionKey.OP_READ; 150 key.interestOps(resumeOps); 151 } 152 153 } 154 155 160 private void sendAck(SelectionKey key, SocketChannel channel) { 161 162 try { 163 channel.write(ByteBuffer.wrap(ACK_COMMAND)); 164 } catch ( java.io.IOException x ) { 165 log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage()); 166 } 167 } 168 } 169 | Popular Tags |