KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > orb > ReplyReceiver


1 package org.jacorb.orb;
2
3 /*
4  * JacORB - a free Java ORB
5  *
6  * Copyright (C) 1997-2004 Gerald Brose.
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the Free
20  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */

22
23 import java.util.*;
24
25 import org.apache.avalon.framework.logger.Logger;
26 import org.apache.avalon.framework.configuration.Configurable;
27 import org.apache.avalon.framework.configuration.DefaultConfiguration;
28 import org.apache.avalon.framework.configuration.ConfigurationException;
29
30 import org.jacorb.orb.giop.MessageInputStream;
31 import org.jacorb.orb.giop.ReplyInputStream;
32 import org.jacorb.orb.giop.ReplyPlaceholder;
33 import org.jacorb.util.Time;
34
35 import org.omg.CORBA.MARSHAL JavaDoc;
36 import org.omg.CORBA.SystemException JavaDoc;
37 import org.omg.CORBA.portable.ApplicationException JavaDoc;
38 import org.omg.CORBA.portable.InvokeHandler JavaDoc;
39 import org.omg.CORBA.portable.RemarshalException JavaDoc;
40 import org.omg.CORBA.portable.ServantObject JavaDoc;
41 import org.omg.GIOP.ReplyStatusType_1_2;
42 import org.omg.Messaging.ExceptionHolder;
43 import org.omg.TimeBase.UtcT;
44
45 /**
46  * A special ReplyPlaceholder that receives replies to normal requests,
47  * either synchronously or asynchronously. A ReplyReceiver
48  * handles all ORB-internal work that needs to be done for the reply,
49  * such as checking for exceptions and invoking the interceptors.
50  * The client stub can either do a blocking wait on the ReplyReceiver
51  * (via getReply()), or a ReplyHandler can be supplied when the
52  * ReplyReceiver is created; then the reply is delivered to that
53  * ReplyHandler.
54  *
55  * @author Andre Spiegel <spiegel@gnu.org>
56  * @version $Id: ReplyReceiver.java,v 1.25 2005/03/02 22:01:47 andre.spiegel Exp $
57  */

58
59 public class ReplyReceiver
60     extends ReplyPlaceholder
61     implements Configurable
62 {
63     private org.jacorb.orb.Delegate delegate = null;
64     private ClientInterceptorHandler interceptors = null;
65
66     private org.omg.Messaging.ReplyHandler replyHandler = null;
67
68     private String JavaDoc operation;
69     private UtcT replyEndTime;
70     private Timer timer;
71
72     /** the configuration object for this delegate */
73     private org.apache.avalon.framework.configuration.Configuration configuration = null;
74     private Logger logger;
75
76     /** configuration properties */
77     private boolean retry_on_failure = false;
78
79
80     public ReplyReceiver( org.jacorb.orb.Delegate delegate,
81                           String JavaDoc operation,
82                           org.omg.TimeBase.UtcT replyEndTime,
83                           ClientInterceptorHandler interceptors,
84                           org.omg.Messaging.ReplyHandler replyHandler )
85     {
86         super((org.jacorb.orb.ORB)delegate.orb(null));
87
88         this.delegate = delegate;
89         this.operation = operation;
90         this.replyEndTime = replyEndTime;
91         this.interceptors = interceptors;
92         this.replyHandler = replyHandler;
93
94         if (replyEndTime != null)
95         {
96             timer = new Timer(replyEndTime);
97             timer.setName("ReplyReceiver Timer" );
98             timer.start();
99         }
100         else
101         {
102             timer = null;
103         }
104
105     }
106
107     public void configure(org.apache.avalon.framework.configuration.Configuration configuration)
108         throws org.apache.avalon.framework.configuration.ConfigurationException
109     {
110         this.configuration = configuration;
111         logger =
112             ((org.jacorb.config.Configuration)configuration).getNamedLogger("jacorb.orb.rep_recv");
113         retry_on_failure =
114             configuration.getAttribute("jacorb.connection.client.retry_on_failure","off").equals("on");
115     }
116
117
118     public void replyReceived( MessageInputStream in )
119     {
120         if (timeoutException)
121             return; // discard reply
122
if (timer != null)
123             timer.wakeup();
124
125         Set pending_replies = delegate.get_pending_replies();
126         // grab pending_replies lock BEFORE my own,
127
// then I will already have it in the replyDone call below.
128
synchronized ( pending_replies )
129         {
130         // This internal synchronization prevents a deadlock
131
// when a timeout and a reply coincide, suggested
132
// by Jimmy Wilson, 2005-01. It is only a temporary
133
// work-around though, until I can simplify this entire
134
// logic much more thoroughly, AS.
135
synchronized (this)
136         {
137             if (timeoutException)
138                 return; // discard reply
139

140             this.in = in;
141             delegate.replyDone (this);
142
143             if (replyHandler != null)
144             {
145                 // asynchronous delivery
146
performCallback ((ReplyInputStream)in);
147             }
148             else
149             {
150                 // synchronous delivery
151
ready = true;
152                 notifyAll();
153             }
154         }
155         }
156     }
157
158     private void performCallback ( ReplyInputStream reply )
159     {
160         // TODO: Call interceptors.
161

162         org.omg.CORBA.portable.Delegate JavaDoc replyHandlerDelegate =
163             ( ( org.omg.CORBA.portable.ObjectImpl JavaDoc ) replyHandler )
164                                                          ._get_delegate();
165
166         ServantObject JavaDoc so =
167             replyHandlerDelegate.servant_preinvoke( replyHandler,
168                                                     operation,
169                                                     InvokeHandler JavaDoc.class );
170         try
171         {
172             switch ( reply.getStatus().value() )
173             {
174                 case ReplyStatusType_1_2._NO_EXCEPTION:
175                 {
176                     ((InvokeHandler JavaDoc)so.servant)
177                         ._invoke( operation,
178                                   reply,
179                                   new DummyResponseHandler() );
180                     break;
181                 }
182                 case ReplyStatusType_1_2._USER_EXCEPTION:
183                 case ReplyStatusType_1_2._SYSTEM_EXCEPTION:
184                 {
185                     ExceptionHolderImpl holder =
186                         new ExceptionHolderImpl( reply );
187
188                     org.omg.CORBA_2_3.ORB JavaDoc orb =
189                         ( org.omg.CORBA_2_3.ORB JavaDoc )replyHandlerDelegate
190                                                               .orb( null );
191                     orb.register_value_factory
192                         ( "IDL:omg.org/Messaging/ExceptionHolder:1.0",
193                           new ExceptionHolderFactory() );
194
195                     CDRInputStream input =
196                         new CDRInputStream( orb, holder.marshal() );
197
198                     ((InvokeHandler JavaDoc)so.servant)
199                         ._invoke( operation + "_excep",
200                                   input,
201                                   new DummyResponseHandler() );
202                     break;
203                 }
204             }
205         }
206         catch ( Exception JavaDoc e )
207         {
208             if (logger.isWarnEnabled())
209                 logger.warn("Exception during callback: " + e.getMessage() );
210         }
211         finally
212         {
213             replyHandlerDelegate.servant_postinvoke( replyHandler, so );
214         }
215     }
216
217     /**
218      * There's a lot of code duplication in this method right now.
219      * This should be merged with performCallback() above.
220      */

221     private void performExceptionCallback (ExceptionHolderImpl holder)
222     {
223         // TODO: Call interceptors.
224

225         org.omg.CORBA.portable.Delegate JavaDoc replyHandlerDelegate =
226             ( ( org.omg.CORBA.portable.ObjectImpl JavaDoc ) replyHandler )
227                                                          ._get_delegate();
228
229         ServantObject JavaDoc so =
230             replyHandlerDelegate.servant_preinvoke( replyHandler,
231                                                     operation,
232                                                     InvokeHandler JavaDoc.class );
233         try
234         {
235             org.omg.CORBA_2_3.ORB JavaDoc orb =
236                     ( org.omg.CORBA_2_3.ORB JavaDoc )replyHandlerDelegate
237                                                               .orb( null );
238             orb.register_value_factory
239                 ( "IDL:omg.org/Messaging/ExceptionHolder:1.0",
240                   new ExceptionHolderFactory() );
241
242             CDRInputStream input =
243                 new CDRInputStream( orb, holder.marshal() );
244
245             ((InvokeHandler JavaDoc)so.servant)
246                 ._invoke( operation + "_excep",
247                           input,
248                           new DummyResponseHandler() );
249         }
250         catch ( Exception JavaDoc e )
251         {
252             if (logger.isWarnEnabled())
253                 logger.warn("Exception during callback: " + e.getMessage() );
254         }
255         finally
256         {
257             replyHandlerDelegate.servant_postinvoke( replyHandler, so );
258         }
259     }
260
261
262
263     /**
264      * This method blocks until a reply becomes available.
265      * If the reply contains any exceptions, they are rethrown.
266      */

267     public synchronized ReplyInputStream getReply()
268         throws RemarshalException JavaDoc, ApplicationException JavaDoc
269     {
270         try
271         {
272            // On NT connection closure due to service shutdown is not
273
// detected until this point, resulting in a COMM_FAILURE.
274
// Map to RemarshalException to force rebind attempt.
275
try
276            {
277                getInputStream(); // block until reply is available
278
}
279            catch (org.omg.CORBA.COMM_FAILURE JavaDoc ex)
280            {
281                if (retry_on_failure)
282                {
283                    throw new RemarshalException JavaDoc();
284                }
285                else
286                {
287                    //rethrow
288
throw ex;
289                }
290            }
291         }
292         catch ( SystemException JavaDoc se )
293         {
294             interceptors.handle_receive_exception( se );
295             throw se;
296         }
297         catch ( RemarshalException JavaDoc re )
298         {
299             // Wait until the thread that received the actual
300
// forward request rebound the Delegate
301
delegate.waitOnBarrier();
302             throw new RemarshalException JavaDoc();
303         }
304
305         ReplyInputStream reply = ( ReplyInputStream ) in;
306
307         ReplyStatusType_1_2 status = delegate.doNotCheckExceptions()
308                                      ? ReplyStatusType_1_2.NO_EXCEPTION
309                                      : reply.getStatus();
310
311         switch ( status.value() )
312         {
313             case ReplyStatusType_1_2._NO_EXCEPTION:
314             {
315                 interceptors.handle_receive_reply ( reply );
316                 return reply;
317             }
318             case ReplyStatusType_1_2._USER_EXCEPTION:
319             {
320                 ApplicationException JavaDoc ae = getApplicationException ( reply );
321                 interceptors.handle_receive_exception( ae, reply );
322                 throw ae;
323             }
324             case ReplyStatusType_1_2._SYSTEM_EXCEPTION:
325             {
326                 SystemException JavaDoc se = SystemExceptionHelper.read ( reply );
327                 interceptors.handle_receive_exception( se, reply );
328                 throw se;
329             }
330             case ReplyStatusType_1_2._LOCATION_FORWARD:
331             case ReplyStatusType_1_2._LOCATION_FORWARD_PERM:
332             {
333                 org.omg.CORBA.Object JavaDoc forward_reference = reply.read_Object();
334                 interceptors.handle_location_forward( reply, forward_reference );
335                 doRebind( forward_reference );
336                 throw new RemarshalException JavaDoc();
337             }
338             case ReplyStatusType_1_2._NEEDS_ADDRESSING_MODE:
339             {
340                 throw new org.omg.CORBA.NO_IMPLEMENT JavaDoc(
341                             "WARNING: Got reply status NEEDS_ADDRESSING_MODE "
342                           + "(not implemented)." );
343             }
344             default:
345             {
346                 throw new MARSHAL JavaDoc
347                     ("Received unexpected reply status: " + status.value() );
348             }
349         }
350     }
351
352     private void doRebind ( org.omg.CORBA.Object JavaDoc forward_reference )
353     {
354         try
355         {
356             // make other threads that have unreturned replies wait
357
delegate.lockBarrier();
358
359             // tell every pending request to remarshal
360
// they will be blocked on the barrier
361
Set pending_replies = delegate.get_pending_replies();
362             synchronized ( pending_replies )
363             {
364                 for ( Iterator i = pending_replies.iterator(); i.hasNext(); )
365                 {
366                     ReplyPlaceholder p = ( ReplyPlaceholder ) i.next();
367                     p.retry();
368                 }
369             }
370
371             // do the actual rebind
372
delegate.rebind ( forward_reference );
373         }
374         finally
375         {
376             // now other threads can safely remarshal
377
delegate.openBarrier();
378         }
379     }
380
381     private ApplicationException JavaDoc getApplicationException ( ReplyInputStream reply )
382     {
383         reply.mark( 0 );
384         String JavaDoc id = reply.read_string();
385
386         try
387         {
388             reply.reset();
389         }
390         catch( java.io.IOException JavaDoc ioe )
391         {
392             //should not happen anyway
393
if (logger.isErrorEnabled())
394                 logger.error("Exception int reset(): " + ioe.getMessage() );
395         }
396
397         return new ApplicationException JavaDoc( id, reply );
398     }
399
400     /**
401      * A ResponseHandler that is passed to the ReplyHandler's POA
402      * when we invoke it. Since ReplyHandler operations never generate
403      * replies, this ResponseHandler does nothing to this effect.
404      * The createReply() method, however, is the last method that
405      * is called before control goes to the ReplyHandler servant,
406      * so we use it to check for timing constraints.
407      */

408     private class DummyResponseHandler
409         implements org.omg.CORBA.portable.ResponseHandler JavaDoc
410     {
411         public org.omg.CORBA.portable.OutputStream JavaDoc createReply()
412         {
413             // the latest possible time at which we can do this
414
Time.waitFor (delegate.getReplyStartTime());
415             return null;
416         }
417
418         public org.omg.CORBA.portable.OutputStream JavaDoc createExceptionReply()
419         {
420             return null;
421         }
422     }
423
424     private static class ExceptionHolderFactory
425         implements org.omg.CORBA.portable.ValueFactory JavaDoc
426     {
427         public java.io.Serializable JavaDoc read_value
428                         ( org.omg.CORBA_2_3.portable.InputStream JavaDoc is )
429         {
430             ExceptionHolder result = new ExceptionHolderImpl();
431             result._read( is );
432             return result;
433         }
434     }
435
436     /**
437      * This class implements timeouts while we are waiting for
438      * replies. When it is instantiated, it takes a CORBA UtcT
439      * constructor parameter that specifies the timeout expiration
440      * time. The timer starts running as soon as the Thread is
441      * started. When the timeout goes off, this Timer makes sure
442      * that the enclosing ReplyReceiver is deactivated, and that
443      * everybody associated with it is notified appropriately.
444      * The timeout can be cancelled by calling wakeup() on a Timer.
445      */

446     private class Timer extends Thread JavaDoc
447     {
448         private boolean awakened = false;
449         private UtcT endTime;
450
451         public Timer (UtcT endTime)
452         {
453             this.endTime = endTime;
454         }
455
456         public void run()
457         {
458             synchronized (this)
459             {
460                 ReplyReceiver.this.timeoutException = false;
461                 if (!awakened)
462                 {
463                     long time = org.jacorb.util.Time.millisTo (endTime);
464                     if (time > 0)
465                     {
466                         try
467                         {
468                             this.wait (time);
469                         }
470                         catch (InterruptedException JavaDoc ex)
471                         {
472                             if (logger.isInfoEnabled())
473                                 logger.info("Interrupted while waiting for timeout");
474                         }
475                     }
476                     if (!awakened)
477                     {
478                         synchronized (ReplyReceiver.this)
479                         {
480                             ReplyReceiver.this.timeoutException = true;
481
482                             if (replyHandler != null)
483                             {
484                                 ExceptionHolderImpl exHolder =
485                                     new ExceptionHolderImpl(new org.omg.CORBA.TIMEOUT JavaDoc());
486                                 performExceptionCallback(exHolder);
487                             }
488                             ReplyReceiver.this.ready = true;
489                             ReplyReceiver.this.notifyAll();
490                         }
491                     }
492                 }
493             }
494         }
495
496         public void wakeup()
497         {
498             synchronized (this)
499             {
500                 awakened = true;
501                 ReplyReceiver.this.timeoutException = false;
502                 this.notifyAll();
503             }
504         }
505     }
506 }
507
Popular Tags