KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > net > protocol > delivery > OnceAndOnlyOnceProtocolNetworkLayerImpl


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
3  */

4 package com.tc.net.protocol.delivery;
5
6 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
7
8 import com.tc.async.api.Sink;
9 import com.tc.bytes.TCByteBuffer;
10 import com.tc.exception.TCRuntimeException;
11 import com.tc.logging.TCLogger;
12 import com.tc.logging.TCLogging;
13 import com.tc.net.MaxConnectionsExceededException;
14 import com.tc.net.protocol.NetworkLayer;
15 import com.tc.net.protocol.NetworkStackID;
16 import com.tc.net.protocol.TCNetworkMessage;
17 import com.tc.net.protocol.TCProtocolException;
18 import com.tc.net.protocol.transport.MessageTransport;
19 import com.tc.util.Assert;
20 import com.tc.util.TCTimeoutException;
21
22 import java.io.IOException JavaDoc;
23 import java.net.UnknownHostException JavaDoc;
24
25 /**
26  * NetworkLayer implementation for once and only once message delivery protocol.
27  */

28 public class OnceAndOnlyOnceProtocolNetworkLayerImpl implements OnceAndOnlyOnceProtocolNetworkLayer,
29     OOOProtocolMessageDelivery {
30   private static final TCLogger logger = TCLogging.getLogger(OnceAndOnlyOnceProtocolNetworkLayerImpl.class);
31   private final OOOProtocolMessageFactory messageFactory;
32   private final OOOProtocolMessageParser messageParser;
33   boolean wasConnected = false;
34   private NetworkLayer receiveLayer;
35   private NetworkLayer sendLayer;
36   private GuaranteedDeliveryProtocol delivery;
37
38   public OnceAndOnlyOnceProtocolNetworkLayerImpl(OOOProtocolMessageFactory messageFactory,
39                                                  OOOProtocolMessageParser messageParser, Sink workSink) {
40     this.messageFactory = messageFactory;
41     this.messageParser = messageParser;
42     this.delivery = new GuaranteedDeliveryProtocol(this, workSink, new LinkedQueue());
43     this.delivery.start();
44   }
45
46   /*********************************************************************************************************************
47    * Network layer interface...
48    */

49
50   public void setSendLayer(NetworkLayer layer) {
51     this.sendLayer = layer;
52   }
53
54   public void setReceiveLayer(NetworkLayer layer) {
55     this.receiveLayer = layer;
56   }
57
58   public void send(TCNetworkMessage message) {
59     delivery.send(message);
60   }
61
62   public void receive(TCByteBuffer[] msgData) {
63     OOOProtocolMessage msg = createProtocolMessage(msgData);
64     delivery.receive(msg);
65   }
66
67   public boolean isConnected() {
68     Assert.assertNotNull(sendLayer);
69     return sendLayer.isConnected();
70   }
71
72   public NetworkStackID open() throws TCTimeoutException, UnknownHostException JavaDoc, IOException JavaDoc, MaxConnectionsExceededException {
73     Assert.assertNotNull(sendLayer);
74     return sendLayer.open();
75   }
76
77   public void close() {
78     Assert.assertNotNull(sendLayer);
79
80     // TODO: There is definitely something missing here. We need to cancel/quiesce the delivery instance before closing
81
// the transport
82

83     sendLayer.close();
84   }
85
86   /*********************************************************************************************************************
87    * Transport listener interface...
88    */

89
90   public void notifyTransportConnected(MessageTransport transport) {
91     logNotifyTransportConnected(transport);
92     this.delivery.resume();
93   }
94
95   private void logNotifyTransportConnected(MessageTransport transport) {
96     if (logger.isDebugEnabled()) {
97       logger.debug("notifyTransportConnected(" + transport + ")");
98     }
99   }
100
101   public void notifyTransportDisconnected(MessageTransport transport) {
102     this.delivery.pause();
103   }
104
105   public void notifyTransportConnectAttempt(MessageTransport transport) {
106     //
107
}
108
109   public void notifyTransportClosed(MessageTransport transport) {
110     // XXX: do we do anything here? We've probably done everything we need to do when close() was called.
111
}
112
113   /*********************************************************************************************************************
114    * Protocol Message Delivery interface
115    */

116
117   public void sendAckRequest() {
118     sendToSendLayer(this.messageFactory.createNewAckRequestMessage());
119   }
120
121   public void sendAck(long sequence) {
122     sendToSendLayer(this.messageFactory.createNewAckMessage(sequence));
123   }
124
125   public void sendMessage(OOOProtocolMessage msg) {
126     sendToSendLayer(msg);
127   }
128
129   public void receiveMessage(OOOProtocolMessage msg) {
130     Assert.assertNotNull("Receive layer is null.", this.receiveLayer);
131     Assert.assertNotNull("Attempt to null msg", msg);
132     Assert.eval(msg.isSend());
133
134     this.receiveLayer.receive(msg.getPayload());
135   }
136
137   public OOOProtocolMessage createProtocolMessage(long sequence, final TCNetworkMessage msg) {
138     OOOProtocolMessage rv = messageFactory.createNewSendMessage(sequence, msg);
139
140     final Runnable JavaDoc callback = msg.getSentCallback();
141     if (callback != null) {
142       rv.setSentCallback(new Runnable JavaDoc() {
143         public void run() {
144           callback.run();
145         }
146       });
147     }
148
149     return rv;
150   }
151
152   private void sendToSendLayer(OOOProtocolMessage msg) {
153     // this method doesn't do anything at the moment, but it is a good spot to plug in things you might want to do
154
// every message flowing down from the layer (like logging for example)
155
this.sendLayer.send(msg);
156   }
157
158   private OOOProtocolMessage createProtocolMessage(TCByteBuffer[] msgData) {
159     try {
160       return messageParser.parseMessage(msgData);
161     } catch (TCProtocolException e) {
162       // XXX: this isn't the right thing to do here
163
throw new TCRuntimeException(e);
164     }
165   }
166 }
Popular Tags