1 package org.sapia.ubik.rmi.server.command; 2 3 import org.sapia.ubik.net.Timer; 4 import org.sapia.ubik.rmi.server.ShutdownException; 5 6 import java.util.*; 7 8 9 20 public class OutQueue extends ExecQueue { 21 static Map _queuesByHost = Collections.synchronizedMap(new HashMap()); 22 static ResponseSender _sender = new LocalResponseSender(); 23 static OutQueueMonitor _monitor; 24 static boolean _added; 25 26 static { 27 _monitor = new OutQueueMonitor(); 28 _monitor.setName("ubik.rmi.outqueue.Monitor"); 29 _monitor.setDaemon(true); 30 _monitor.start(); 31 } 32 33 36 private OutQueue() { 37 } 38 39 44 public static void shutdownAll(long timeout) throws InterruptedException { 45 Iterator queues = _queuesByHost.values().iterator(); 46 OutQueue queue; 47 48 while (queues.hasNext()) { 49 ((OutQueue) queues.next()).shutdown(timeout); 50 } 51 52 _monitor.shutdown(timeout); 53 } 54 55 58 public final void add(Executable cmd) { 59 super.add(cmd); 60 _monitor.wakeUp(); 61 } 62 63 71 public static synchronized final OutQueue getQueueFor(Destination dest) { 72 OutQueue out = (OutQueue) _queuesByHost.get(dest); 73 74 if (out == null) { 75 out = new OutQueue(); 76 _queuesByHost.put(dest, out); 77 } 78 79 return out; 80 } 81 82 static void setResponseSender(ResponseSender s) { 83 _sender = s; 84 } 85 86 89 static final class OutQueueMonitor extends Thread { 90 Destination[] hosts; 91 OutQueue queue; 92 List resps; 93 boolean shutdown; 94 boolean shutdownRequested; 95 96 public void run() { 97 while (true) { 98 hosts = (Destination[]) _queuesByHost.keySet().toArray(new Destination[_queuesByHost.size()]); 99 100 for (int i = 0; i < hosts.length; i++) { 101 queue = (OutQueue) _queuesByHost.get(hosts[i]); 102 103 if (queue.size() > 0) { 104 try { 105 try { 106 resps = queue.removeAll(); 107 } catch (ShutdownException e) { 108 shutdownRequested = true; 109 110 continue; 111 } 112 113 if (_sender != null) { 114 _sender.sendResponses(hosts[i], resps); 115 } 116 } catch (InterruptedException e) { 117 return; 118 } 119 } 120 } 121 122 if (shutdownRequested) { 123 doNotifyShutDown(); 124 125 return; 126 } 127 128 Thread.yield(); 129 130 try { 131 waitAdded(); 132 } catch (InterruptedException e) { 133 break; 134 } 135 } 136 } 137 138 synchronized void shutdown(long timeout) throws InterruptedException { 139 Timer timer = new Timer(timeout); 140 shutdownRequested = true; 141 notify(); 142 143 while (!shutdown) { 144 wait(timeout); 145 146 if (timer.isOver()) { 147 break; 148 } 149 } 150 } 151 152 private synchronized void doNotifyShutDown() { 153 shutdown = true; 154 notifyAll(); 155 } 156 157 private synchronized void waitAdded() throws InterruptedException { 158 while (!_added && !shutdownRequested) { 159 wait(); 160 } 161 162 _added = false; 163 } 164 165 synchronized void wakeUp() { 166 _added = true; 167 notify(); 168 } 169 } 170 } 171 | Popular Tags |