KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > sapia > ubik > rmi > server > transport > nio > tcp > RmiChannelHandler


package org.sapia.ubik.rmi.server.transport.nio.tcp;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.rmi.RemoteException;

import org.sapia.ubik.net.ServerAddress;
import org.sapia.ubik.net.nio.ChannelHandler;
import org.sapia.ubik.net.nio.Cycle;
import org.sapia.ubik.rmi.server.Config;
import org.sapia.ubik.rmi.server.Log;
import org.sapia.ubik.rmi.server.RMICommand;
import org.sapia.ubik.rmi.server.VmId;
import org.sapia.ubik.rmi.server.invocation.InvokeCommand;
import org.sapia.ubik.rmi.server.perf.PerfAnalyzer;
import org.sapia.ubik.rmi.server.transport.RmiConnection;
import org.sapia.ubik.util.ByteVector;

20 /**
21  * @author Yanick Duchesne
22  *
23  * <dl>
24  * <dt><b>Copyright: </b>
25  * <dd>Copyright &#169; 2002-2005 <a HREF="http://www.sapia-oss.org">Sapia Open
26  * Source Software </a>. All Rights Reserved.</dd>
27  * </dt>
28  * <dt><b>License: </b>
29  * <dd>Read the license.txt file of the jar or visit the <a
30  * HREF="http://www.sapia-oss.org/license.html">license page </a> at the Sapia
31  * OSS web site</dd>
32  * </dt>
33  * </dl>
34  */

35 public class RmiChannelHandler implements ChannelHandler {
36
37   private static final int READ_HEADER = 0;
38   private static final int READ_PAYLOAD = 1;
39   private static final int WRITE_PAYLOAD = 2;
40
41   private final int HEADER_BYTES = 4;
42   private final int DEFAULT_BUFSZ = 1024;
43   private final int INCREMENT = 10;
44
45   private PerfAnalyzer _perf = PerfAnalyzer.getInstance();
46   private int _state = READ_HEADER;
47   private int _length = -1;
48   private int _processed = 0;
49   private ByteVector _bytes;
50   private byte[] _buf = new byte[DEFAULT_BUFSZ];
51   private RmiChannelHandlerFactory _fac;
52   private RmiConnection _conn;
53
54   RmiChannelHandler(RmiChannelHandlerFactory fac, int bufsize) {
55     _fac = fac;
56     _bytes = new ByteVector(bufsize, INCREMENT);
57     _conn = new NioTcpRmiServerConnection(fac.getServerAddress(), this);
58   }
59
60   /**
61    * @see org.sapia.ubik.net.nio.ChannelHandler#completed(org.sapia.ubik.net.nio.Cycle)
62    */

63   public void completed(Cycle cycle) {
64     _state = READ_HEADER;
65     _processed = 0;
66     _length = -1;
67     _bytes.clear(false);
68     if(cycle.state() == Cycle.STATE_COMPLETE) {
69       _fac.release(this);
70     }
71   }
72
73   /**
74    * @see org.sapia.ubik.net.nio.ChannelHandler#error(org.sapia.ubik.net.nio.Cycle)
75    */

76   public void error(Cycle cycle) {
77     _state = READ_HEADER;
78     _processed = 0;
79     _length = -1;
80     _bytes.clear(false);
81     if(cycle.state() == Cycle.STATE_ERROR) {
82       _fac.release(this);
83     }
84   }
85
86   /**
87    * @see org.sapia.ubik.net.nio.ChannelHandler#process(org.sapia.ubik.net.nio.Cycle)
88    */

89   public void process(Cycle cycle) {
90     
91     RMICommand cmd = null;
92     try {
93       if(Log.isDebug()) {
94         Log.debug(getClass(), "receiving command");
95       }
96
97       cmd = (RMICommand) _conn.receive();
98
99       if(Log.isDebug()) {
100         Log.debug(getClass(), "command received: " + cmd.getClass().getName()
101             + " from " + cmd.getServerAddress() + '@' + cmd.getVmId());
102       }
103
104       cmd.init(new Config(_fac.getServerAddress(), _conn));
105
106       Object JavaDoc resp = null;
107
108       try {
109         if(_perf.isEnabled()) {
110           if(cmd instanceof InvokeCommand) {
111             _perf.getTopic(getClass().getName() + ".RemoteCall").start();
112           }
113         }
114
115         resp = cmd.execute();
116
117         if(_perf.isEnabled()) {
118           if(cmd instanceof InvokeCommand) {
119             _perf.getTopic(getClass().getName() + ".RemoteCall").end();
120           }
121         }
122       } catch(Throwable JavaDoc t) {
123         t.printStackTrace();
124         t.fillInStackTrace();
125         resp = t;
126       }
127
128       if(_perf.isEnabled()) {
129         if(cmd instanceof InvokeCommand) {
130           _perf.getTopic(getClass().getName() + ".SendResponse").start();
131         }
132       }
133
134       doSend(resp, cmd.getVmId(), cmd.getServerAddress());
135       cycle.state(Cycle.STATE_WRITE);
136     }catch(RemoteException JavaDoc e){
137       Log.error(getClass(), "RemoteException caught sending response", e);
138       _conn.close();
139       cycle.state(Cycle.STATE_ERROR);
140       return;
141     }catch(EOFException JavaDoc e){
142       Log.error(getClass(), "EOFException caught sending response; client probably down", e);
143       _conn.close();
144       cycle.state(Cycle.STATE_ERROR);
145       return;
146     }catch(RuntimeException JavaDoc e){
147       Log.error(getClass(), "RuntimeException caught sending response", e);
148       try{
149         doSend(e, null, null);
150         cycle.state(Cycle.STATE_WRITE);
151       }catch(Exception JavaDoc e2){
152         _conn.close();
153         cycle.state(Cycle.STATE_ERROR);
154       }
155       return;
156     }catch(Exception JavaDoc e){
157       Log.error(getClass(), "Exception caught sending response", e);
158       try{
159         doSend(e, null, null);
160         cycle.state(Cycle.STATE_WRITE);
161       }catch(Exception JavaDoc e2){
162         _conn.close();
163         cycle.state(Cycle.STATE_ERROR);
164       }
165       return;
166     }
167       
168     if(_perf.isEnabled()) {
169       if(cmd != null && cmd instanceof InvokeCommand) {
170         _perf.getTopic(getClass().getName() + ".SendResponse").end();
171       }
172     }
173     cycle.state(Cycle.STATE_WRITE);
174   }
175   
176   private void doSend(Object JavaDoc response, VmId vmid, ServerAddress addr) throws IOException JavaDoc, RemoteException JavaDoc{
177     if(vmid != null){
178       _conn.send(response, vmid, addr.getTransportType());
179     }
180     else{
181       _conn.send(response);
182     }
183   }
184
185   /**
186    * @see org.sapia.ubik.net.nio.ChannelHandler#read(org.sapia.ubik.net.nio.Cycle)
187    */

188   public boolean read(Cycle cycle) {
189     switch(_state){
190     case READ_HEADER:
191       if(cycle.getByteBuffer().remaining() >= HEADER_BYTES) {
192         _length = cycle.getByteBuffer().getInt();
193         _state = READ_PAYLOAD;
194       }
195       return false;
196     case READ_PAYLOAD:
197       ByteBuffer JavaDoc input = cycle.getByteBuffer();
198       while(input.hasRemaining()) {
199         int toRead = input.remaining();
200         _bytes.write(input);
201         _processed += toRead;
202       }
203       if(_processed >= _length) {
204         cycle.state(Cycle.STATE_PROCESS);
205         return true;
206       }
207       return false;
208     default:
209       return false;
210     }
211   }
212
213   public boolean write(Cycle cycle) {
214     ByteBuffer JavaDoc output = cycle.getByteBuffer();
215     switch(_state){
216     case READ_PAYLOAD:
217       _bytes.reset();
218       _state = WRITE_PAYLOAD;
219       output.putInt(_bytes.length());
220       _processed = 0;
221     case WRITE_PAYLOAD:
222       if(_bytes.hasRemaining()) {
223         _bytes.read(output);
224       }
225       if(_bytes.hasRemaining()){
226         return false;
227       }
228       cycle.state(Cycle.STATE_RECYCLE);
229       return true;
230     default:
231       return false;
232     }
233   }
234
235   ByteVector getByteVector() {
236     return _bytes;
237   }
238
239   ByteVector getContent() {
240     _bytes.reset();
241     return _bytes;
242   }
243
244 }
245
Popular Tags