KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > sapia > ubik > rmi > server > command > OutQueue


1 package org.sapia.ubik.rmi.server.command;
2
3 import org.sapia.ubik.net.Timer;
4 import org.sapia.ubik.rmi.server.ShutdownException;
5
6 import java.util.*;
7
8
9 /**
10  * This queue stores outgoing <code>Response</code> objects until
11  * they are processed.
12  *
13  * @author Yanick Duchesne
14  * <dl>
15  * <dt><b>Copyright:</b><dd>Copyright &#169; 2002-2003 <a HREF="http://www.sapia-oss.org">Sapia Open Source Software</a>. All Rights Reserved.</dd></dt>
16  * <dt><b>License:</b><dd>Read the license.txt file of the jar or visit the
17  * <a HREF="http://www.sapia-oss.org/license.html">license page</a> at the Sapia OSS web site</dd></dt>
18  * </dl>
19  */

20 public class OutQueue extends ExecQueue {
21   static Map _queuesByHost = Collections.synchronizedMap(new HashMap());
22   static ResponseSender _sender = new LocalResponseSender();
23   static OutQueueMonitor _monitor;
24   static boolean _added;
25
26   static {
27     _monitor = new OutQueueMonitor();
28     _monitor.setName("ubik.rmi.outqueue.Monitor");
29     _monitor.setDaemon(true);
30     _monitor.start();
31   }
32
33   /**
34    * Constructor for OutQueue.
35    */

36   private OutQueue() {
37   }
38
39   /**
40    * Shuts down all statically kept <code>OutQueue</code> instances.
41    *
42    * @param timeout a shutdown timeout - in millis.
43    */

44   public static void shutdownAll(long timeout) throws InterruptedException JavaDoc {
45     Iterator queues = _queuesByHost.values().iterator();
46     OutQueue queue;
47
48     while (queues.hasNext()) {
49       ((OutQueue) queues.next()).shutdown(timeout);
50     }
51
52     _monitor.shutdown(timeout);
53   }
54
55   /**
56    * @see org.sapia.ubik.rmi.server.command.ExecQueue#add(Executable)
57    */

58   public final void add(Executable cmd) {
59     super.add(cmd);
60     _monitor.wakeUp();
61   }
62
63   /**
64    * Returns the <code>OutQueue</code> instance corresponding to the specific hosts.
65    * <code>Response</code> objects are indeed kept on a per-host basis, so all responses
66    * corresponding to a given host are sent at once to the latter, in the same trip - this
67    * eventually spares remote calls.
68    *
69    * @return an <code>OutQueue</code> for the given <code>Destination</code>.
70    */

71   public static synchronized final OutQueue getQueueFor(Destination dest) {
72     OutQueue out = (OutQueue) _queuesByHost.get(dest);
73
74     if (out == null) {
75       out = new OutQueue();
76       _queuesByHost.put(dest, out);
77     }
78
79     return out;
80   }
81
82   static void setResponseSender(ResponseSender s) {
83     _sender = s;
84   }
85
86   /*////////////////////////////////////////////////////////////////////
87                              INNER CLASSES
88   ////////////////////////////////////////////////////////////////////*/

89   static final class OutQueueMonitor extends Thread JavaDoc {
90     Destination[] hosts;
91     OutQueue queue;
92     List resps;
93     boolean shutdown;
94     boolean shutdownRequested;
95
96     public void run() {
97       while (true) {
98         hosts = (Destination[]) _queuesByHost.keySet().toArray(new Destination[_queuesByHost.size()]);
99
100         for (int i = 0; i < hosts.length; i++) {
101           queue = (OutQueue) _queuesByHost.get(hosts[i]);
102
103           if (queue.size() > 0) {
104             try {
105               try {
106                 resps = queue.removeAll();
107               } catch (ShutdownException e) {
108                 shutdownRequested = true;
109
110                 continue;
111               }
112
113               if (_sender != null) {
114                 _sender.sendResponses(hosts[i], resps);
115               }
116             } catch (InterruptedException JavaDoc e) {
117               return;
118             }
119           }
120         }
121
122         if (shutdownRequested) {
123           doNotifyShutDown();
124
125           return;
126         }
127
128         Thread.yield();
129
130         try {
131           waitAdded();
132         } catch (InterruptedException JavaDoc e) {
133           break;
134         }
135       }
136     }
137
138     synchronized void shutdown(long timeout) throws InterruptedException JavaDoc {
139       Timer timer = new Timer(timeout);
140       shutdownRequested = true;
141       notify();
142
143       while (!shutdown) {
144         wait(timeout);
145
146         if (timer.isOver()) {
147           break;
148         }
149       }
150     }
151
152     private synchronized void doNotifyShutDown() {
153       shutdown = true;
154       notifyAll();
155     }
156
157     private synchronized void waitAdded() throws InterruptedException JavaDoc {
158       while (!_added && !shutdownRequested) {
159         wait();
160       }
161
162       _added = false;
163     }
164
165     synchronized void wakeUp() {
166       _added = true;
167       notify();
168     }
169   }
170 }
171
Popular Tags