1 package org.sapia.ubik.net.nio.dispatcher; 2 3 import java.io.IOException ; 4 5 import org.sapia.ubik.net.Pool; 6 import org.sapia.ubik.net.nio.Cycle; 7 import org.sapia.ubik.net.nio.Dispatcher; 8 9 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 10 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 11 12 29 public class RWPDispatcher implements Dispatcher { 30 31 private PooledExecutor _read; 32 private PooledExecutor _process; 33 private PooledExecutor _write; 34 private CycleCommandPool _pool = new CycleCommandPool(); 35 36 public RWPDispatcher(){ 37 _read = new PooledExecutor(new LinkedQueue()); 38 _write = new PooledExecutor(new LinkedQueue()); 39 _process = new PooledExecutor(new LinkedQueue()); 40 } 41 42 public RWPDispatcher(PooledExecutor read, 43 PooledExecutor write, 44 PooledExecutor process){ 45 _read = read; 46 _write = write; 47 _process = process; 48 } 49 50 53 public synchronized void dispatch(Cycle cycle) throws IOException { 54 switch(cycle.state()){ 55 case Cycle.STATE_READ: 56 try { 57 CycleCommand command = ((CycleCommand)_pool.acquire()).init(cycle); 58 _read.execute(command); 59 _pool.release(command); 60 } catch(Exception err) { 61 cycle.error(err); 62 cycle.next(); 63 } 64 break; 65 case Cycle.STATE_PROCESS: 66 try { 67 CycleCommand command = ((CycleCommand)_pool.acquire()).init(cycle); 68 _process.execute(command); 69 _pool.release(command); 70 } catch(Exception err) { 71 cycle.error(err); 72 cycle.next(); 73 } 74 break; 75 case Cycle.STATE_WRITE: 76 try { 77 CycleCommand command = ((CycleCommand)_pool.acquire()).init(cycle); 78 _write.execute(command); 79 _pool.release(command); 80 } catch(Exception err) { 81 cycle.error(err); 82 cycle.next(); 83 } 84 break; 85 default: 86 throw new IllegalStateException ( 87 "Expected STATE_READ, STATE_WRITE, STATE_PROCESS; got: " 88 + cycle.state()); 89 } 90 } 91 92 95 public void close() { 96 _read.shutdownAfterProcessingCurrentlyQueuedTasks(); 97 _process.shutdownAfterProcessingCurrentlyQueuedTasks(); 98 _write.shutdownAfterProcessingCurrentlyQueuedTasks(); 99 } 100 101 static final class CycleCommandPool extends Pool{ 102 103 106 protected Object doNewObject() throws Exception { 107 return new CycleCommand(); 108 } 109 } 110 111 } 112 | Popular Tags |