KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > bsf > debug > util > SocketConnection


1 /*
2  * The Apache Software License, Version 1.1
3  *
4  * Copyright (c) 2002 The Apache Software Foundation. All rights
5  * reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  *
14  * 2. Redistributions in binary form must reproduce the above copyright
15  * notice, this list of conditions and the following disclaimer in
16  * the documentation and/or other materials provided with the
17  * distribution.
18  *
19  * 3. The end-user documentation included with the redistribution, if
20  * any, must include the following acknowlegement:
21  * "This product includes software developed by the
22  * Apache Software Foundation (http://www.apache.org/)."
23  * Alternately, this acknowlegement may appear in the software itself,
24  * if and wherever such third-party acknowlegements normally appear.
25  *
26  * 4. The names "Apache BSF", "Apache", and "Apache Software Foundation"
27  * must not be used to endorse or promote products derived from
28  * this software without prior written permission. For written
29  * permission, please contact apache@apache.org.
30  *
31  * 5. Products derived from this software may not be called "Apache"
32  * nor may "Apache" appear in their names without prior written
33  * permission of the Apache Group.
34  *
35  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
36  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
37  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
38  * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
39  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
40  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
41  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
42  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
43  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
44  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
45  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
46  * SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many individuals
50  * on behalf of the Apache Software Foundation and was originally created by
51  * Sanjiva Weerawarana and others at International Business Machines
52  * Corporation. For more information on the Apache Software Foundation,
53  * please see <http://www.apache.org/>.
54  */

55
56 package org.apache.bsf.debug.util;
57
58 import java.util.*;
59 import java.net.*;
60 import java.io.*;
61
62 import org.apache.bsf.debug.*;
63
64 public abstract class SocketConnection {
65
66     Vector m_rcells; // ResultCell
67
Hashtable m_tcells; // ThreadCell
68
IntHashtable m_tcellsById;
69
70     private boolean keep_listening;
71     int fCmdIdGenerator;
72     
73     IntHashtable m_skeletons;
74     protected StubTable fStubs;
75
76     protected InputStream fInputStream;
77     protected OutputStream fOutputStream;
78
79     protected DataInputStream fDataInputStream;
80     protected DataOutputStream fDataOutputStream;
81
82     protected SocketConnection() {
83         m_skeletons = new IntHashtable();
84         fStubs = null;
85         m_rcells = new Vector();
86         m_tcells = new Hashtable();
87         m_tcellsById = new IntHashtable();
88
89         if (ThreadCell.isServer) {
90             fCmdIdGenerator = 10000;
91         } else {
92             fCmdIdGenerator = 90000;
93         }
94     }
95
96     public void exportSkeleton(Skeleton skel) {
97         skel.allocOid(this);
98         m_skeletons.put(skel.getUid(), skel);
99     }
100
101     public Skeleton getSkeleton(int uid) {
102         return (Skeleton) m_skeletons.get(uid);
103     }
104
105     public Stub getStub(int tid, int uid) {
106         return (Stub) fStubs.swizzle(tid, uid);
107     }
108
109     public void listen() {
110         ResultCell cell = null;
111         int cmdId, thId, count;
112         int classId;
113         int methodId;
114         byte bytes[];
115         boolean errorOccured, isResult;
116             
117         setListening(true);
118             
119         while (keep_listening) {
120             try {
121                 count = fDataInputStream.readInt();
122                 // total count
123
thId = fDataInputStream.readInt();
124                 // distributed thread id
125
errorOccured = fDataInputStream.readBoolean();
126                 // cmdId.
127
cmdId = fDataInputStream.readInt();
128                 // cmdId.
129
isResult = fDataInputStream.readBoolean();
130                 // if result.
131

132                 if (count != 0) {
133                     bytes = new byte[count];
134                     if (count != fInputStream.read(bytes))
135                         throw new Error JavaDoc("Wire Protocol Error");
136                 }
137                 else bytes = new byte[0];
138                     
139                 if (errorOccured) {
140                     receivedException(thId,cmdId,bytes);
141                 }
142                 else {
143                     if (isResult) {
144                         // a result...
145
receivedResult(thId,cmdId,bytes);
146                     }
147                     else {
148                         // an invocation...
149
receivedInvocation(thId,cmdId,bytes);
150                     }
151                 }
152             }
153             catch (InterruptedIOException iioe) {
154                 // Continue on; this timeout is expected.
155
continue;
156             }
157             catch (Exception JavaDoc ex) {
158                 wireExceptionNotify(ex);
159             }
160         }
161     }
162     
163     public void stopListening() {
164         Enumeration e;
165         ResultCell cell;
166
167         e = m_rcells.elements();
168         while (e.hasMoreElements()) {
169             cell = (ResultCell) e.nextElement();
170             try {
171                 cell.disconnected = true;
172                 cell.completionNotify();
173             } catch (Throwable JavaDoc t) {
174                 t.printStackTrace();
175             }
176         }
177         fStubs.disconnectNotify();
178         setListening(false);
179     }
180
181     private synchronized void setListening(boolean listen) {
182         keep_listening = listen;
183     }
184
185     private void receivedException(int thId, int cmdId, byte bytes[])
186         throws IOException {
187         Exception JavaDoc ex;
188         ResultCell cell;
189
190         if (ThreadCell.isServer)
191             DebugLog.stdoutPrintln("Received error from CLIENT...",
192                                    DebugLog.BSF_LOG_L2);
193         else
194             DebugLog.stdoutPrintln("Received error from SERVER...",
195                                    DebugLog.BSF_LOG_L2);
196
197         DebugLog.stdoutPrintln(" **** ERROR: thId=" + thId +
198                                ", cmdId=" + cmdId,
199                                DebugLog.BSF_LOG_L2);
200         cell = searchCell(cmdId);
201         cell.setPacketBytes(bytes);
202         cell.readException();
203         cell.print();
204
205         // waking up invoker...
206
// no more reading from the socket is allowed.
207
cell.thread.completionNotify(cell);
208
209     }
210
211     private void receivedResult(int thId, int cmdId, byte bytes[]) {
212         ResultCell cell;
213         DebugLog.stdoutPrintln(" <<<< RESULT: thId=" +
214                                thId + ", cmdId=" + cmdId,
215                                DebugLog.BSF_LOG_L3);
216         if (bytes!=null)
217             DebugLog.stdoutPrintln(" byte count=" +
218                                    bytes.length, DebugLog.BSF_LOG_L3);
219         else
220             DebugLog.stdoutPrintln(" no bytes",
221                                    DebugLog.BSF_LOG_L3);
222                 
223         cell = searchCell(cmdId);
224         cell.setPacketBytes(bytes);
225         cell.parseResult();
226         // waking up invoker...
227
cell.completionNotify();
228     }
229
230     private synchronized void receivedInvocation(int thId, int cmdId,
231                                                  byte bytes[])
232         throws Exception JavaDoc {
233         
234         DebugLog.stdoutPrintln(" >>>> INVOCATION: thId=" +
235                                thId + ", cmdId=" + cmdId,
236                                DebugLog.BSF_LOG_L3);
237         if (bytes!=null)
238             DebugLog.stdoutPrintln(" byte count=" +
239                                    bytes.length,
240                                    DebugLog.BSF_LOG_L3);
241         else
242             DebugLog.stdoutPrintln(" no bytes",
243                                    DebugLog.BSF_LOG_L3);
244             
245         ResultCell rcell = new ResultCell(this);
246         m_rcells.addElement(rcell);
247
248         rcell.incomingInvocation(cmdId, bytes);
249             
250         invoke(rcell, thId);
251     }
252
253     /**
254      * Called from the completion of an incoming remote
255      * method from the ThreadCell class.
256      * The ResultCell encodes the result to send the same
257      * way a stub encodes it expected result.
258      */

259     void completionNotify(ResultCell rcell) {
260         try {
261             rcell.sendResult();
262         } catch (Exception JavaDoc ex) {
263             DebugLog.stdoutPrintln("Exception Raised while sending result.",
264                                    DebugLog.BSF_LOG_L0);
265             ex.printStackTrace();
266         }
267     }
268
269     /**
270      * Switch to the right thread to carry on the incoming
271      * invocation.
272      * If the invocation is part of a loopback, reuse
273      * the waiting thread...
274      * If not, create a new thread to carry on the call.
275      *
276      * IMPORTANT: the socket listener thread remains only
277      * a listener, it does not carry any other job than
278      * reading packets and dispatching.
279      */

280     private void invoke(ResultCell rcell, int thId) {
281
282         ThreadCell tcell = (ThreadCell) m_tcellsById.get(thId);
283         if (tcell == null) {
284             // Not a known distributed thread...
285
// so create a local thread for supporting the
286
// distributed execution and switch to that thread
287
// to carry on the call...
288
Thread JavaDoc thread;
289             tcell = new ThreadCell(this, thId);
290             thread = tcell.getThread();
291             m_tcells.put(thread, tcell);
292             m_tcellsById.put(thId, tcell);
293         }
294         tcell.pushLoopback(rcell);
295     }
296
297     /**
298      * First call made by a stub.
299      * It will allocate the ResultCell and the output buffer for
300      * the outgoing packet.
301      * It will also check if this out-going remote invocation
302      * is part of a global execution already or not.
303      * If not, a global execution (distributed thread) is set,
304      * other the current one is reused.
305      */

306     public synchronized ResultCell prepareOutgoingInvoke(Stub self,
307                                                          int classId,
308                                                          int methodId)
309         throws IOException {
310
311         ThreadCell tcell;
312         ResultCell rcell;
313
314         Thread JavaDoc thread;
315         thread = Thread.currentThread();
316         tcell = (ThreadCell) m_tcells.get(thread);
317         if (tcell == null) {
318             tcell = new ThreadCell(this, thread);
319             m_tcells.put(thread, tcell);
320             m_tcellsById.put(tcell.getThId(), tcell);
321         }
322
323         rcell = new ResultCell(this);
324
325         int cmdId;
326         if (ThreadCell.isServer)
327             cmdId = ++fCmdIdGenerator;
328         else
329             cmdId = --fCmdIdGenerator;
330         
331         rcell.outgoingInvocation(cmdId, classId, methodId, self);
332         m_rcells.addElement(rcell);
333         tcell.pushInvocation(rcell);
334
335         return rcell;
336     }
337
338     private boolean m_outStreamLocked = false;
339     private Object JavaDoc m_outStreamLock = new Object JavaDoc();
340
341     void lockOutStream() {
342         synchronized (m_outStreamLock) {
343             while (m_outStreamLocked) {
344                 try {
345                     m_outStreamLock.wait();
346                 } catch (InterruptedException JavaDoc ex) {
347
348                 }
349             }
350             m_outStreamLocked = true;
351         }
352     }
353
354     void releaseOutStream() {
355         synchronized (m_outStreamLock) {
356             m_outStreamLocked = false;
357             m_outStreamLock.notifyAll();
358         }
359     }
360
361     void sendPacket(int thId, int cmdId, boolean isResult,
362                     byte bytes[], boolean errorOccured) {
363         try {
364             synchronized (fDataOutputStream) {
365                 fDataOutputStream.writeInt(bytes.length);
366                 fDataOutputStream.writeInt(thId);
367                 fDataOutputStream.writeBoolean(errorOccured);
368                 fDataOutputStream.writeInt(cmdId);
369                 fDataOutputStream.writeBoolean(isResult);
370                 if (bytes.length != 0) {
371                     fOutputStream.write(bytes);
372                 }
373             }
374         } catch (Exception JavaDoc ex) {
375             DebugLog.stdoutPrintln("Exception during sending result...",
376                                    DebugLog.BSF_LOG_L0);
377             ex.printStackTrace();
378             this.wireExceptionNotify(ex);
379         }
380     }
381
382     public synchronized ResultCell searchCell(int cmdId) {
383         ResultCell cell = null;
384         Enumeration e;
385         e = m_rcells.elements();
386         while (e.hasMoreElements()) {
387             cell = (ResultCell) e.nextElement();
388             if (cell.cmdId == cmdId)
389                 break;
390             cell = null;
391         }
392         if (cell == null)
393             throw new Error JavaDoc("Error in Wire Protocol, can't find CmdId=" +
394                             cmdId);
395         return cell;
396     }
397
398     public Stub swizzle(int tid, int uid) {
399         return fStubs.swizzle(tid, uid);
400     }
401
402     protected abstract void dispatchInvocation(ResultCell rcell)
403         throws Exception JavaDoc;
404
405     /**
406      * A Wire-related exception occurred.
407      * We will consider that we have lost the connection.
408      * All stubs will be revoked... allowing higher-level
409      * listener to pick up that some remote objects have
410      * been revoked through the StubListener mechanism.
411      *
412      * Log at lower priority than a standard exception,
413      * since this is the client quit mechanism too.
414      */

415     protected void wireExceptionNotify(Exception JavaDoc ex) {
416         DebugLog.stdoutPrintln("A WIRE exception occurred.",
417                                DebugLog.BSF_LOG_L2);
418         DebugLog.stdoutPrintln(ex.toString(),
419                    DebugLog.BSF_LOG_L2);
420         disconnectNotify(ex);
421         fStubs.disconnectNotify();
422         setListening(false);
423     }
424
425     /**
426      * Raise the exception in all waiting threads...
427      */

428     private synchronized void disconnectNotify(Exception JavaDoc ex) {
429         Enumeration e;
430         ResultCell cell;
431         DebugLog.stdoutPrintln("Raise the exception in all waiting threads...",
432                                DebugLog.BSF_LOG_L2);
433         e = m_rcells.elements();
434         while (e.hasMoreElements()) {
435             cell = (ResultCell) e.nextElement();
436             try {
437                 cell.setException(ex);
438                 cell.disconnected = true;
439                 cell.completionNotify();
440             } catch (Throwable JavaDoc t) {
441                 t.printStackTrace();
442             }
443         }
444         m_rcells = new Vector();
445         m_tcells = new Hashtable();
446         DebugLog.stdoutPrintln("Done with raising exceptions...",
447                                DebugLog.BSF_LOG_L2);
448     }
449 }
450
Popular Tags