KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > catalina > cluster > tcp > FastAsyncSocketSender


1 /*
2  * Copyright 1999,2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5  * use this file except in compliance with the License. You may obtain a copy of
6  * the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations under
14  * the License.
15  */

16
17 package org.apache.catalina.cluster.tcp;
18
19 import java.net.InetAddress JavaDoc;
20
21 import org.apache.catalina.cluster.util.FastQueue;
22 import org.apache.catalina.cluster.util.LinkObject;
23 import org.apache.catalina.cluster.util.IQueue;
24
25 /**
26  * Send cluster messages from a Message queue with only one socket. Ack and keep
27  * Alive Handling is supported. Fast Queue can limit queue size and consume all messages at queue at one block.<br/>
28  * Limit the queue lock contention under high load!<br/>
29  * <ul>
30  * <li>With autoConnect=false at ReplicationTransmitter, you can disconnect the
31  * sender and all messages are queued. Only use this for small maintaince
32  * isuses!</li>
33  * <li>waitForAck=true, means that receiver ack the transfer</li>
34  * <li>after one minute idle time, or number of request (100) the connection is
35  * reconnected with next request. Change this for production use!</li>
36  * <li>default ackTimeout is 15 sec: this is very low for big all session
37  * replication messages after restart a node</li>
38  * <li>disable keepAlive: keepAliveTimeout="-1" and
39  * keepAliveMaxRequestCount="-1"</li>
40  * <li>maxQueueLength: Limit the sender queue length (membership goes well, but transfer is failure!!)</li>
41  * </ul>
42  * FIXME: refactor code duplications with AsyncSocketSender => configurable or extract super class
43  * @author Peter Rossbach ( idea comes form Rainer Jung)
44  * @version $Revision: 1.2 $ $Date: 2005/03/25 22:07:20 $
45  * @since 5.5.9
46  */

47 public class FastAsyncSocketSender extends DataSender {
48
49     private static int threadCounter = 1;
50
51     private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
52             .getLog(FastAsyncSocketSender.class);
53
54     /**
55      * The descriptive information about this implementation.
56      */

57     private static final String JavaDoc info = "FastAsyncSocketSender/1.1";
58
59     // ----------------------------------------------------- Instance Variables
60

61     /**
62      * Message Queue
63      */

64     private FastQueue queue = new FastQueue();
65
66     /**
67      * Active thread to push messages asynchronous to the other replication node
68      */

69     private FastQueueThread queueThread = null;
70
71     /**
72      * Count number of queue message
73      */

74     private long inQueueCounter = 0;
75
76     /**
77      * Count all successfull push messages from queue
78      */

79     private long outQueueCounter = 0;
80
81     /**
82      * Current number of bytes from all queued messages
83      */

84     private long queuedNrOfBytes = 0;
85
86     // ------------------------------------------------------------- Constructor
87

88     /**
89      * start background thread to push incomming cluster messages to replication
90      * node
91      *
92      * @param host
93      * replication node tcp address
94      * @param port
95      * replication node tcp port
96      */

97     public FastAsyncSocketSender(InetAddress JavaDoc host, int port) {
98         super(host, port);
99         checkThread();
100     }
101
102     // ------------------------------------------------------------- Properties
103

104     /**
105      * Return descriptive information about this implementation and the
106      * corresponding version number, in the format
107      * <code>&lt;description&gt;/&lt;version&gt;</code>.
108      */

109     public String JavaDoc getInfo() {
110
111         return (info);
112
113     }
114  
115     /**
116      * get current add wait timeout
117      * @return current wait timeout
118      */

119     public long getQueueAddWaitTimeout() {
120         
121         return queue.getAddWaitTimeout();
122     }
123
124     /**
125      * Set add wait timeout (default 10000 msec)
126      * @param timeout
127      */

128     public void setQueueAddWaitTimeout(long timeout) {
129         queue.setAddWaitTimeout(timeout);
130     }
131
132     /**
133      * get current remove wait timeout
134      * @return
135      */

136     public long getQueueRemoveWaitTimeout() {
137         return queue.getRemoveWaitTimeout();
138     }
139
140     /**
141      * set remove wait timeout ( default 30000 msec)
142      * @param timeout
143      */

144     public void setRemoveWaitTimeout(long timeout) {
145         queue.setRemoveWaitTimeout(timeout);
146     }
147
148     /**
149      * @return Returns the checkLock.
150      */

151     public boolean isQueueCheckLock() {
152         return queue.isCheckLock();
153     }
154     /**
155      * @param checkLock The checkLock to set.
156      */

157     public void setQueueCheckLock(boolean checkLock) {
158         queue.setCheckLock(checkLock);
159     }
160     /**
161      * @return Returns the doStats.
162      */

163     public boolean isQueueDoStats() {
164         return queue.isDoStats();
165     }
166     /**
167      * @param doStats The doStats to set.
168      */

169     public void setQueueDoStats(boolean doStats) {
170         queue.setDoStats(doStats);
171     }
172     /**
173      * @return Returns the timeWait.
174      */

175     public boolean isQueueTimeWait() {
176         return queue.isTimeWait();
177     }
178     /**
179      * @param timeWait The timeWait to set.
180      */

181     public void setQueueTimeWait(boolean timeWait) {
182         queue.setTimeWait(timeWait);
183     }
184         
185     /**
186      * @return Returns the inQueueCounter.
187      */

188     public int getMaxQueueLength() {
189         return queue.getMaxQueueLength();
190     }
191
192     /**
193      * @param set max
194      * Queue length
195      */

196     public void setMaxQueueLength(int length) {
197         queue.setMaxQueueLength(length);
198     }
199
200     /**
201      * @return Returns the add wait times.
202      */

203     public long getQueueAddWaitTime() {
204         return queue.getAddWait();
205     }
206
207     /**
208      * @return Returns the add wait times.
209      */

210     public long getQueueRemoveWaitTime() {
211         return queue.getRemoveWait();
212     }
213
214     /**
215      * @return Returns the inQueueCounter.
216      */

217     public long getInQueueCounter() {
218         return inQueueCounter;
219     }
220
221     /**
222      * @return Returns the outQueueCounter.
223      */

224     public long getOutQueueCounter() {
225         return outQueueCounter;
226     }
227
228     /**
229      * @return Returns the queueSize.
230      */

231     public int getQueueSize() {
232         return queue.getSize();
233     }
234
235     /**
236      * @return Returns the queuedNrOfBytes.
237      */

238     public long getQueuedNrOfBytes() {
239         return queuedNrOfBytes;
240     }
241
242     // --------------------------------------------------------- Public Methods
243

244     /*
245      * Connect to socket and start background thread to ppush queued messages
246      *
247      * @see org.apache.catalina.cluster.tcp.IDataSender#connect()
248      */

249     public void connect() throws java.io.IOException JavaDoc {
250         super.connect();
251         checkThread();
252         queue.start() ;
253     }
254
255     /**
256      * Disconnect socket ad stop queue thread
257      *
258      * @see org.apache.catalina.cluster.tcp.IDataSender#disconnect()
259      */

260     public void disconnect() {
261         stopThread();
262         queue.stop() ; // FIXME what is when message in queue => auto reconnect after one sending failure?
263
super.disconnect();
264     }
265
266     /*
267      * Send message to queue for later sending
268      *
269      * @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(java.lang.String,
270      * byte[])
271      */

272     public synchronized void sendMessage(String JavaDoc messageid, byte[] data)
273             throws java.io.IOException JavaDoc {
274         queue.add(messageid, data);
275         inQueueCounter++;
276         queuedNrOfBytes += data.length;
277         if (log.isTraceEnabled())
278             log.trace(sm.getString("AsyncSocketSender.queue.message",
279                     getAddress().getHostAddress(), new Integer JavaDoc(getPort()), messageid, new Long JavaDoc(
280                             data.length)));
281     }
282
283     /*
284      * Reset sender statistics
285      */

286     public synchronized void resetStatistics() {
287         super.resetStatistics();
288         inQueueCounter = queue.getSize();
289         outQueueCounter = 0;
290         queue.resetStatistics();
291     }
292
293     /**
294      * Name of this SockerSender
295      */

296     public String JavaDoc toString() {
297         StringBuffer JavaDoc buf = new StringBuffer JavaDoc("FastAsyncSocketSender[");
298         buf.append(getAddress().getHostAddress()).append(":").append(getPort()).append("]");
299         return buf.toString();
300     }
301
302     // --------------------------------------------------------- Public Methods
303

304     /**
305      * Start Queue thread as daemon
306      */

307     protected void checkThread() {
308         if (queueThread == null) {
309             if (log.isInfoEnabled())
310                 log.info(sm.getString("AsyncSocketSender.create.thread",
311                         getAddress(), new Integer JavaDoc(getPort())));
312             queueThread = new FastQueueThread(this, queue);
313             queueThread.setDaemon(true);
314             queueThread.start();
315         }
316     }
317
318     /**
319      * stop queue worker thread
320      */

321     protected void stopThread() {
322         if (queueThread != null) {
323             queueThread.stopRunning();
324             queueThread = null;
325         }
326     }
327
328     /*
329      * Reduce queued message date size counter
330      */

331     protected void reduceQueuedCounter(int size) {
332         queuedNrOfBytes -= size;
333     }
334
335     // -------------------------------------------------------- Inner Class
336

337     private class FastQueueThread extends Thread JavaDoc {
338
339         
340         /**
341          * Sender queue
342          */

343         private IQueue queue = null;
344
345         /**
346          * Active sender
347          */

348         private FastAsyncSocketSender sender = null;
349
350         /**
351          * Thread is active
352          */

353         private boolean keepRunning = true;
354
355         /**
356          * Only use inside FastAsyncSocketSender
357          * @param sender
358          * @param queue
359          */

360         private FastQueueThread(FastAsyncSocketSender sender, IQueue queue) {
361             setName("Cluster-FastAsyncSocketSender-" + (threadCounter++));
362             this.queue = queue;
363             this.sender = sender;
364         }
365
366         public void stopRunning() {
367             keepRunning = false;
368         }
369
370         /* Get the objects from queue and send all mesages to the sender.
371          * @see java.lang.Runnable#run()
372          */

373         public void run() {
374             while (keepRunning) {
375                 // get a link list of all queued objects
376
LinkObject entry = queue.remove();
377                 if (entry != null) {
378                     do {
379                         int messagesize = 0;
380                         try {
381                             byte[] data = (byte[]) entry.data();
382                             messagesize = data.length;
383                             sender.pushMessage((String JavaDoc) entry.getKey(), data);
384                             outQueueCounter++;
385                         } catch (Exception JavaDoc x) {
386                             log.warn(sm.getString(
387                                     "AsyncSocketSender.send.error", entry
388                                             .getKey()),x);
389                         } finally {
390                             reduceQueuedCounter(messagesize);
391                         }
392                         entry = entry.next();
393                     } while (entry != null);
394                 } else {
395                     log.error(sm.getString("AsyncSocketSender.queue.empty",sender.getAddress(), new Integer JavaDoc(sender.getPort())));
396                 }
397             }
398         }
399
400     }
401
402 }
Popular Tags