1 package org.jacorb.poa; 2 3 22 23 import org.jacorb.poa.util.*; 24 import org.jacorb.poa.except.*; 25 26 import org.jacorb.orb.dsi.ServerRequest; 27 28 import org.omg.PortableServer.POAManagerPackage.State ; 29 import org.omg.PortableServer.Servant ; 30 import org.omg.PortableServer.ServantManager ; 31 32 import org.apache.avalon.framework.logger.Logger; 33 import org.apache.avalon.framework.configuration.*; 34 35 import java.util.*; 36 37 44 45 public final class RequestController 46 extends Thread 47 implements Configurable 48 { 49 private POA poa; 50 private org.jacorb.orb.ORB orb; 51 private RequestQueue requestQueue; 52 private AOM aom; 53 private RPPoolManager poolManager; 54 private int localRequests = 0; 55 56 58 private static RPPoolManager singletonPoolManager; 59 private static int count = 0; 60 61 62 private org.jacorb.config.Configuration configuration = null; 63 64 65 private Logger logger; 66 private int threadPoolMin = 0; 67 private int threadPoolMax = 0; 68 private boolean configured = false; 69 70 private Hashtable activeRequestTable; 72 private Vector deactivationList = new Vector(); 75 77 private boolean terminate; 79 private boolean waitForCompletionCalled; 80 private boolean waitForShutdownCalled; 81 private java.lang.Object queueLog = new java.lang.Object (); 82 private int threadPriority = Thread.MAX_PRIORITY; 83 84 87 88 private RequestController() 89 { 90 } 91 92 95 96 RequestController( POA _poa, 97 org.jacorb.orb.ORB _orb, 98 AOM _aom) 99 { 100 super("RequestController-" + (++count)); 101 poa = _poa; 102 aom = _aom; 103 orb = _orb; 104 105 requestQueue = new RequestQueue(this); 106 107 activeRequestTable = 108 poa.isSingleThreadModel() ? 109 new Hashtable(1) : 110 new Hashtable(threadPoolMax); 111 } 112 113 114 public void configure(Configuration myConfiguration) 115 throws ConfigurationException 116 { 117 this.configuration = 118 (org.jacorb.config.Configuration)myConfiguration; 119 120 logger = configuration.getNamedLogger("jacorb.poa.controller"); 121 122 requestQueue.configure(myConfiguration); 123 124 threadPoolMin = 125 configuration.getAttributeAsInteger("jacorb.poa.thread_pool_min", 5); 126 127 threadPoolMax = 128 configuration.getAttributeAsInteger("jacorb.poa.thread_pool_max", 20); 129 130 threadPriority = 131 configuration.getAttributeAsInteger("jacorb.poa.thread_priority", 132 Thread.MAX_PRIORITY); 133 134 if( threadPriority < Thread.MIN_PRIORITY ) 135 threadPriority = Thread.MIN_PRIORITY; 136 else if( threadPriority > Thread.MAX_PRIORITY ) 137 threadPriority = Thread.MAX_PRIORITY; 138 139 setPriority(threadPriority); 140 setDaemon(true); 141 142 configured = true; 143 getPoolManager(); 144 start(); 145 } 146 147 148 void clearUpPool() 149 { 150 getPoolManager().destroy(); 151 } 152 153 156 157 void clearUpQueue(org.omg.CORBA.SystemException exception) 158 { 159 ServerRequest request; 160 while ((request = requestQueue.removeLast()) != null) 161 { 162 rejectRequest(request, exception); 163 } 164 } 165 166 171 172 void continueToWork() 173 { 174 synchronized (queueLog) 175 { 176 queueLog.notifyAll(); 177 } 178 } 179 180 synchronized void end() 181 { 182 terminate = true; 183 continueToWork(); 184 } 185 186 190 191 synchronized void freeObject( byte[] oid ) 192 { 193 deactivationList.removeElement( new ByteArrayKey( oid ) ); 194 } 195 196 AOM getAOM() 197 { 198 return aom; 199 } 200 201 202 Logger getLogger() 203 { 204 return logger; 205 } 206 207 208 org.jacorb.orb.ORB getORB() 209 { 210 return orb; 211 } 212 213 214 POA getPOA() 215 { 216 return poa; 217 } 218 219 220 RPPoolManager getPoolManager() 221 { 222 if (!configured) 223 throw new Error ("Internal: not configured"); 224 225 if (poolManager == null) 226 { 227 if (poa.isSingleThreadModel()) 228 { 229 if (singletonPoolManager == null) 230 { 231 singletonPoolManager = 232 new RPPoolManager(orb.getPOACurrent(), 1, 1, 233 logger, configuration); 234 } 235 poolManager = singletonPoolManager; 236 } 237 else 238 { 239 poolManager = 240 new RPPoolManager(orb.getPOACurrent(), 241 threadPoolMin, threadPoolMax, 242 logger, configuration); 243 } 244 } 245 return poolManager; 246 } 247 248 249 RequestQueue getRequestQueue() { 250 return requestQueue; 251 } 252 253 254 boolean isDeactivating (ByteArrayKey oid) 255 { 256 return deactivationList.contains( oid ); 257 } 258 259 260 265 266 private void processRequest(ServerRequest request) 267 throws ShutdownInProgressException, CompletionRequestedException 268 { 269 Servant servant = null; 270 ServantManager servantManager = null; 271 boolean invalid = false; 272 ByteArrayKey oid = new ByteArrayKey( request.objectId() ); 273 274 synchronized (this) 275 { 276 if (waitForCompletionCalled) 277 { 278 279 280 if (logger.isInfoEnabled()) 281 { 282 logger.info("rid: " + request.requestId() + 283 " opname: " + request.operation() + 284 " cannot process request because waitForCompletion was called"); 285 } 286 throw new CompletionRequestedException(); 287 } 288 289 if (waitForShutdownCalled) 290 { 291 292 if (logger.isInfoEnabled()) 293 { 294 logger.info("rid: " + request.requestId() + 295 " opname: " + request.operation() + 296 " cannot process request because POA shutdown in progress"); 297 } 298 throw new ShutdownInProgressException(); 299 } 300 301 302 303 if ((aom != null && aom.isDeactivating( oid )) || 304 deactivationList.contains( oid )) 305 { 306 if (!poa.isUseServantManager() && !poa.isUseDefaultServant()) 307 { 308 if (logger.isInfoEnabled()) 309 { 310 logger.info("rid: " + request.requestId() + 311 " opname: " + request.operation() + 312 " cannot process request, because object is already in the deactivation process"); 313 } 314 315 throw new org.omg.CORBA.OBJECT_NOT_EXIST (); 316 } 317 invalid = true; 318 } 319 320 322 323 if (!invalid && poa.isRetain()) 324 { 325 servant = aom.getServant(request.objectId()); 326 } 327 328 if (servant == null) 329 { 330 if (poa.isUseDefaultServant()) 331 { 332 if ((servant = poa.defaultServant) == null) 333 { 334 if (logger.isWarnEnabled()) 335 { 336 logger.warn("rid: " + request.requestId() + 337 " opname: " + request.operation() + 338 " cannot process request because default servant is not set"); 339 } 340 throw new org.omg.CORBA.OBJ_ADAPTER (); 341 } 342 343 } 344 else if (poa.isUseServantManager()) 345 { 346 if ((servantManager = poa.servantManager) == null) 347 { 348 if (logger.isWarnEnabled()) 349 { 350 logger.warn("rid: " + request.requestId() + 351 " opname: " + request.operation() + 352 " cannot process request because servant manager is not set"); 353 } 354 throw new org.omg.CORBA.OBJ_ADAPTER (); 355 } 356 } 358 else 359 { 360 if (logger.isWarnEnabled()) 361 { 362 logger.warn("rid: " + request.requestId() + 363 " opname: " + request.operation() + 364 " cannot process request, because object doesn't exist"); 365 } 366 throw new org.omg.CORBA.OBJECT_NOT_EXIST (); 367 } 368 } 369 371 activeRequestTable.put(request, oid); 372 } 373 374 if (logger.isDebugEnabled()) 376 { 377 logger.debug("rid: " + request.requestId() + 378 " opname: " + request.operation() + 379 " trying to get a RequestProcessor"); 380 } 381 382 RequestProcessor processor = getPoolManager().getProcessor(); 383 processor.init(this, request, servant, servantManager); 384 processor.begin(); 385 } 386 387 388 void queueRequest(ServerRequest request) 389 throws ResourceLimitReachedException 390 { 391 requestQueue.add(request); 392 } 393 394 398 399 void rejectRequest(ServerRequest request, 400 org.omg.CORBA.SystemException exception) 401 { 402 if (exception != null) 403 request.setSystemException(exception); 404 405 orb.getBasicAdapter().return_result(request); 406 407 if (logger.isWarnEnabled()) 408 { 409 logger.warn("rid: " + request.requestId() + 410 " opname: " + request.operation() + 411 " request rejected with exception: " + 412 exception.getMessage()); 413 } 414 } 415 416 420 421 synchronized void resetPreviousCompletionCall() 422 { 423 if (logger.isDebugEnabled()) 424 logger.debug("reset a previous completion call"); 425 426 waitForCompletionCalled = false; 427 notifyAll(); 428 } 429 430 433 void returnResult(ServerRequest request) 434 { 435 orb.getBasicAdapter().return_result(request); 436 } 437 438 442 synchronized void finish (ServerRequest request) 443 { 444 activeRequestTable.remove (request); 445 notifyAll(); 446 } 447 448 451 452 public void run() 453 { 454 org.omg.PortableServer.POAManagerPackage.State state; 455 ServerRequest request; 456 org.omg.CORBA.OBJ_ADAPTER closed_connection_exception = 457 new org.omg.CORBA.OBJ_ADAPTER ("connection closed: adapter inactive"); 458 459 org.omg.CORBA.TRANSIENT transient_exception = new org.omg.CORBA.TRANSIENT (); 460 while (!terminate) 461 { 462 state = poa.getState(); 463 if (POAUtil.isActive(state)) 464 { 465 request = requestQueue.getFirst(); 466 467 468 if (request != null) 469 { 470 if (request.remainingPOAName() != null) 471 { 472 orb.getBasicAdapter().deliverRequest(request, poa); 473 requestQueue.removeFirst(); 474 } 475 else 476 { 477 try 478 { 479 processRequest(request); 480 requestQueue.removeFirst(); 481 } 482 catch (CompletionRequestedException e) 483 { 484 491 } 492 catch (ShutdownInProgressException e) 493 { 494 495 waitForQueue(); 496 } 497 catch (org.omg.CORBA.OBJ_ADAPTER e) 498 { 499 requestQueue.removeFirst(); 500 rejectRequest(request, e); 501 } 502 catch (org.omg.CORBA.OBJECT_NOT_EXIST e) 503 { 504 requestQueue.removeFirst(); 505 rejectRequest(request, e); 506 } 507 } 508 continue; 509 } 510 } 511 else 512 { 513 if (!waitForShutdownCalled && (POAUtil.isDiscarding(state) || POAUtil.isInactive(state))) 514 { 515 request = requestQueue.removeLast(); 516 517 518 if (request != null) 519 { 520 if (POAUtil.isDiscarding(state)) 521 { 522 rejectRequest(request, transient_exception); 523 } 524 else 525 { 526 rejectRequest(request, closed_connection_exception); 527 } 528 continue; 529 } 530 } 531 } 532 539 waitForQueue(); 540 } 541 } 542 543 549 550 synchronized void waitForCompletion() 551 { 552 waitForCompletionCalled = true; 553 554 while (waitForCompletionCalled && !activeRequestTable.isEmpty()) 555 { 556 try 557 { 558 if (logger.isDebugEnabled()) 559 logger.debug("somebody waits for completion and there are active processors"); 560 wait(); 561 } 562 catch (InterruptedException e) 563 { 564 } 565 } 566 } 567 568 578 579 synchronized void waitForObjectCompletion( byte[] oid ) 580 { 581 ByteArrayKey oidbak = new ByteArrayKey( oid ); 582 583 while (activeRequestTable.contains(oidbak)) 584 { 585 try 586 { 587 wait(); 588 } 589 catch (InterruptedException e) 590 { 591 } 592 } 593 if (logger.isDebugEnabled()) 594 { 595 logger.debug( POAUtil.convert(oid) + 596 "all active processors for this object have finished"); 597 598 } 599 600 deactivationList.addElement( oidbak ); 601 } 602 603 609 610 private void waitForQueue() 611 { 612 synchronized (queueLog) 613 { 614 if ((requestQueue.isEmpty() || poa.isHolding() || waitForShutdownCalled) && 615 !terminate) 616 { 617 try 618 { 619 if (logger.isDebugEnabled()) 620 { 621 logger.debug("waiting for queue"); 622 } 623 queueLog.wait(); 624 } 625 catch (java.lang.InterruptedException e) 626 { 627 } 628 } 629 } 630 } 631 632 638 synchronized void waitForShutdown() 639 { 640 waitForShutdownCalled = true; 641 642 while ((waitForShutdownCalled && ! activeRequestTable.isEmpty()) 643 || (localRequests != 0) 644 ) 645 { 646 try 647 { 648 if (logger.isDebugEnabled()) 649 { 650 logger.debug("somebody waits for shutdown and there are active processors"); 651 } 652 wait(); 653 } 654 catch (InterruptedException e) 655 { 656 } 657 } 658 } 659 660 synchronized void addLocalRequest() 661 { 662 localRequests++; 663 } 664 665 synchronized void removeLocalRequest() 666 { 667 localRequests--; 668 notifyAll(); 669 } 670 } 671 | Popular Tags |