1 package org.jacorb.poa; 2 3 22 23 import org.jacorb.poa.except.*; 24 25 import org.jacorb.util.*; 26 import org.jacorb.orb.dsi.ServerRequest; 27 import org.jacorb.orb.SystemExceptionHelper; 28 import org.jacorb.orb.portableInterceptor.*; 29 import org.jacorb.orb.giop.ReplyOutputStream; 30 31 import java.util.*; 32 33 import org.apache.avalon.framework.configuration.*; 34 import org.apache.avalon.framework.logger.Logger; 35 36 import org.omg.PortableServer.Servant ; 37 import org.omg.PortableServer.ServantManager ; 38 import org.omg.PortableServer.ServantActivator ; 39 import org.omg.PortableServer.ServantLocator ; 40 import org.omg.PortableServer.DynamicImplementation ; 41 import org.omg.PortableServer.ServantLocatorPackage.CookieHolder ; 42 43 import org.omg.CORBA.CompletionStatus ; 44 import org.omg.CORBA.portable.InvokeHandler ; 45 import org.omg.GIOP.ReplyStatusType_1_2; 46 import org.omg.PortableInterceptor.*; 47 import org.omg.IOP.ServiceContext ; 48 49 56 57 public class RequestProcessor 58 extends Thread 59 implements InvocationContext, Configurable 60 { 61 private boolean start; 62 private boolean terminate; 63 private RPPoolManager poolManager; 64 65 private RequestController controller; 66 private ServerRequest request; 67 private Servant servant; 68 private ServantManager servantManager; 69 private CookieHolder cookieHolder; 70 71 78 private boolean checkReplyEndTime = false; 79 80 81 private Logger logger; 82 83 private static Map specialOperations; 84 private static int count = 0; 85 86 static 87 { 88 specialOperations = new HashMap(50); 89 specialOperations.put("_is_a", ""); 90 specialOperations.put("_interface", ""); 91 specialOperations.put("_non_existent", ""); 92 93 specialOperations.put("_get_policy", ""); 94 specialOperations.put("_set_policy_overrides", ""); 95 } 96 97 RequestProcessor (RPPoolManager _poolManager) 98 { 99 super ("RequestProcessor-" + (++count)); 100 poolManager = _poolManager; 101 } 102 103 public void configure (Configuration configuration) 104 throws ConfigurationException 105 { 106 checkReplyEndTime = configuration.getAttributeAsBoolean 107 ( 108 "jacorb.poa.check_reply_end_time", false 109 ); 110 } 111 112 115 116 synchronized void begin() 117 { 118 start = true; 119 notify(); 120 } 121 122 125 126 synchronized void end() 127 { 128 terminate = true; 129 notify(); 130 } 131 132 135 136 public byte[] getObjectId() 137 { 138 if (!start) 139 throw new POAInternalError("error: RequestProcessor not started (getObjectId)"); 140 return request.objectId(); 141 } 142 143 146 147 public org.omg.CORBA.ORB getORB() 148 { 149 if (!start) 150 throw new POAInternalError("error: RequestProcessor not started (getORB)"); 151 return controller.getORB(); 152 } 153 154 157 158 public POA getPOA() 159 { 160 if (!start) 161 throw new POAInternalError("error: RequestProcessor not started (getPOA)"); 162 return controller.getPOA(); 163 } 164 165 168 169 public Servant getServant() 170 { 171 if (!start) 172 throw new POAInternalError("error: RequestProcessor not started (getServant)"); 173 return servant; 174 } 175 176 179 180 void init(RequestController controller, 181 ServerRequest request, 182 Servant servant, 183 ServantManager servantManager) 184 { 185 this.controller = controller; 186 this.request = request; 187 this.servant = servant; 188 this.servantManager = servantManager; 189 cookieHolder = null; 190 logger = controller.getLogger(); 191 } 192 193 private void clear() 194 { 195 controller = null; 196 request = null; 197 servant = null; 198 servantManager = null; 199 cookieHolder = null; 200 } 201 202 203 206 207 private void invokeIncarnate() 208 { 209 if (logger.isDebugEnabled()) 210 { 211 logger.debug("rid: " + request.requestId() + 212 " opname: " + request.operation() + 213 " invoke incarnate on servant activator"); 214 } 215 try 216 { 217 218 servant = controller.getAOM().incarnate( request.objectId(), 219 (ServantActivator ) servantManager, 220 controller.getPOA()); 221 if (servant == null) 222 { 223 if (logger.isWarnEnabled()) 224 { 225 logger.warn("rid: " + request.requestId() + 226 " opname: " + request.operation() + 227 " incarnate: returns null"); 228 } 229 230 request.setSystemException(new org.omg.CORBA.OBJ_ADAPTER ()); 231 } 232 233 controller.getORB().set_delegate(servant); } 235 catch (org.omg.CORBA.SystemException e) 236 { 237 if (logger.isWarnEnabled()) 238 { 239 logger.warn("rid: "+request.requestId() + 240 " opname: " + request.operation() + 241 " incarnate: system exception was thrown (" + 242 e.toString() + ")"); 243 } 244 request.setSystemException(e); 245 } 246 catch (org.omg.PortableServer.ForwardRequest e) 247 { 248 if (logger.isWarnEnabled()) 249 { 250 logger.warn("rid: " + request.requestId() + 251 " opname: " + request.operation() + 252 " incarnate: forward exception was thrown (" + 253 e.getMessage() + ")"); 254 } 255 request.setLocationForward(e); 256 257 } 258 catch (Throwable e) 259 { 260 261 if (logger.isErrorEnabled()) 262 { 263 logger.error("rid: " + request.requestId() + 264 " opname: " + request.operation() + 265 " incarnate: throwable was thrown (" + 266 e.getClass().getName() + ")", e); 267 } 268 request.setSystemException(new org.omg.CORBA.OBJ_ADAPTER (e.getMessage())); 269 } 270 } 271 272 273 276 277 private void invokeOperation() 278 { 279 try 280 { 281 if (servant instanceof org.omg.CORBA.portable.InvokeHandler ) 282 { 283 if (logger.isDebugEnabled()) 284 { 285 logger.debug("rid: " + request.requestId() + 286 " opname: " + request.operation() + 287 " invokeOperation on servant (stream based)"); 288 } 289 290 if( specialOperations.containsKey(request.operation())) 291 { 292 ((org.jacorb.orb.ServantDelegate)servant._get_delegate())._invoke(servant, 293 request.operation(), 294 request.getInputStream(), 295 request); 296 } 297 else 298 { 299 ((InvokeHandler ) servant)._invoke(request.operation(), 300 request.getInputStream(), 301 request); 302 } 303 304 } 305 else if (servant instanceof org.omg.PortableServer.DynamicImplementation ) 306 { 307 if (logger.isDebugEnabled()) 308 { 309 logger.debug("rid: " + request.requestId() + 310 " opname: " + request.operation() + 311 " invoke operation on servant (dsi based)"); 312 } 313 if( specialOperations.containsKey(request.operation()) && 314 !(servant instanceof org.jacorb.orb.Forwarder) ) 315 { 316 ((org.jacorb.orb.ServantDelegate)servant._get_delegate()) 317 ._invoke(servant, 318 request.operation(), 319 request.getInputStream(), 320 request); 321 } 322 else 323 { 324 ((DynamicImplementation ) servant).invoke(request); 325 } 326 } 327 else 328 { 329 if (logger.isWarnEnabled()) 330 { 331 logger.warn("rid: " + request.requestId() + 332 " opname: " + request.operation() + 333 " unknown servant type (neither stream nor dsi based)"); 334 } 335 } 336 337 } 338 catch (org.omg.CORBA.SystemException e) 339 { 340 if (logger.isInfoEnabled()) 341 { 342 logger.info("rid: " + request.requestId() + 343 " opname: " + request.operation() + 344 " invocation: system exception was thrown (" + 345 e.toString() + ")"); 346 } 347 request.setSystemException(e); 348 } 349 catch (Throwable e) 350 { 351 352 if (logger.isErrorEnabled()) 353 { 354 logger.error("rid: " + request.requestId() + 355 " opname: " + request.operation() + 356 " invocation: throwable was thrown (" + 357 e.getClass().getName() + ")", e); 358 } 359 request.setSystemException (new org.omg.CORBA.UNKNOWN (e.toString())); 360 } 361 } 362 363 364 367 368 private void invokePostInvoke() 369 { 370 try 371 { 372 if (logger.isDebugEnabled()) 373 { 374 logger.debug("rid: " + request.requestId() + 375 " opname: " + request.operation() + 376 " invoke postinvoke on servant locator"); 377 } 378 379 ((ServantLocator ) servantManager).postinvoke(request.objectId(), 380 controller.getPOA(), 381 request.operation(), 382 cookieHolder.value, 383 servant); 384 } 385 catch (org.omg.CORBA.SystemException e) 386 { 387 if (logger.isInfoEnabled()) 388 { 389 logger.info("rid: " + request.requestId() + 390 " opname: " + request.operation() + 391 " postinvoke: system exception was thrown (" + 392 e.getMessage()+")"); 393 } 394 request.setSystemException(e); 395 396 } 397 catch (Throwable e) 398 { 399 400 if (logger.isWarnEnabled()) 401 { 402 logger.warn("rid: " + request.requestId() + 403 " opname: " + request.operation() + 404 " postinvoke: throwable was thrown" + e.getMessage()); 405 } 406 request.setSystemException(new org.omg.CORBA.OBJ_ADAPTER ()); 407 408 } 409 } 410 411 412 415 416 private void invokePreInvoke() 417 { 418 if (logger.isDebugEnabled()) 419 { 420 logger.debug("rid: " + request.requestId() + 421 " opname: " + request.operation() + 422 " invoke preinvoke on servant locator"); 423 } 424 try 425 { 426 cookieHolder = new CookieHolder (); 427 servant = ((ServantLocator ) servantManager).preinvoke(request.objectId(), 428 controller.getPOA(), 429 request.operation(), 430 cookieHolder); 431 if (servant == null) 432 { 433 if (logger.isWarnEnabled()) 434 { 435 logger.warn("rid: " + request.requestId() + 436 " opname: " + request.operation() + 437 " preinvoke: returns null"); 438 } 439 request.setSystemException(new org.omg.CORBA.OBJ_ADAPTER ()); 440 } 441 controller.getORB().set_delegate( servant ); 443 } 444 catch (org.omg.CORBA.SystemException e) 445 { 446 if (logger.isInfoEnabled()) 447 { 448 logger.info("rid: " + request.requestId() + 449 " opname: " + request.operation() + 450 " preinvoke: system exception was thrown (" + 451 e.getMessage() +")"); 452 } 453 request.setSystemException(e); 454 455 } 456 catch (org.omg.PortableServer.ForwardRequest e) 457 { 458 if (logger.isInfoEnabled()) 459 { 460 logger.info("rid: " + request.requestId() + 461 " opname: " + request.operation() + 462 " preinvoke: forward exception was thrown (" + 463 e.getMessage() + ")"); 464 } 465 request.setLocationForward(e); 466 } 467 catch (Throwable e) 468 { 469 470 if (logger.isWarnEnabled()) 471 { 472 logger.warn("rid: " + request.requestId() + 473 " opname: " + request.operation() + 474 " preinvoke: throwable was thrown, " + 475 e.getMessage()); 476 } 477 request.setSystemException(new org.omg.CORBA.OBJ_ADAPTER (e.getMessage())); 478 479 } 480 } 481 482 boolean isActive() 483 { 484 return start; 485 } 486 487 488 491 492 private void process() 493 { 494 ServerRequestInfoImpl info = null; 495 496 if (controller.getORB().hasServerRequestInterceptors()) 497 { 498 info = new ServerRequestInfoImpl(controller.getORB(), 500 request, 501 servant); 502 503 InterceptorManager manager = controller.getORB().getInterceptorManager(); 504 info.setCurrent (manager.getEmptyCurrent()); 505 506 if(! invokeInterceptors( info, 507 ServerInterceptorIterator. 508 RECEIVE_REQUEST_SERVICE_CONTEXTS)) 509 { 510 ReplyOutputStream out = request.getReplyOutputStream(); 514 Enumeration ctx = info.getReplyServiceContexts(); 515 516 while( ctx.hasMoreElements() ) 517 { 518 out.addServiceContext( (ServiceContext ) ctx.nextElement() ); 519 } 520 521 return; 522 } 523 524 manager.setTSCurrent(info.current()); 525 } 526 527 if (Time.hasPassed (request.getRequestEndTime())) 530 { 531 request.setSystemException 532 (new org.omg.CORBA.TIMEOUT ("Request End Time exceeded", 533 0, CompletionStatus.COMPLETED_NO)); 534 return; 535 } 536 if (checkReplyEndTime && Time.hasPassed (request.getReplyEndTime())) 537 { 538 request.setSystemException 539 (new org.omg.CORBA.TIMEOUT ("Reply End Time exceeded", 540 0, CompletionStatus.COMPLETED_NO)); 541 return; 542 } 543 544 Time.waitFor (request.getRequestStartTime()); 545 546 if (servantManager != null) 547 { 548 if (servantManager instanceof org.omg.PortableServer.ServantActivator ) 549 invokeIncarnate(); 550 else 551 invokePreInvoke(); 552 } 553 554 if (servant != null) 555 { 556 if (info != null) 557 { 558 info.setServant(servant); 559 560 if (servant instanceof org.omg.CORBA.portable.InvokeHandler ) 561 { 562 if(! invokeInterceptors(info, 563 ServerInterceptorIterator.RECEIVE_REQUEST )) 564 { 565 570 if( cookieHolder != null ) 571 { 572 invokePostInvoke(); 573 } 574 575 ReplyOutputStream out = 576 request.getReplyOutputStream(); 577 Enumeration ctx = 578 info.getReplyServiceContexts(); 579 580 while( ctx.hasMoreElements() ) 581 { 582 out.addServiceContext( (ServiceContext ) ctx.nextElement() ); 583 } 584 585 return; 586 } 587 } 588 else if (servant instanceof org.omg.PortableServer.DynamicImplementation ) 589 request.setServerRequestInfo(info); 590 591 } 592 593 invokeOperation(); 594 } 595 596 599 if (cookieHolder != null) 600 { 601 invokePostInvoke(); 602 } 603 604 if (checkReplyEndTime && Time.hasPassed (request.getReplyEndTime())) 605 { 606 request.setSystemException 607 (new org.omg.CORBA.TIMEOUT ("Reply End Time exceeded after invocation", 608 0, CompletionStatus.COMPLETED_YES)); 609 } 610 611 if (info != null) 612 { 613 InterceptorManager manager = 614 controller.getORB().getInterceptorManager(); 615 info.setCurrent (manager.getCurrent()); 616 617 short op = 0; 618 switch(request.status().value()) 619 { 620 case ReplyStatusType_1_2._NO_EXCEPTION : 621 op = ServerInterceptorIterator.SEND_REPLY; 622 info.setReplyStatus (SUCCESSFUL.value); 623 break; 624 625 case ReplyStatusType_1_2._USER_EXCEPTION : 626 info.setReplyStatus (USER_EXCEPTION.value); 627 SystemExceptionHelper.insert(info.sending_exception, 628 new org.omg.CORBA.UNKNOWN ("Stream-based UserExceptions are not available!")); 629 op = ServerInterceptorIterator.SEND_EXCEPTION; 630 break; 631 632 case ReplyStatusType_1_2._SYSTEM_EXCEPTION : 633 info.setReplyStatus (SYSTEM_EXCEPTION.value); 634 SystemExceptionHelper.insert(info.sending_exception, 635 request.getSystemException()); 636 op = ServerInterceptorIterator.SEND_EXCEPTION; 637 break; 638 639 case ReplyStatusType_1_2._LOCATION_FORWARD : 640 info.setReplyStatus (LOCATION_FORWARD.value); 641 op = ServerInterceptorIterator.SEND_OTHER; 642 break; 643 } 644 645 invokeInterceptors(info, op); 646 647 ReplyOutputStream out = 648 request.get_out(); 649 Enumeration ctx = 650 info.getReplyServiceContexts(); 651 652 while( ctx.hasMoreElements() ) 653 { 654 out.addServiceContext( (ServiceContext ) ctx.nextElement() ); 655 } 656 657 manager.removeTSCurrent(); 658 } 659 } 660 661 private boolean invokeInterceptors( ServerRequestInfoImpl info, 662 short op ) 663 { 664 665 ServerInterceptorIterator intercept_iter = 666 controller.getORB().getInterceptorManager().getServerIterator(); 667 668 try 669 { 670 intercept_iter.iterate(info, op); 671 } 672 catch(org.omg.CORBA.UserException ue) 673 { 674 if (ue instanceof org.omg.PortableInterceptor.ForwardRequest ) 675 { 676 org.omg.PortableInterceptor.ForwardRequest fwd = 677 (org.omg.PortableInterceptor.ForwardRequest ) ue; 678 679 request.setLocationForward( new org.omg.PortableServer. 680 ForwardRequest(fwd.forward) ); 681 } 682 return false; 683 684 } 685 catch (org.omg.CORBA.SystemException _sys_ex) 686 { 687 request.setSystemException(_sys_ex); 688 return false; 689 } 690 return true; 691 } 692 693 694 697 698 public void run() 699 { 700 while (!terminate) 701 { 702 synchronized (this) 703 { 704 while (! start) 705 { 706 try 707 { 708 wait(); 709 } 710 catch (InterruptedException e) 711 { 712 } 713 714 if (terminate) 715 { 716 return; 717 } 718 } 719 } 720 721 if (logger.isDebugEnabled()) 722 { 723 logger.debug("rid: " + request.requestId() + 724 " opname: " + request.operation() + 725 " starts with request processing"); 726 } 727 728 if (request.syncScope() == org.omg.Messaging.SYNC_WITH_SERVER.value) 729 { 730 controller.returnResult (request); 731 process(); 732 } 733 else 734 { 735 process(); 736 controller.returnResult (request); 737 738 } 739 740 if (logger.isDebugEnabled()) 742 { 743 logger.debug("rid: " + request.requestId() + 744 " opname: " + request.operation() + 745 " ends with request processing"); 746 } 747 748 controller.finish (request); 749 750 start = false; 751 clear(); 752 753 poolManager.releaseProcessor(this); 755 } 756 } 757 } 758
| Popular Tags
|