1 package org.sapia.ubik.rmi.server.command; 2 3 import org.sapia.ubik.net.PooledThread; 4 import org.sapia.ubik.net.ThreadPool; 5 import org.sapia.ubik.rmi.server.Log; 6 import org.sapia.ubik.rmi.server.ShutdownException; 7 8 9 28 public class InQueue extends ExecQueue { 29 CmdProcessorThreadPool _pool; 30 31 34 InQueue() throws Exception { 35 this(1); 36 } 37 38 43 InQueue(int maxThreads) throws Exception { 44 super(); 45 46 if (maxThreads <= 0) { 47 maxThreads = 1; 48 } 49 50 _pool = new CmdProcessorThreadPool(maxThreads); 51 _pool.fill(maxThreads); 52 53 PooledThread pt; 54 55 for (int count = 0; count < maxThreads; count++) { 56 pt = (PooledThread) _pool.acquire(); 57 pt.exec(this); 58 } 59 } 60 61 public void shutdown(long timeout) throws InterruptedException { 62 super.shutdown(timeout); 63 _pool.shutdown(timeout); 64 } 65 66 69 static class CmdProcessorThread extends PooledThread { 70 73 protected void doExec(Object task) { 74 InQueue queue = (InQueue) task; 75 76 while (true) { 77 AsyncCommand async; 78 Object toReturn; 79 80 try { 81 async = (AsyncCommand) queue.remove(); 82 83 try { 84 toReturn = async.execute(); 85 } catch (ShutdownException e) { 86 Log.warning(getName(), "Shutting down..."); 87 88 break; 89 } catch (Throwable t) { 90 toReturn = t; 91 } 92 93 OutQueue.getQueueFor(new Destination(async.getFrom(), 94 async.getCallerVmId())).add(new Response(async.getCmdId(), 95 toReturn)); 96 Thread.yield(); 97 } catch (InterruptedException e) { 98 break; 99 } 100 } 101 } 102 103 106 public void shutdown() { 107 Log.warning(getName(), "Shut down signal received..."); 108 super.shutdown(); 109 } 110 } 111 112 static class CmdProcessorThreadPool extends ThreadPool { 113 118 public CmdProcessorThreadPool(int maxSize) { 119 super("ubik.rmi.CallbackThread", true, maxSize); 120 } 121 122 125 protected PooledThread newThread() throws Exception { 126 return new CmdProcessorThread(); 127 } 128 } 129 } 130 | Popular Tags |