KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > proxies > tcp > IOControl


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2004 - ScalAgent Distributed Technologies
4  * Copyright (C) 2004 - France Telecom R&D
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): ScalAgent Distributed Technologies
22  */

23 package org.objectweb.joram.mom.proxies.tcp;
24
25 import java.io.*;
26 import java.util.*;
27 import java.net.*;
28
29 import org.objectweb.joram.shared.client.AbstractJmsMessage;
30 import org.objectweb.joram.shared.client.AbstractJmsRequest;
31 import org.objectweb.joram.mom.proxies.*;
32 import org.objectweb.joram.shared.stream.StreamUtil;
33
34 import fr.dyade.aaa.util.Debug;
35 import org.objectweb.util.monolog.api.BasicLevel;
36 import org.objectweb.util.monolog.api.Logger;
37
38 public class IOControl {
39   public static Logger logger = Debug.getLogger(IOControl.class.getName());
40
41   private long inputCounter;
42
43   private Socket sock;
44
45   private NetOutputStream nos;
46
47   private BufferedInputStream bis;
48
49   private int windowSize;
50
51   private int unackCounter;
52
53   public IOControl(Socket sock) throws IOException {
54     this(sock, -1);
55   }
56     
57   public IOControl(Socket sock,
58            long inputCounter) throws IOException {
59     windowSize = Integer.getInteger(
60       fr.dyade.aaa.util.ReliableTcpConnection.WINDOW_SIZE_PROP_NAME,
61       fr.dyade.aaa.util.ReliableTcpConnection.DEFAULT_WINDOW_SIZE).intValue();
62     unackCounter = 0;
63     this.inputCounter = inputCounter;
64     this.sock = sock;
65
66     nos = new NetOutputStream(sock);
67     bis = new BufferedInputStream(sock.getInputStream());
68   }
69
70   public synchronized void send(ProxyMessage msg) throws IOException {
71     if (logger.isLoggable(BasicLevel.DEBUG))
72       logger.log(BasicLevel.DEBUG, "IOControl.send:" + msg);
73
74     try {
75       nos.send(msg.getId(), msg.getAckId(), msg.getObject());
76       unackCounter = 0;
77     } catch (IOException exc) {
78       if (logger.isLoggable(BasicLevel.DEBUG))
79         logger.log(BasicLevel.DEBUG, "IOControl.send", exc);
80       close();
81       throw exc;
82     }
83   }
84
85   static class NetOutputStream extends ByteArrayOutputStream {
86     private OutputStream os = null;
87
88     NetOutputStream(Socket sock) throws IOException {
89       super(1024);
90       reset();
91       os = sock.getOutputStream();
92     }
93
94     public void reset() {
95       count = 4;
96     }
97
98     void send(long id, long ackId, AbstractJmsMessage msg) throws IOException {
99       try {
100         StreamUtil.writeTo(id, this);
101         StreamUtil.writeTo(ackId, this);
102         AbstractJmsMessage.write(msg, this);
103
104         buf[0] = (byte) ((count -4) >>> 24);
105         buf[1] = (byte) ((count -4) >>> 16);
106         buf[2] = (byte) ((count -4) >>> 8);
107         buf[3] = (byte) ((count -4) >>> 0);
108
109         writeTo(os);
110         os.flush();
111       } finally {
112         reset();
113       }
114     }
115   }
116   
117   public ProxyMessage receive() throws Exception JavaDoc {
118     if (logger.isLoggable(BasicLevel.DEBUG))
119       logger.log(BasicLevel.DEBUG, "IOControl.receive()");
120
121     try {
122       while (true) {
123         int len = StreamUtil.readIntFrom(bis);
124         long messageId = StreamUtil.readLongFrom(bis);
125         long ackId = StreamUtil.readLongFrom(bis);
126         AbstractJmsRequest obj = (AbstractJmsRequest) AbstractJmsMessage.read(bis);
127
128     if (messageId > inputCounter) {
129       inputCounter = messageId;
130           synchronized (this) {
131             if (unackCounter < windowSize) {
132               unackCounter++;
133             } else {
134               send(new ProxyMessage(-1, messageId, null));
135             }
136           }
137       return new ProxyMessage(messageId, ackId, obj);
138     } else {
139       logger.log(BasicLevel.DEBUG,
140                      "IOControl.receive: already received message: " + messageId + " -> " + obj);
141     }
142       }
143     } catch (IOException exc) {
144       if (logger.isLoggable(BasicLevel.DEBUG))
145         logger.log(BasicLevel.DEBUG, "IOControl.receive", exc);
146       close();
147       throw exc;
148     }
149   }
150
151   public void close() {
152     if (logger.isLoggable(BasicLevel.DEBUG))
153       logger.log(BasicLevel.DEBUG, "IOControl.close()");
154
155     try {
156       if (bis != null) bis.close();
157       bis = null;
158     } catch (IOException exc) {}
159     try {
160       if (sock != null) sock.getOutputStream().close();
161     } catch (IOException exc) {}
162     try {
163       if (sock != null) sock.close();
164       sock = null;
165     } catch (IOException exc) {}
166   }
167 }
168
Popular Tags