KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > catalina > cluster > tcp > AsyncSocketSender


/*
 * Copyright 1999,2004 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.catalina.cluster.tcp;

import java.net.InetAddress;

import org.apache.catalina.cluster.util.SmartQueue;

23 /**
24  * Send cluster messages from a Message queue with only one socket. Ack and keep
25  * Alive Handling is supported.
26  * <ul>
27  * <li>With autoConnect=false at ReplicationTransmitter, you can disconnect the
28  * sender and all messages are queued. Only use this for small maintaince
29  * isuses!</li>
30  * <li>waitForAck=true, means that receiver ack the transfer</li>
31  * <li>after one minute idle time, or number of request (100) the connection is
32  * reconnected with next request. Change this for production use!</li>
33  * <li>default ackTimeout is 15 sec: this is very low for big all session replication messages after restart a node</li>
34  * <li>disable keepAlive: keepAliveTimeout="-1" and keepAliveMaxRequestCount="-1"</li>
35  * </ul>
36  *
37  * @author Filip Hanik
38  * @author Peter Rossbach
39  * @version $Revision: 1.11 $ $Date: 2005/03/14 21:24:30 $
40  */

41 public class AsyncSocketSender extends DataSender {
42     
43     private static int threadCounter = 1;
44
45     private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
46             .getLog(AsyncSocketSender.class);
47
48     /**
49      * The descriptive information about this implementation.
50      */

51     private static final String JavaDoc info = "AsyncSocketSender/1.2";
52
53     // ----------------------------------------------------- Instance Variables
54

55     /**
56      * Message Queue
57      */

58     private SmartQueue queue = new SmartQueue();
59
60     /**
61      * Active thread to push messages asynchronous to the other replication node
62      */

63     private QueueThread queueThread = null;
64
65     /**
66      * Count number of queue message
67      */

68     private long inQueueCounter = 0;
69
70     /**
71      * Count all successfull push messages from queue
72      */

73     private long outQueueCounter = 0;
74
75     /**
76      * Current number of bytes from all queued messages
77      */

78     private long queuedNrOfBytes = 0;
79
80     // ------------------------------------------------------------- Constructor
81

82     /**
83      * start background thread to push incomming cluster messages to replication
84      * node
85      *
86      * @param host replication node tcp address
87      * @param port replication node tcp port
88      */

89     public AsyncSocketSender(InetAddress JavaDoc host, int port) {
90         super(host, port);
91         checkThread();
92     }
93
94     // ------------------------------------------------------------- Properties
95

96     /**
97      * Return descriptive information about this implementation and the
98      * corresponding version number, in the format
99      * <code>&lt;description&gt;/&lt;version&gt;</code>.
100      */

101     public String JavaDoc getInfo() {
102
103         return (info);
104
105     }
106
107     /**
108      * @return Returns the inQueueCounter.
109      */

110     public long getInQueueCounter() {
111         return inQueueCounter;
112     }
113
114     /**
115      * @return Returns the outQueueCounter.
116      */

117     public long getOutQueueCounter() {
118         return outQueueCounter;
119     }
120
121     /**
122      * @return Returns the queueSize.
123      */

124     public int getQueueSize() {
125         return queue.size();
126     }
127
128     /**
129      * @return Returns the queuedNrOfBytes.
130      */

131     public long getQueuedNrOfBytes() {
132         return queuedNrOfBytes;
133     }
134
135     // --------------------------------------------------------- Public Methods
136

137     /*
138      * Connect to socket and start background thread to ppush queued messages
139      *
140      * @see org.apache.catalina.cluster.tcp.IDataSender#connect()
141      */

142     public void connect() throws java.io.IOException JavaDoc {
143         super.connect();
144         checkThread();
145     }
146
147     /**
148      * Disconnect socket ad stop queue thread
149      *
150      * @see org.apache.catalina.cluster.tcp.IDataSender#disconnect()
151      */

152     public void disconnect() {
153         stopThread();
154         super.disconnect();
155     }
156
157     /*
158      * Send message to queue for later sending
159      *
160      * @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(java.lang.String,
161      * byte[])
162      */

163     public synchronized void sendMessage(String JavaDoc messageid, byte[] data)
164             throws java.io.IOException JavaDoc {
165         SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(messageid, data);
166         queue.add(entry);
167         inQueueCounter++;
168         queuedNrOfBytes += data.length;
169         if (log.isTraceEnabled())
170             log.trace(sm.getString("AsyncSocketSender.queue.message",
171                     getAddress().getHostAddress(), new Integer JavaDoc(getPort()), messageid, new Long JavaDoc(
172                             data.length)));
173     }
174
175     /*
176      * Reset sender statistics
177      */

178     public synchronized void resetStatistics() {
179         super.resetStatistics();
180         inQueueCounter = queue.size();
181         outQueueCounter = 0;
182
183     }
184
185     /**
186      * Name of this SockerSender
187      */

188     public String JavaDoc toString() {
189         StringBuffer JavaDoc buf = new StringBuffer JavaDoc("AsyncSocketSender[");
190         buf.append(getAddress().getHostAddress()).append(":").append(getPort()).append("]");
191         return buf.toString();
192     }
193
194     // --------------------------------------------------------- Public Methods
195

196     /**
197      * Start Queue thread as daemon
198      */

199     protected void checkThread() {
200         if (queueThread == null) {
201             if (log.isInfoEnabled())
202                 log.info(sm.getString("AsyncSocketSender.create.thread",
203                         getAddress(), new Integer JavaDoc(getPort())));
204             queueThread = new QueueThread(this);
205             queueThread.setDaemon(true);
206             queueThread.start();
207         }
208     }
209
210     /**
211      * stop queue worker thread
212      */

213     protected void stopThread() {
214         if (queueThread != null) {
215             queueThread.stopRunning();
216             queueThread = null;
217         }
218     }
219
220     /*
221      * Reduce queued message date size counter
222      */

223     protected void reduceQueuedCounter(int size) {
224         queuedNrOfBytes -= size;
225     }
226
227     // -------------------------------------------------------- Inner Class
228

229     private class QueueThread extends Thread JavaDoc {
230         AsyncSocketSender sender;
231
232         private boolean keepRunning = true;
233
234         public QueueThread(AsyncSocketSender sender) {
235             this.sender = sender;
236             setName("Cluster-AsyncSocketSender-" + (threadCounter++));
237         }
238
239         public void stopRunning() {
240             keepRunning = false;
241         }
242
243         /**
244          * Get one queued message and push it to the replication node
245          *
246          * @see DataSender#pushMessage(String, byte[])
247          */

248         public void run() {
249             while (keepRunning) {
250                 SmartQueue.SmartEntry entry = sender.queue.remove(5000);
251                 if (entry != null) {
252                     int messagesize = 0;
253                     try {
254                         byte[] data = (byte[]) entry.getValue();
255                         messagesize = data.length;
256                         sender.pushMessage((String JavaDoc) entry.getKey(), data);
257                         outQueueCounter++;
258                     } catch (Exception JavaDoc x) {
259                         log.warn(sm.getString("AsyncSocketSender.send.error",
260                                 entry.getKey()));
261                     } finally {
262                         reduceQueuedCounter(messagesize);
263                     }
264                 }
265             }
266         }
267     }
268 }
Popular Tags