1 package org.jacorb.orb; 2 3 22 23 import java.util.*; 24 25 import org.apache.avalon.framework.logger.Logger; 26 import org.apache.avalon.framework.configuration.Configurable; 27 import org.apache.avalon.framework.configuration.DefaultConfiguration; 28 import org.apache.avalon.framework.configuration.ConfigurationException; 29 30 import org.jacorb.orb.giop.MessageInputStream; 31 import org.jacorb.orb.giop.ReplyInputStream; 32 import org.jacorb.orb.giop.ReplyPlaceholder; 33 import org.jacorb.util.Time; 34 35 import org.omg.CORBA.MARSHAL ; 36 import org.omg.CORBA.SystemException ; 37 import org.omg.CORBA.portable.ApplicationException ; 38 import org.omg.CORBA.portable.InvokeHandler ; 39 import org.omg.CORBA.portable.RemarshalException ; 40 import org.omg.CORBA.portable.ServantObject ; 41 import org.omg.GIOP.ReplyStatusType_1_2; 42 import org.omg.Messaging.ExceptionHolder; 43 import org.omg.TimeBase.UtcT; 44 45 58 59 public class ReplyReceiver 60 extends ReplyPlaceholder 61 implements Configurable 62 { 63 private org.jacorb.orb.Delegate delegate = null; 64 private ClientInterceptorHandler interceptors = null; 65 66 private org.omg.Messaging.ReplyHandler replyHandler = null; 67 68 private String operation; 69 private UtcT replyEndTime; 70 private Timer timer; 71 72 73 private org.apache.avalon.framework.configuration.Configuration configuration = null; 74 private Logger logger; 75 76 77 private boolean retry_on_failure = false; 78 79 80 public ReplyReceiver( org.jacorb.orb.Delegate delegate, 81 String operation, 82 org.omg.TimeBase.UtcT replyEndTime, 83 ClientInterceptorHandler interceptors, 84 org.omg.Messaging.ReplyHandler replyHandler ) 85 { 86 super((org.jacorb.orb.ORB)delegate.orb(null)); 87 88 this.delegate = delegate; 89 this.operation = operation; 90 this.replyEndTime = replyEndTime; 91 this.interceptors = interceptors; 92 this.replyHandler = replyHandler; 93 94 if (replyEndTime != null) 95 { 96 timer = new Timer(replyEndTime); 97 timer.setName("ReplyReceiver Timer" ); 98 timer.start(); 99 } 100 else 101 { 102 timer = null; 103 } 104 105 } 106 107 public void configure(org.apache.avalon.framework.configuration.Configuration configuration) 108 throws org.apache.avalon.framework.configuration.ConfigurationException 109 { 110 this.configuration = configuration; 111 logger = 112 ((org.jacorb.config.Configuration)configuration).getNamedLogger("jacorb.orb.rep_recv"); 113 retry_on_failure = 114 configuration.getAttribute("jacorb.connection.client.retry_on_failure","off").equals("on"); 115 } 116 117 118 public void replyReceived( MessageInputStream in ) 119 { 120 if (timeoutException) 121 return; if (timer != null) 123 timer.wakeup(); 124 125 Set pending_replies = delegate.get_pending_replies(); 126 synchronized ( pending_replies ) 129 { 130 synchronized (this) 136 { 137 if (timeoutException) 138 return; 140 this.in = in; 141 delegate.replyDone (this); 142 143 if (replyHandler != null) 144 { 145 performCallback ((ReplyInputStream)in); 147 } 148 else 149 { 150 ready = true; 152 notifyAll(); 153 } 154 } 155 } 156 } 157 158 private void performCallback ( ReplyInputStream reply ) 159 { 160 162 org.omg.CORBA.portable.Delegate replyHandlerDelegate = 163 ( ( org.omg.CORBA.portable.ObjectImpl ) replyHandler ) 164 ._get_delegate(); 165 166 ServantObject so = 167 replyHandlerDelegate.servant_preinvoke( replyHandler, 168 operation, 169 InvokeHandler .class ); 170 try 171 { 172 switch ( reply.getStatus().value() ) 173 { 174 case ReplyStatusType_1_2._NO_EXCEPTION: 175 { 176 ((InvokeHandler )so.servant) 177 ._invoke( operation, 178 reply, 179 new DummyResponseHandler() ); 180 break; 181 } 182 case ReplyStatusType_1_2._USER_EXCEPTION: 183 case ReplyStatusType_1_2._SYSTEM_EXCEPTION: 184 { 185 ExceptionHolderImpl holder = 186 new ExceptionHolderImpl( reply ); 187 188 org.omg.CORBA_2_3.ORB orb = 189 ( org.omg.CORBA_2_3.ORB )replyHandlerDelegate 190 .orb( null ); 191 orb.register_value_factory 192 ( "IDL:omg.org/Messaging/ExceptionHolder:1.0", 193 new ExceptionHolderFactory() ); 194 195 CDRInputStream input = 196 new CDRInputStream( orb, holder.marshal() ); 197 198 ((InvokeHandler )so.servant) 199 ._invoke( operation + "_excep", 200 input, 201 new DummyResponseHandler() ); 202 break; 203 } 204 } 205 } 206 catch ( Exception e ) 207 { 208 if (logger.isWarnEnabled()) 209 logger.warn("Exception during callback: " + e.getMessage() ); 210 } 211 finally 212 { 213 replyHandlerDelegate.servant_postinvoke( replyHandler, so ); 214 } 215 } 216 217 221 private void performExceptionCallback (ExceptionHolderImpl holder) 222 { 223 225 org.omg.CORBA.portable.Delegate replyHandlerDelegate = 226 ( ( org.omg.CORBA.portable.ObjectImpl ) replyHandler ) 227 ._get_delegate(); 228 229 ServantObject so = 230 replyHandlerDelegate.servant_preinvoke( replyHandler, 231 operation, 232 InvokeHandler .class ); 233 try 234 { 235 org.omg.CORBA_2_3.ORB orb = 236 ( org.omg.CORBA_2_3.ORB )replyHandlerDelegate 237 .orb( null ); 238 orb.register_value_factory 239 ( "IDL:omg.org/Messaging/ExceptionHolder:1.0", 240 new ExceptionHolderFactory() ); 241 242 CDRInputStream input = 243 new CDRInputStream( orb, holder.marshal() ); 244 245 ((InvokeHandler )so.servant) 246 ._invoke( operation + "_excep", 247 input, 248 new DummyResponseHandler() ); 249 } 250 catch ( Exception e ) 251 { 252 if (logger.isWarnEnabled()) 253 logger.warn("Exception during callback: " + e.getMessage() ); 254 } 255 finally 256 { 257 replyHandlerDelegate.servant_postinvoke( replyHandler, so ); 258 } 259 } 260 261 262 263 267 public synchronized ReplyInputStream getReply() 268 throws RemarshalException , ApplicationException 269 { 270 try 271 { 272 try 276 { 277 getInputStream(); } 279 catch (org.omg.CORBA.COMM_FAILURE ex) 280 { 281 if (retry_on_failure) 282 { 283 throw new RemarshalException (); 284 } 285 else 286 { 287 throw ex; 289 } 290 } 291 } 292 catch ( SystemException se ) 293 { 294 interceptors.handle_receive_exception( se ); 295 throw se; 296 } 297 catch ( RemarshalException re ) 298 { 299 delegate.waitOnBarrier(); 302 throw new RemarshalException (); 303 } 304 305 ReplyInputStream reply = ( ReplyInputStream ) in; 306 307 ReplyStatusType_1_2 status = delegate.doNotCheckExceptions() 308 ? ReplyStatusType_1_2.NO_EXCEPTION 309 : reply.getStatus(); 310 311 switch ( status.value() ) 312 { 313 case ReplyStatusType_1_2._NO_EXCEPTION: 314 { 315 interceptors.handle_receive_reply ( reply ); 316 return reply; 317 } 318 case ReplyStatusType_1_2._USER_EXCEPTION: 319 { 320 ApplicationException ae = getApplicationException ( reply ); 321 interceptors.handle_receive_exception( ae, reply ); 322 throw ae; 323 } 324 case ReplyStatusType_1_2._SYSTEM_EXCEPTION: 325 { 326 SystemException se = SystemExceptionHelper.read ( reply ); 327 interceptors.handle_receive_exception( se, reply ); 328 throw se; 329 } 330 case ReplyStatusType_1_2._LOCATION_FORWARD: 331 case ReplyStatusType_1_2._LOCATION_FORWARD_PERM: 332 { 333 org.omg.CORBA.Object forward_reference = reply.read_Object(); 334 interceptors.handle_location_forward( reply, forward_reference ); 335 doRebind( forward_reference ); 336 throw new RemarshalException (); 337 } 338 case ReplyStatusType_1_2._NEEDS_ADDRESSING_MODE: 339 { 340 throw new org.omg.CORBA.NO_IMPLEMENT ( 341 "WARNING: Got reply status NEEDS_ADDRESSING_MODE " 342 + "(not implemented)." ); 343 } 344 default: 345 { 346 throw new MARSHAL 347 ("Received unexpected reply status: " + status.value() ); 348 } 349 } 350 } 351 352 private void doRebind ( org.omg.CORBA.Object forward_reference ) 353 { 354 try 355 { 356 delegate.lockBarrier(); 358 359 Set pending_replies = delegate.get_pending_replies(); 362 synchronized ( pending_replies ) 363 { 364 for ( Iterator i = pending_replies.iterator(); i.hasNext(); ) 365 { 366 ReplyPlaceholder p = ( ReplyPlaceholder ) i.next(); 367 p.retry(); 368 } 369 } 370 371 delegate.rebind ( forward_reference ); 373 } 374 finally 375 { 376 delegate.openBarrier(); 378 } 379 } 380 381 private ApplicationException getApplicationException ( ReplyInputStream reply ) 382 { 383 reply.mark( 0 ); 384 String id = reply.read_string(); 385 386 try 387 { 388 reply.reset(); 389 } 390 catch( java.io.IOException ioe ) 391 { 392 if (logger.isErrorEnabled()) 394 logger.error("Exception int reset(): " + ioe.getMessage() ); 395 } 396 397 return new ApplicationException ( id, reply ); 398 } 399 400 408 private class DummyResponseHandler 409 implements org.omg.CORBA.portable.ResponseHandler 410 { 411 public org.omg.CORBA.portable.OutputStream createReply() 412 { 413 Time.waitFor (delegate.getReplyStartTime()); 415 return null; 416 } 417 418 public org.omg.CORBA.portable.OutputStream createExceptionReply() 419 { 420 return null; 421 } 422 } 423 424 private static class ExceptionHolderFactory 425 implements org.omg.CORBA.portable.ValueFactory 426 { 427 public java.io.Serializable read_value 428 ( org.omg.CORBA_2_3.portable.InputStream is ) 429 { 430 ExceptionHolder result = new ExceptionHolderImpl(); 431 result._read( is ); 432 return result; 433 } 434 } 435 436 446 private class Timer extends Thread 447 { 448 private boolean awakened = false; 449 private UtcT endTime; 450 451 public Timer (UtcT endTime) 452 { 453 this.endTime = endTime; 454 } 455 456 public void run() 457 { 458 synchronized (this) 459 { 460 ReplyReceiver.this.timeoutException = false; 461 if (!awakened) 462 { 463 long time = org.jacorb.util.Time.millisTo (endTime); 464 if (time > 0) 465 { 466 try 467 { 468 this.wait (time); 469 } 470 catch (InterruptedException ex) 471 { 472 if (logger.isInfoEnabled()) 473 logger.info("Interrupted while waiting for timeout"); 474 } 475 } 476 if (!awakened) 477 { 478 synchronized (ReplyReceiver.this) 479 { 480 ReplyReceiver.this.timeoutException = true; 481 482 if (replyHandler != null) 483 { 484 ExceptionHolderImpl exHolder = 485 new ExceptionHolderImpl(new org.omg.CORBA.TIMEOUT ()); 486 performExceptionCallback(exHolder); 487 } 488 ReplyReceiver.this.ready = true; 489 ReplyReceiver.this.notifyAll(); 490 } 491 } 492 } 493 } 494 } 495 496 public void wakeup() 497 { 498 synchronized (this) 499 { 500 awakened = true; 501 ReplyReceiver.this.timeoutException = false; 502 this.notifyAll(); 503 } 504 } 505 } 506 } 507 | Popular Tags |