KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > poa > RequestController


1 package org.jacorb.poa;
2
3 /*
4  * JacORB - a free Java ORB
5  *
6  * Copyright (C) 1997-2004 Gerald Brose.
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the Free
20  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */

22
23 import org.jacorb.poa.util.*;
24 import org.jacorb.poa.except.*;
25
26 import org.jacorb.orb.dsi.ServerRequest;
27
28 import org.omg.PortableServer.POAManagerPackage.State JavaDoc;
29 import org.omg.PortableServer.Servant JavaDoc;
30 import org.omg.PortableServer.ServantManager JavaDoc;
31
32 import org.apache.avalon.framework.logger.Logger;
33 import org.apache.avalon.framework.configuration.*;
34
35 import java.util.*;
36
37 /**
38  * This class manages all request processing affairs. The main thread takes the
39  * requests out from the queue and will see that the necessary steps are taken.
40  *
41  * @author Reimo Tiedemann, FU Berlin
42  * @version $Id: RequestController.java,v 1.32 2004/10/18 11:07:31 simon.mcqueen Exp $
43  */

44
45 public final class RequestController
46     extends Thread JavaDoc
47     implements Configurable
48 {
49     private POA poa;
50     private org.jacorb.orb.ORB orb;
51     private RequestQueue requestQueue;
52     private AOM aom;
53     private RPPoolManager poolManager;
54     private int localRequests = 0;
55
56     /* a singleton for all POA's in one virtual machine if
57        SINGLE_THREAD_MODEL is in use */

58     private static RPPoolManager singletonPoolManager;
59     private static int count = 0;
60
61     /** the configuration object for this controller */
62     private org.jacorb.config.Configuration configuration = null;
63
64     /** this controller's logger instance */
65     private Logger logger;
66     private int threadPoolMin = 0;
67     private int threadPoolMax = 0;
68     private boolean configured = false;
69
70     // stores all active requests
71
private Hashtable activeRequestTable;
72     // RequestProcessor -> oid
73
// for synchronisation with the object deactiviation process
74
private Vector deactivationList = new Vector();
75     // oid's
76

77     // other synchronisation stuff
78
private boolean terminate;
79     private boolean waitForCompletionCalled;
80     private boolean waitForShutdownCalled;
81     private java.lang.Object JavaDoc queueLog = new java.lang.Object JavaDoc();
82     private int threadPriority = Thread.MAX_PRIORITY;
83
84     /**
85      * private constructor
86      */

87
88     private RequestController()
89     {
90     }
91
92     /**
93      * constructor
94      */

95
96     RequestController( POA _poa,
97                        org.jacorb.orb.ORB _orb,
98                        AOM _aom)
99     {
100         super("RequestController-" + (++count));
101         poa = _poa;
102         aom = _aom;
103         orb = _orb;
104
105         requestQueue = new RequestQueue(this);
106
107         activeRequestTable =
108             poa.isSingleThreadModel() ?
109             new Hashtable(1) :
110             new Hashtable(threadPoolMax);
111     }
112
113    
114     public void configure(Configuration myConfiguration)
115         throws ConfigurationException
116     {
117         this.configuration =
118             (org.jacorb.config.Configuration)myConfiguration;
119
120         logger = configuration.getNamedLogger("jacorb.poa.controller");
121
122         requestQueue.configure(myConfiguration);
123
124         threadPoolMin =
125             configuration.getAttributeAsInteger("jacorb.poa.thread_pool_min", 5);
126
127         threadPoolMax =
128             configuration.getAttributeAsInteger("jacorb.poa.thread_pool_max", 20);
129
130         threadPriority =
131             configuration.getAttributeAsInteger("jacorb.poa.thread_priority",
132                                                 Thread.MAX_PRIORITY);
133
134         if( threadPriority < Thread.MIN_PRIORITY )
135             threadPriority = Thread.MIN_PRIORITY;
136         else if( threadPriority > Thread.MAX_PRIORITY )
137             threadPriority = Thread.MAX_PRIORITY;
138
139         setPriority(threadPriority);
140         setDaemon(true);
141
142         configured = true;
143         getPoolManager();
144         start();
145     }
146
147
148     void clearUpPool()
149     {
150         getPoolManager().destroy();
151     }
152
153     /**
154      * rejects all queued requests with specified system exception
155      */

156
157     void clearUpQueue(org.omg.CORBA.SystemException JavaDoc exception)
158     {
159         ServerRequest request;
160         while ((request = requestQueue.removeLast()) != null)
161         {
162             rejectRequest(request, exception);
163         }
164     }
165
166     /**
167      * indicates that the assumptions for blocking the
168      * request controller thread have changed,
169      * a waiting request controller thread will notified
170      */

171
172     void continueToWork()
173     {
174         synchronized (queueLog)
175         {
176             queueLog.notifyAll();
177         }
178     }
179
180     synchronized void end()
181     {
182         terminate = true;
183         continueToWork();
184     }
185
186     /**
187      * frees an object from the deactivation in progress state,
188      * a call indicates that the object deactivation process is complete
189      */

190
191     synchronized void freeObject( byte[] oid )
192     {
193         deactivationList.removeElement( new ByteArrayKey( oid ) );
194     }
195
196     AOM getAOM()
197     {
198         return aom;
199     }
200
201
202     Logger getLogger()
203     {
204         return logger;
205     }
206
207
208     org.jacorb.orb.ORB getORB()
209     {
210         return orb;
211     }
212
213
214     POA getPOA()
215     {
216         return poa;
217     }
218
219
220     RPPoolManager getPoolManager()
221     {
222         if (!configured)
223             throw new Error JavaDoc("Internal: not configured");
224
225         if (poolManager == null)
226         {
227             if (poa.isSingleThreadModel())
228             {
229                 if (singletonPoolManager == null)
230                 {
231                     singletonPoolManager =
232                         new RPPoolManager(orb.getPOACurrent(), 1, 1,
233                                           logger, configuration);
234                 }
235                 poolManager = singletonPoolManager;
236             }
237             else
238             {
239                 poolManager =
240                     new RPPoolManager(orb.getPOACurrent(),
241                                       threadPoolMin, threadPoolMax,
242                                       logger, configuration);
243             }
244         }
245         return poolManager;
246     }
247
248
249     RequestQueue getRequestQueue() {
250         return requestQueue;
251     }
252
253
254     boolean isDeactivating (ByteArrayKey oid)
255     {
256         return deactivationList.contains( oid );
257     }
258
259
260     /**
261      * requests will dispatched to request processors,
262      * attention, if the processor pool is empty, this method returns only
263      * if the getProcessor() method from RequestProcessorPool can satisfied
264      */

265
266     private void processRequest(ServerRequest request)
267         throws ShutdownInProgressException, CompletionRequestedException
268     {
269         Servant JavaDoc servant = null;
270         ServantManager JavaDoc servantManager = null;
271         boolean invalid = false;
272         ByteArrayKey oid = new ByteArrayKey( request.objectId() );
273
274         synchronized (this)
275         {
276             if (waitForCompletionCalled)
277             {
278                 /* state has changed to holding, discarding or inactive */
279
280                 if (logger.isInfoEnabled())
281                 {
282                     logger.info("rid: " + request.requestId() +
283                                 " opname: " + request.operation() +
284                                 " cannot process request because waitForCompletion was called");
285                 }
286                 throw new CompletionRequestedException();
287             }
288
289             if (waitForShutdownCalled)
290             {
291                 /* poa goes down */
292                 if (logger.isInfoEnabled())
293                 {
294                     logger.info("rid: " + request.requestId() +
295                                 " opname: " + request.operation() +
296                                 " cannot process request because POA shutdown in progress");
297                 }
298                 throw new ShutdownInProgressException();
299             }
300
301             /* below this point it's save that the poa is active */
302
303             if ((aom != null && aom.isDeactivating( oid )) ||
304                 deactivationList.contains( oid ))
305             {
306                 if (!poa.isUseServantManager() && !poa.isUseDefaultServant())
307                 {
308                     if (logger.isInfoEnabled())
309                     {
310                         logger.info("rid: " + request.requestId() +
311                                     " opname: " + request.operation() +
312                                     " cannot process request, because object is already in the deactivation process");
313                     }
314
315                     throw new org.omg.CORBA.OBJECT_NOT_EXIST JavaDoc();
316                 }
317                 invalid = true;
318             }
319
320             /* below this point it's save that the object is not in a
321                deactivation process */

322
323             if (!invalid && poa.isRetain())
324             {
325                 servant = aom.getServant(request.objectId());
326             }
327
328             if (servant == null)
329             {
330                 if (poa.isUseDefaultServant())
331                 {
332                     if ((servant = poa.defaultServant) == null)
333                     {
334                         if (logger.isWarnEnabled())
335                         {
336                             logger.warn("rid: " + request.requestId() +
337                                         " opname: " + request.operation() +
338                                         " cannot process request because default servant is not set");
339                         }
340                         throw new org.omg.CORBA.OBJ_ADAPTER JavaDoc();
341                     }
342
343                 }
344                 else if (poa.isUseServantManager())
345                 {
346                     if ((servantManager = poa.servantManager) == null)
347                     {
348                         if (logger.isWarnEnabled())
349                         {
350                             logger.warn("rid: " + request.requestId() +
351                                         " opname: " + request.operation() +
352                                         " cannot process request because servant manager is not set");
353                         }
354                         throw new org.omg.CORBA.OBJ_ADAPTER JavaDoc();
355                     }
356                     // USE_OBJECT_MAP_ONLY is in effect but object not exists
357
}
358                 else
359                 {
360                     if (logger.isWarnEnabled())
361                     {
362                         logger.warn("rid: " + request.requestId() +
363                                     " opname: " + request.operation() +
364                                     " cannot process request, because object doesn't exist");
365                     }
366                     throw new org.omg.CORBA.OBJECT_NOT_EXIST JavaDoc();
367                 }
368             }
369             /* below this point it's save that the request is valid
370                (all preconditions can be met) */

371             activeRequestTable.put(request, oid);
372         }
373
374         // get and initialize a processor for request processing
375
if (logger.isDebugEnabled())
376         {
377             logger.debug("rid: " + request.requestId() +
378                          " opname: " + request.operation() +
379                          " trying to get a RequestProcessor");
380         }
381
382         RequestProcessor processor = getPoolManager().getProcessor();
383         processor.init(this, request, servant, servantManager);
384         processor.begin();
385     }
386
387
388     void queueRequest(ServerRequest request)
389         throws ResourceLimitReachedException
390     {
391         requestQueue.add(request);
392     }
393
394     /**
395      * this method calls the basic adapter and hands out the request if
396      * something went wrong, the specified system exception will set
397      */

398
399     void rejectRequest(ServerRequest request,
400                        org.omg.CORBA.SystemException JavaDoc exception)
401     {
402         if (exception != null)
403             request.setSystemException(exception);
404
405         orb.getBasicAdapter().return_result(request);
406
407         if (logger.isWarnEnabled())
408         {
409             logger.warn("rid: " + request.requestId() +
410                         " opname: " + request.operation() +
411                         " request rejected with exception: " +
412                         exception.getMessage());
413         }
414     }
415
416     /**
417      * resets a previous waitForCompletion call,
418      * everybody who is waiting will notified
419      */

420
421     synchronized void resetPreviousCompletionCall()
422     {
423         if (logger.isDebugEnabled())
424             logger.debug("reset a previous completion call");
425
426         waitForCompletionCalled = false;
427         notifyAll(); /* maybe somebody waits for completion */
428     }
429
430     /**
431      * Sends the reply of the given request via the BasicAdapter.
432      */

433     void returnResult(ServerRequest request)
434     {
435         orb.getBasicAdapter().return_result(request);
436     }
437
438     /**
439      * Called from RequestProcessor when the request has been handled.
440      * The request is removed from the active request table.
441      */

442     synchronized void finish (ServerRequest request)
443     {
444         activeRequestTable.remove (request);
445         notifyAll();
446     }
447
448     /**
449      * the main loop for dispatching requests to request processors
450      */

451
452     public void run()
453     {
454         org.omg.PortableServer.POAManagerPackage.State JavaDoc state;
455         ServerRequest request;
456         org.omg.CORBA.OBJ_ADAPTER JavaDoc closed_connection_exception =
457             new org.omg.CORBA.OBJ_ADAPTER JavaDoc("connection closed: adapter inactive");
458
459         org.omg.CORBA.TRANSIENT JavaDoc transient_exception = new org.omg.CORBA.TRANSIENT JavaDoc();
460         while (!terminate)
461         {
462             state = poa.getState();
463             if (POAUtil.isActive(state))
464             {
465                 request = requestQueue.getFirst();
466
467                 /* Request available */
468                 if (request != null)
469                 {
470                     if (request.remainingPOAName() != null)
471                     {
472                         orb.getBasicAdapter().deliverRequest(request, poa);
473                         requestQueue.removeFirst();
474                     }
475                     else
476                     {
477                         try
478                         {
479                             processRequest(request);
480                             requestQueue.removeFirst();
481                         }
482                         catch (CompletionRequestedException e)
483                         {
484                             /* if waitForCompletion was called the poa
485                                state was changed to holding,
486                                discarding or inactive, the loop don't
487                                block in waitForContinue, the loop
488                                continues and will detect the changed
489                                state in the next turn (for this turn
490                                the request will not processed) */

491                         }
492                         catch (ShutdownInProgressException e)
493                         {
494                             /* waitForShutdown was called */
495                             waitForQueue();
496                         }
497                         catch (org.omg.CORBA.OBJ_ADAPTER JavaDoc e)
498                         {
499                             requestQueue.removeFirst();
500                             rejectRequest(request, e);
501                         }
502                         catch (org.omg.CORBA.OBJECT_NOT_EXIST JavaDoc e)
503                         {
504                             requestQueue.removeFirst();
505                             rejectRequest(request, e);
506                         }
507                     }
508                     continue;
509                 }
510             }
511             else
512             {
513                 if (!waitForShutdownCalled && (POAUtil.isDiscarding(state) || POAUtil.isInactive(state)))
514                 {
515                     request = requestQueue.removeLast();
516
517                     /* Request available */
518                     if (request != null)
519                     {
520                         if (POAUtil.isDiscarding(state))
521                         {
522                             rejectRequest(request, transient_exception);
523                         }
524                         else
525                         {
526                             rejectRequest(request, closed_connection_exception);
527                         }
528                         continue;
529                     }
530                 }
531             }
532             /* if waitForShutdown was called the RequestController
533                loop blocks for ALL TIME in waitForQueue (the poa
534                behaves as if he is in holding state now) ATTENTION,
535                it's a lazy synchronisation, a request could be
536                rejected if waitForShutdown was called but couldn't be
537                processed (it's save)
538             */

539             waitForQueue();
540         }
541     }
542
543     /**
544      * called from external thread for synchronizing with the
545      * request controller thread,
546      * a caller waits for completion of all active requests,
547      * no new requests will started from now on
548      */

549
550     synchronized void waitForCompletion()
551     {
552         waitForCompletionCalled = true;
553
554         while (waitForCompletionCalled && !activeRequestTable.isEmpty())
555         {
556             try
557             {
558                 if (logger.isDebugEnabled())
559                     logger.debug("somebody waits for completion and there are active processors");
560                 wait();
561             }
562             catch (InterruptedException JavaDoc e)
563             {
564             }
565         }
566     }
567
568     /**
569      * called from external thread for synchronizing with the request
570      * controller thread, a caller waits for completion of all active
571      * requests on this object. No new requests on this object will be
572      * started from now on because a steady stream of incomming
573      * requests could keep the object from being deactivated, a
574      * servant may invoke recursive method calls on the object it
575      * incarnates and deactivation should not necessarily prevent
576      * those invocations.
577      */

578
579     synchronized void waitForObjectCompletion( byte[] oid )
580     {
581         ByteArrayKey oidbak = new ByteArrayKey( oid );
582
583         while (activeRequestTable.contains(oidbak))
584         {
585             try
586             {
587                 wait();
588             }
589             catch (InterruptedException JavaDoc e)
590             {
591             }
592         }
593         if (logger.isDebugEnabled())
594         {
595             logger.debug( POAUtil.convert(oid) +
596                           "all active processors for this object have finished");
597
598         }
599
600         deactivationList.addElement( oidbak );
601     }
602
603     /**
604      * blocks the request controller thread if the queue is empty,
605      * the poa is in holding state or waitForShutdown was called,
606      * if waitForShutdown was called the RequestController loop blocks for
607      * ALL TIME in this method (the poa behaves as if he is in holding state now)
608      */

609
610     private void waitForQueue()
611     {
612         synchronized (queueLog)
613         {
614             if ((requestQueue.isEmpty() || poa.isHolding() || waitForShutdownCalled) &&
615                 !terminate)
616             {
617                 try
618                 {
619                     if (logger.isDebugEnabled())
620                     {
621                         logger.debug("waiting for queue");
622                     }
623                     queueLog.wait();
624                 }
625                 catch (java.lang.InterruptedException JavaDoc e)
626                 {
627                 }
628             }
629         }
630     }
631
632     /**
633      * called from external thread for synchronizing with the
634      * request controller thread,
635      * a caller waits for completion of all active requests,
636      * no new requests will started for ALL TIME
637      */

638     synchronized void waitForShutdown()
639     {
640         waitForShutdownCalled = true;
641
642         while ((waitForShutdownCalled && ! activeRequestTable.isEmpty())
643                || (localRequests != 0)
644         )
645         {
646             try
647             {
648                 if (logger.isDebugEnabled())
649                 {
650                     logger.debug("somebody waits for shutdown and there are active processors");
651                 }
652                 wait();
653             }
654             catch (InterruptedException JavaDoc e)
655             {
656             }
657         }
658     }
659
660     synchronized void addLocalRequest()
661     {
662         localRequests++;
663     }
664
665     synchronized void removeLocalRequest()
666     {
667         localRequests--;
668         notifyAll();
669     }
670 }
671
Popular Tags