1 4 package com.tc.net.protocol; 5 6 import org.apache.commons.io.CopyUtils; 7 8 import com.tc.bytes.TCByteBuffer; 9 import com.tc.io.TCByteBufferInputStream; 10 import com.tc.io.TCByteBufferOutputStream; 11 import com.tc.net.core.TCConnection; 12 import com.tc.net.core.Verifier; 13 import com.tc.net.core.event.TCConnectionErrorEvent; 14 import com.tc.net.core.event.TCConnectionEvent; 15 import com.tc.net.core.event.TCConnectionEventListener; 16 import com.tc.util.Assert; 17 18 import java.io.IOException ; 19 import java.util.HashMap ; 20 import java.util.Map ; 21 22 27 public class EchoSink implements GenericNetworkMessageSink, TCConnectionEventListener { 28 29 private final Map states = new HashMap (); 30 31 public interface ErrorListener { 32 void error(Throwable t); 33 } 34 35 private final ErrorListener listener; 36 private final boolean verify; 37 38 private static final ErrorListener defaultListener = new ErrorListener() { 39 public void error(Throwable t) { 40 t.printStackTrace(); 41 } 42 }; 43 44 public EchoSink() { 45 this(false); 46 } 47 48 public EchoSink(boolean verify) { 49 this(verify, defaultListener); 50 } 51 52 public EchoSink(boolean verify, ErrorListener listener) { 53 this.verify = verify; 54 this.listener = listener; 55 } 56 57 public void putMessage(GenericNetworkMessage msg) { 58 try { 59 putMessage0(msg); 60 } catch (Throwable t) { 61 listener.error(t); 62 } 63 } 64 65 public void putMessage0(GenericNetworkMessage msg) throws IOException { 66 final TCConnection source = msg.getSource(); 67 68 if (verify) { 69 verifyIncomingMessage(source, msg); 70 } 71 72 TCByteBuffer[] recvData = msg.getPayload(); 74 TCByteBufferOutputStream out = new TCByteBufferOutputStream(); 75 TCByteBufferInputStream in = new TCByteBufferInputStream(recvData); 76 77 final int bytesCopied = CopyUtils.copy(in, out); 78 Assert.assertEquals(bytesCopied, msg.getDataLength()); 79 80 GenericNetworkMessage send = new GenericNetworkMessage(source, out.toArray()); 81 Assert.assertEquals(msg.getDataLength(), send.getDataLength()); 82 send.setSequence(msg.getSequence()); 83 send.setClientNum(msg.getClientNum()); 84 85 if (verify) { 86 compareData(msg.getEntireMessageData(), send.getEntireMessageData()); 87 } 88 89 source.putMessage(send); 90 } 91 92 static void compareData(TCByteBuffer[] in, TCByteBuffer[] out) { 93 TCByteBufferInputStream ins = new TCByteBufferInputStream(in); 94 TCByteBufferInputStream outs = new TCByteBufferInputStream(out); 95 96 final int numBytes = ins.available(); 97 if (numBytes != outs.available()) { throw new RuntimeException ("different data lengths: " + numBytes + " vs " 98 + outs.available()); } 99 100 for (int i = 0; i < numBytes; i++) { 101 final int inByte = ins.read(); 102 final int outByte = outs.read(); 103 104 if ((inByte == -1) || (outByte == -1)) { throw new RuntimeException ("premature EOF in stream"); } 105 106 if (inByte != outByte) { throw new RuntimeException ("different byte " + inByte + " != " + outByte); } 107 } 108 } 109 110 private void verifyIncomingMessage(TCConnection source, GenericNetworkMessage msg) { 111 final Verifier verifier; 112 synchronized (states) { 113 if (!states.containsKey(source)) { 114 states.put(source, new Verifier(msg.getClientNum())); 115 } 116 verifier = (Verifier) states.get(source); 117 source.addListener(this); 118 } 119 120 verifier.putMessage(msg); 121 } 122 123 public void connectEvent(TCConnectionEvent event) { 124 } 126 127 public void closeEvent(TCConnectionEvent event) { 128 synchronized (states) { 129 states.remove(event.getSource()); 130 } 131 } 132 133 public void errorEvent(TCConnectionErrorEvent errorEvent) { 134 } 136 137 public void endOfFileEvent(TCConnectionEvent event) { 138 } 140 } 141 142 | Popular Tags |