1 4 package com.tc.net.protocol.transport; 5 6 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 7 import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef; 8 9 import com.tc.net.protocol.NetworkMessageSink; 10 import com.tc.net.protocol.TCNetworkMessage; 11 12 import junit.framework.Assert; 13 14 abstract class HandshakeMessageResponderBase implements NetworkMessageSink, HandshakeMessageResponder { 15 protected final ConnectionID assignedConnectionId; 16 private final MessageTransportBase transport; 17 protected final TransportHandshakeMessageFactory messageFactory; 18 protected LinkedQueue sentQueue; 19 protected LinkedQueue receivedQueue; 20 private final SynchronizedRef errorRef; 21 22 protected HandshakeMessageResponderBase(LinkedQueue sentQueue, LinkedQueue receivedQueue, 23 TransportHandshakeMessageFactory messageFactory, 24 ConnectionID assignedConnectionId, MessageTransportBase transport, 25 SynchronizedRef errorRef) { 26 super(); 27 this.sentQueue = sentQueue; 28 this.receivedQueue = receivedQueue; 29 this.messageFactory = messageFactory; 30 this.assignedConnectionId = assignedConnectionId; 31 this.transport = transport; 32 this.errorRef = errorRef; 33 } 34 35 public void putMessage(TCNetworkMessage msg) { 36 Assert.assertTrue(msg instanceof TransportHandshakeMessage); 37 TransportHandshakeMessage message = (TransportHandshakeMessage) msg; 38 39 try { 40 this.receivedQueue.put(message); 41 handleHandshakeMessage(message); 42 } catch (InterruptedException e) { 43 setError(e); 44 } 45 } 46 47 protected void setError(Exception e) { 48 e.printStackTrace(); 49 errorRef.set(e); 50 } 51 52 protected void sendResponseMessage(final TransportHandshakeMessage responseMessage) { 53 new Thread (new Runnable () { 54 public void run() { 55 try { 56 sentQueue.put(responseMessage); 57 transport.receiveTransportMessage(responseMessage); 58 } catch (Exception e) { 59 setError(e); 60 } 61 } 62 }).start(); 63 } 64 } | Popular Tags |