KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > fr > dyade > aaa > util > ReliableTcpConnection


1 /*
2  * Copyright (C) 2004 - ScalAgent Distributed Technologies
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
17  * USA.
18  *
19  * Initial developer(s): ScalAgent Distributed Technologies
20  * Contributor(s):
21  */

22 package fr.dyade.aaa.util;
23
24 import java.io.*;
25 import java.util.*;
26 import java.net.*;
27
28 import org.objectweb.joram.shared.client.AbstractJmsMessage;
29 import org.objectweb.joram.shared.client.AbstractJmsReply;
30 import org.objectweb.joram.shared.stream.StreamUtil;
31
32 import org.objectweb.util.monolog.api.BasicLevel;
33 import org.objectweb.util.monolog.api.Logger;
34
35 public class ReliableTcpConnection {
36
37   public static final int INIT = 0;
38   public static final int CONNECT = 1;
39   public static final int CONNECTING = 2;
40
41   public static final String JavaDoc[] statusNames =
42   {"INIT", "CONNECT", "CONNECTING"};
43
44   public static Logger logger =
45       Debug.getLogger("fr.dyade.aaa.util.ReliableTcpConnection");
46
47   public static String JavaDoc WINDOW_SIZE_PROP_NAME =
48       "fr.dyade.aaa.util.ReliableTcpConnection.windowSize";
49
50   public static int DEFAULT_WINDOW_SIZE = 100;
51
52   private int windowSize;
53
54   private volatile long inputCounter;
55
56   private long outputCounter;
57
58   private volatile int unackCounter;
59
60   private Vector pendingMessages;
61
62   private Socket sock;
63
64   private NetOutputStream nos;
65
66   private BufferedInputStream bis;
67
68   private Object JavaDoc inputLock;
69
70   private Object JavaDoc outputLock;
71
72   private int status;
73   
74   private java.util.Timer JavaDoc timer;
75
76   public ReliableTcpConnection(java.util.Timer JavaDoc timer2) {
77     windowSize = Integer.getInteger(
78       WINDOW_SIZE_PROP_NAME,
79       DEFAULT_WINDOW_SIZE).intValue();
80     if (logger.isLoggable(BasicLevel.INFO))
81       logger.log(BasicLevel.INFO,
82                  "ReliableTcpConnection.windowSize=" + windowSize);
83     timer = timer2;
84     inputCounter = -1;
85     outputCounter = 0;
86     unackCounter = 0;
87     pendingMessages = new Vector();
88     inputLock = new Object JavaDoc();
89     outputLock = new Object JavaDoc();
90     
91     setStatus(INIT);
92   }
93
94   private synchronized void setStatus(int status) {
95     if (logger.isLoggable(BasicLevel.DEBUG))
96       logger.log(BasicLevel.DEBUG,
97                  "ReliableTcpConnection.setStatus(" + statusNames[status] + ')');
98     this.status = status;
99   }
100
101   private final synchronized int getStatus() {
102     return status;
103   }
104
105   public void init(Socket sock) throws IOException {
106     if (logger.isLoggable(BasicLevel.DEBUG))
107       logger.log(BasicLevel.DEBUG, "ReliableTcpConnection.init()");
108     synchronized (this) {
109       if (getStatus() != INIT)
110         throw new IOException("Already connected");
111       setStatus(CONNECTING);
112     }
113
114     try {
115       this.sock = sock;
116
117       synchronized (outputLock) {
118         nos = new NetOutputStream(sock);
119         
120         synchronized (pendingMessages) {
121           for (int i = 0; i < pendingMessages.size(); i++) {
122             TcpMessage pendingMsg = (TcpMessage) pendingMessages.elementAt(i);
123             doSend(pendingMsg.id, inputCounter, pendingMsg.object);
124           }
125         }
126       }
127
128       synchronized (inputLock) {
129         bis = new BufferedInputStream(sock.getInputStream());
130       }
131
132       setStatus(CONNECT);
133     } catch (IOException exc) {
134       if (logger.isLoggable(BasicLevel.DEBUG))
135         logger.log(BasicLevel.DEBUG, "", exc);
136       close();
137       throw exc;
138     }
139   }
140
141   public void send(AbstractJmsMessage request) throws IOException {
142     if (logger.isLoggable(BasicLevel.DEBUG))
143       logger.log(BasicLevel.DEBUG,
144                  "ReliableTcpConnection.send(" + request + ')');
145
146     if (getStatus() != CONNECT)
147       throw new IOException("Connection closed");
148     try {
149       synchronized (outputLock) {
150         doSend(outputCounter, inputCounter, request);
151         addPendingMessage(new TcpMessage(
152           outputCounter, request));
153         outputCounter++;
154       }
155     } catch (IOException exc) {
156       if (logger.isLoggable(BasicLevel.DEBUG))
157         logger.log(BasicLevel.DEBUG, "ReliableTcpConnection.send()", exc);
158       close();
159       throw exc;
160     }
161   }
162   
163   private void doSend(long id, long ackId, AbstractJmsMessage msg) throws IOException {
164     if (logger.isLoggable(BasicLevel.DEBUG))
165       logger.log(BasicLevel.DEBUG,
166                  "ReliableTcpConnection.doSend(" + id + ',' + ackId + ',' + msg + ')');
167     synchronized (outputLock) {
168       nos.send(id, ackId, msg);
169       unackCounter = 0;
170     }
171   }
172
173   static class NetOutputStream extends ByteArrayOutputStream {
174     private OutputStream os = null;
175
176     NetOutputStream(Socket sock) throws IOException {
177       super(1024);
178       reset();
179       os = sock.getOutputStream();
180     }
181
182     public void reset() {
183       count = 4;
184     }
185
186     void send(long id, long ackId, AbstractJmsMessage msg) throws IOException {
187       try {
188         StreamUtil.writeTo(id, this);
189         StreamUtil.writeTo(ackId, this);
190         AbstractJmsMessage.write(msg, this);
191
192         buf[0] = (byte) ((count -4) >>> 24);
193         buf[1] = (byte) ((count -4) >>> 16);
194         buf[2] = (byte) ((count -4) >>> 8);
195         buf[3] = (byte) ((count -4) >>> 0);
196
197         writeTo(os);
198         os.flush();
199       } finally {
200         reset();
201       }
202     }
203   }
204
205   private void addPendingMessage(TcpMessage msg) {
206     if (logger.isLoggable(BasicLevel.DEBUG))
207       logger.log(BasicLevel.DEBUG,
208                  "ReliableTcpConnection.addPendingMessage(" + msg + ')');
209     synchronized (pendingMessages) {
210       pendingMessages.addElement(msg);
211     }
212   }
213
214   private void ackPendingMessages(long ackId) {
215     if (logger.isLoggable(BasicLevel.DEBUG))
216       logger.log(BasicLevel.DEBUG,
217                  "ReliableTcpConnection.ackPendingMessages(" + ackId + ')');
218     synchronized (pendingMessages) {
219       while (pendingMessages.size() > 0) {
220         TcpMessage pendingMsg =
221           (TcpMessage)pendingMessages.elementAt(0);
222         if (ackId < pendingMsg.id) {
223           // It's an old acknowledge
224
break;
225         } else {
226           pendingMessages.removeElementAt(0);
227         }
228       }
229     }
230   }
231
232   public AbstractJmsReply receive() throws Exception JavaDoc {
233     if (logger.isLoggable(BasicLevel.DEBUG))
234       logger.log(BasicLevel.DEBUG,
235                  "ReliableTcpConnection.receive()");
236     if (getStatus() != CONNECT)
237       throw new IOException("Connection closed");
238     loop:
239     while (true) {
240       try {
241         long messageId;
242         long ackId;
243         AbstractJmsReply obj;
244
245         synchronized (inputLock) {
246           int len = StreamUtil.readIntFrom(bis);
247           messageId = StreamUtil.readLongFrom(bis);
248           ackId = StreamUtil.readLongFrom(bis);
249           obj = (AbstractJmsReply) AbstractJmsMessage.read(bis);
250         }
251         if (logger.isLoggable(BasicLevel.DEBUG))
252           logger.log(BasicLevel.DEBUG, " -> id = " + messageId);
253         ackPendingMessages(ackId);
254         if (obj != null) {
255           if (unackCounter < windowSize) {
256             if (logger.isLoggable(BasicLevel.DEBUG))
257               logger.log(BasicLevel.DEBUG, " -> unackCounter++");
258             unackCounter++;
259           } else {
260             if (logger.isLoggable(BasicLevel.DEBUG))
261               logger.log(BasicLevel.DEBUG, " -> schedule");
262             AckTimerTask ackTimertask = new AckTimerTask();
263             timer.schedule(ackTimertask, 0);
264           }
265           if (messageId > inputCounter) {
266             inputCounter = messageId;
267             return obj;
268           } else if (logger.isLoggable(BasicLevel.DEBUG))
269             logger.log(BasicLevel.DEBUG,
270                        " -> already received message: " + messageId + " " + obj);
271         }
272       } catch (IOException exc) {
273         if (logger.isLoggable(BasicLevel.DEBUG))
274           logger.log(BasicLevel.DEBUG, "", exc);
275         close();
276         throw exc;
277       }
278     }
279   }
280
281   public void close() {
282     if (logger.isLoggable(BasicLevel.DEBUG))
283       logger.log(BasicLevel.DEBUG, "ReliableTcpConnection.close()");
284     if (getStatus() == INIT)
285       return;
286 // Remove for SSL (bis.close() ==> lock)
287
// try {
288
// if (bis != null) bis.close();
289
// } catch (IOException exc) {}
290
try {
291       sock.getOutputStream().close();
292     } catch (IOException exc) {}
293     try {
294       sock.close();
295     } catch (IOException exc) {}
296     setStatus(INIT);
297   }
298
299   static class TcpMessage {
300     long id;
301     AbstractJmsMessage object;
302
303     TcpMessage(long id, AbstractJmsMessage object) {
304       this.id = id;
305       this.object = object;
306     }
307
308     public String JavaDoc toString() {
309       return '(' + super.toString() +
310         ",id=" + id +
311         ",object=" + object + ')';
312     }
313   }
314
315   class AckTimerTask extends java.util.TimerTask JavaDoc {
316     public void run() {
317       if (logger.isLoggable(BasicLevel.DEBUG))
318         logger.log(BasicLevel.DEBUG, "AckTimerTask.run()");
319       try {
320         doSend(-1, inputCounter, null);
321         cancel();
322       } catch (IOException exc) {
323         if (logger.isLoggable(BasicLevel.DEBUG))
324           logger.log(BasicLevel.DEBUG, "", exc);
325       }
326     }
327   }
328 }
329
Popular Tags