KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > jcifs > util > transport > Transport


1 package jcifs.util.transport;
2
3 import java.io.*;
4 import java.net.*;
5 import java.util.*;
6 import jcifs.util.LogStream;
7
8 /**
9  * This class simplifies communication for protocols that support
10  * multiplexing requests. It encapsulates a stream and some protocol
11  * knowledge (provided by a concrete subclass) so that connecting,
12  * disconnecting, sending, and receiving can be synchronized
13  * properly. Apparatus is provided to send and receive requests
14  * concurrently.
15  */

16
17 public abstract class Transport implements Runnable JavaDoc {
18
19     static int id = 0;
20     static LogStream log = LogStream.getInstance();
21
22     public static int readn( InputStream in,
23                 byte[] b,
24                 int off,
25                 int len ) throws IOException {
26         int i = 0, n = -5;
27
28         while (i < len) {
29             n = in.read( b, off + i, len - i );
30             if (n <= 0) {
31                 break;
32             }
33             i += n;
34         }
35
36         return i;
37     }
38
39     /* state values
40      * 0 - not connected
41      * 1 - connecting
42      * 2 - run connected
43      * 3 - connected
44      * 4 - error
45      */

46     int state = 0;
47
48     String JavaDoc name = "Transport" + id++;
49     Thread JavaDoc thread;
50     TransportException te;
51
52     protected HashMap response_map = new HashMap( 4 );
53
54     protected abstract void makeKey( Request request ) throws IOException;
55     protected abstract Request peekKey() throws IOException;
56     protected abstract void doSend( Request request ) throws IOException;
57     protected abstract void doRecv( Response response ) throws IOException;
58     protected abstract void doSkip() throws IOException;
59
60     public void sendrecv( Request request,
61                     Response response,
62                     long timeout ) throws IOException {
63         synchronized (response_map) {
64             makeKey( request );
65             response.isReceived = false;
66             try {
67                 response_map.put( request, response );
68                 doSend( request );
69                 response.expiration = System.currentTimeMillis() + timeout;
70                 while (!response.isReceived) {
71                     response_map.wait( timeout );
72                     timeout = response.expiration - System.currentTimeMillis();
73                     if (timeout <= 0) {
74                         throw new TransportException( name +
75                                 " timedout waiting for response to " +
76                                 request );
77                     }
78                 }
79             } catch( IOException ioe ) {
80                 if (log.level > 2)
81                     ioe.printStackTrace( log );
82                 try {
83                     disconnect( true );
84                 } catch( IOException ioe2 ) {
85                     ioe2.printStackTrace( log );
86                 }
87                 throw ioe;
88             } catch( InterruptedException JavaDoc ie ) {
89                 throw new TransportException( ie );
90             } finally {
91                 response_map.remove( request );
92             }
93         }
94     }
95     private void loop() {
96         while( thread == Thread.currentThread() ) {
97             try {
98                 Request key = peekKey();
99                 if (key == null)
100                     throw new IOException( "end of stream" );
101                 synchronized (response_map) {
102                     Response response = (Response)response_map.get( key );
103                     if (response == null) {
104                         if (log.level > 2)
105                             log.println( "Invalid key, skipping message" );
106                         doSkip();
107                     } else {
108                         doRecv( response );
109                         response.isReceived = true;
110                         response_map.notifyAll();
111                     }
112                 }
113             } catch( IOException ex ) {
114                 String JavaDoc msg = ex.getMessage();
115                 boolean hard = true;
116
117                 if (log.level > 2)
118                     ex.printStackTrace( log );
119
120                 if (msg != null && msg.equals( "Read timed out" )) {
121                     hard = false;
122                 }
123                 try {
124                     disconnect( hard );
125                 } catch( IOException ioe ) {
126                     ioe.printStackTrace( log );
127                 }
128             }
129         }
130     }
131
132     /* Build a connection. Only one thread will ever call this method at
133      * any one time. If this method throws an exception or the connect timeout
134      * expires an encapsulating TransportException will be thrown from connect
135      * and the transport will be in error.
136      */

137
138     protected abstract void doConnect() throws Exception JavaDoc;
139
140     /* Tear down a connection. If the hard parameter is true, the diconnection
141      * procedure should not initiate or wait for any outstanding requests on
142      * this transport.
143      */

144
145     protected abstract void doDisconnect( boolean hard ) throws IOException;
146
147     public synchronized void connect( long timeout ) throws TransportException {
148         switch (state) {
149             case 0:
150 System.out.println( name + ": connect: state=" + state );
151                 break;
152             case 3:
153                 return; // already connected
154
case 4:
155                 throw new TransportException( "Connection in error", te );
156             default:
157                 throw new TransportException( "Invalid state: " + state );
158         }
159
160         state = 1;
161         te = null;
162         thread = new Thread JavaDoc( this, name );
163         thread.setDaemon( true );
164
165         synchronized (thread) {
166             thread.start();
167             try {
168                 thread.wait( timeout ); /* wait for doConnect */
169             } catch( InterruptedException JavaDoc ie ) {
170                 throw new TransportException( ie );
171             }
172
173             switch (state) {
174                 case 1: /* doConnect never returned */
175                     te = new TransportException( "Connection timeout" );
176                 case 2:
177                     if (te != null) { /* doConnect throw Exception */
178                         state = 4; /* error */
179                         thread = null;
180                         throw te;
181                     }
182                     state = 3; /* Success! */
183 System.out.println( name + ": connected: state=" + state );
184                     return;
185                 default:
186                     te = new TransportException( "Invalid state: " + state );
187             }
188         }
189     }
190     public synchronized void disconnect( boolean hard ) throws IOException {
191         switch (state) {
192             case 0: /* not connected - just return */
193                 return;
194             case 3: /* connected - go ahead and disconnect */
195 System.out.println( name + ": disconnecting: state=" + state + ",mapsize=" + response_map.size() + ",hard=" + hard);
196                 if (response_map.size() != 0 && !hard) {
197                     break; /* outstanding requests */
198                 }
199                 doDisconnect( hard );
200 System.out.println( name + ": disconnected: state=" + state );
201             case 4: /* in error - reset the transport */
202                 thread = null;
203                 state = 0;
204                 break;
205             default:
206                 throw new TransportException( "Invalid state: " + state );
207         }
208     }
209     public void run() {
210         Thread JavaDoc run_thread = Thread.currentThread();
211         Exception JavaDoc ex0 = null;
212
213         try {
214             /* We cannot synchronize (run_thread) here or the caller's
215              * thread.wait( timeout ) cannot reaquire the lock and
216              * return which would render the timeout effectively useless.
217              */

218             doConnect();
219         } catch( Exception JavaDoc ex ) {
220             ex0 = ex; // Defer to below where we're locked
221
return;
222         } finally {
223             synchronized (run_thread) {
224                 if (run_thread != thread) {
225                     /* Thread no longer the one setup for this transport --
226                      * doConnect returned too late, just ignore.
227                      */

228                     if (ex0 != null) {
229                         ex0.printStackTrace();
230                     }
231                     return;
232                 }
233                 if (ex0 != null) {
234                     te = new TransportException( ex0 );
235                 }
236                 state = 2; // run connected
237
run_thread.notify();
238 System.out.println( name + ": run connected" );
239             }
240         }
241
242         /* Proccess responses
243          */

244         loop();
245     }
246
247     public String JavaDoc toString() {
248         return name;
249     }
250 }
251
Popular Tags