KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > ReplicationQueue


/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */

7 package org.jboss.cache;
8
9
10 import org.apache.commons.logging.Log;
11 import org.apache.commons.logging.LogFactory;
12 import org.jboss.cache.marshall.MethodCall;
13 import org.jboss.cache.marshall.MethodDeclarations;
14
15 import java.util.ArrayList JavaDoc;
16 import java.util.LinkedList JavaDoc;
17 import java.util.List JavaDoc;
18 import java.util.Timer JavaDoc;
19 import java.util.TimerTask JavaDoc;
20
21
22 /**
23  * Periodically (or when certain size is exceeded) takes elements and replicates them.
24  *
25  * @author <a HREF="mailto:bela@jboss.org">Bela Ban</a> May 24, 2003
26  * @version $Revision: 1.13 $
27  */

28 public class ReplicationQueue
29 {
30
31    private static Log log = LogFactory.getLog(ReplicationQueue.class);
32
33    private CacheImpl cache = null;
34
35    /**
36     * We flush every 5 seconds. Inactive if -1 or 0
37     */

38    private long interval = 5000;
39
40    /**
41     * Max elements before we flush
42     */

43    private long max_elements = 500;
44
45    /**
46     * Holds the replication jobs: LinkedList<MethodCall>
47     */

48    private final List JavaDoc<MethodCall> elements = new LinkedList JavaDoc<MethodCall>();
49
50    /**
51     * For periodical replication
52     */

53    private Timer JavaDoc timer = null;
54
55    /**
56     * The timer task, only calls flush() when executed by Timer
57     */

58    private MyTask task = null;
59
60    public ReplicationQueue()
61    {
62    }
63
64    /**
65     * Constructs a new ReplicationQueue.
66     */

67    public ReplicationQueue(CacheImpl cache, long interval, long max_elements)
68    {
69       this.cache = cache;
70       this.interval = interval;
71       this.max_elements = max_elements;
72    }
73
74    /**
75     * Returns the flush interval in milliseconds.
76     */

77    public long getInterval()
78    {
79       return interval;
80    }
81
82    /**
83     * Sets the flush interval in milliseconds.
84     */

85    public void setInterval(long interval)
86    {
87       this.interval = interval;
88       stop();
89       start();
90    }
91
92    /**
93     * Returns the maximum number of elements to hold.
94     * If the maximum number is reached, flushes in the calling thread.
95     */

96    public long getMax_elements()
97    {
98       return max_elements;
99    }
100
101    /**
102     * Sets the maximum number of elements to hold.
103     */

104    public void setMax_elements(long max_elements)
105    {
106       this.max_elements = max_elements;
107    }
108
109    /**
110     * Starts the asynchronous flush queue.
111     */

112    public synchronized void start()
113    {
114       if (interval > 0)
115       {
116          if (task == null)
117             task = new MyTask();
118          if (timer == null)
119          {
120             timer = new Timer JavaDoc(true);
121             timer.schedule(task,
122                     500, // delay before initial flush
123
interval); // interval between flushes
124
}
125       }
126    }
127
128    /**
129     * Stops the asynchronous flush queue.
130     */

131    public synchronized void stop()
132    {
133       if (task != null)
134       {
135          task.cancel();
136          task = null;
137       }
138       if (timer != null)
139       {
140          timer.cancel();
141          timer = null;
142       }
143    }
144
145
146    /**
147     * Adds a new method call.
148     */

149    public void add(MethodCall job)
150    {
151       if (job == null)
152          throw new NullPointerException JavaDoc("job is null");
153       synchronized (elements)
154       {
155          elements.add(job);
156          if (elements.size() >= max_elements)
157             flush();
158       }
159    }
160
161    /**
162     * Flushes existing method calls.
163     */

164    public void flush()
165    {
166       List JavaDoc<MethodCall> l;
167       synchronized (elements)
168       {
169          if (log.isTraceEnabled())
170             log.trace("flush(): flushing repl queue (num elements=" + elements.size() + ")");
171          l = new ArrayList JavaDoc<MethodCall>(elements);
172          elements.clear();
173       }
174
175       try
176       {
177          // send to all live nodes in the cluster
178
cache.callRemoteMethods(null, MethodDeclarations.replicateAllMethod, new Object JavaDoc[]{l}, false, true, 5000);
179       }
180       catch (Throwable JavaDoc t)
181       {
182          log.error("failed replicating " + l.size() + " elements in replication queue", t);
183       }
184    }
185
186    class MyTask extends TimerTask JavaDoc
187    {
188       public void run()
189       {
190          flush();
191       }
192    }
193
194 }
195
Popular Tags