KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > catalina > cluster > tcp > DataSender


1 /*
2  * Copyright 1999,2005 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5  * use this file except in compliance with the License. You may obtain a copy of
6  * the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations under
14  * the License.
15  */

16
17 package org.apache.catalina.cluster.tcp;
18
19 import java.io.IOException JavaDoc;
20 import java.net.InetAddress JavaDoc;
21 import java.net.Socket JavaDoc;
22 import java.net.SocketException JavaDoc;
23
24 import org.apache.catalina.util.StringManager;
25
26 /**
27  * Send cluster messages with only one socket. Ack and keep Alive Handling is
28  * supported
29  *
30  * @author Peter Rossbach
31  * @author Filip Hanik
32  * @version $Revision: 1.4 $ $Date: 2005/03/25 22:06:10 $
33  */

34 public class DataSender implements IDataSender {
35
36     private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
37             .getLog(DataSender.class);
38
39     /**
40      * The string manager for this package.
41      */

42     protected static StringManager sm = StringManager
43             .getManager(Constants.Package);
44
45     // ----------------------------------------------------- Instance Variables
46

47     /**
48      * The descriptive information about this implementation.
49      */

50     private static final String JavaDoc info = "DataSender/1.2";
51
52     private InetAddress JavaDoc address;
53
54     private int port;
55
56     private Socket JavaDoc sc = null;
57
58     private boolean isSocketConnected = false;
59
60     private boolean suspect;
61
62     private long ackTimeout;
63
64     protected long nrOfRequests = 0;
65
66     protected long totalBytes = 0;
67
68     protected long connectCounter = 0;
69
70     protected long disconnectCounter = 0;
71
72     protected long missingAckCounter = 0;
73
74     protected long dataResendCounter = 0;
75
76     /**
77      * doProcessingStats
78      */

79     protected boolean doProcessingStats = false;
80
81     /**
82      * proessingTime
83      */

84     protected long processingTime = 0;
85     
86     /**
87      * min proessingTime
88      */

89     protected long minProcessingTime = Long.MAX_VALUE ;
90
91     /**
92      * max proessingTime
93      */

94     protected long maxProcessingTime = 0;
95    
96     /**
97      * keep socket open for no more than one min
98      */

99     private long keepAliveTimeout = 60 * 1000;
100
101     /**
102      * max 100 requests before reconnecting
103      */

104     private int keepAliveMaxRequestCount = 100;
105
106     /**
107      * Last connect timestamp
108      */

109     private long keepAliveConnectTime = 0;
110
111     /**
112      * keepalive counter
113      */

114     private int keepAliveCount = 0;
115
116     private boolean waitForAck = true;
117
118     private int socketCloseCounter;
119
120     private int socketOpenCounter;
121
122     // ------------------------------------------------------------- Constructor
123

124     public DataSender(InetAddress JavaDoc host, int port) {
125         this.address = host;
126         this.port = port;
127         if (log.isInfoEnabled())
128             log.info(sm.getString("IDataSender.create", address, new Integer JavaDoc(
129                     port)));
130     }
131
132     // ------------------------------------------------------------- Properties
133

134     /**
135      * Return descriptive information about this implementation and the
136      * corresponding version number, in the format
137      * <code>&lt;description&gt;/&lt;version&gt;</code>.
138      */

139     public String JavaDoc getInfo() {
140
141         return (info);
142
143     }
144
145     /**
146      * @return Returns the nrOfRequests.
147      */

148     public long getNrOfRequests() {
149         return nrOfRequests;
150     }
151
152     /**
153      * @return Returns the totalBytes.
154      */

155     public long getTotalBytes() {
156         return totalBytes;
157     }
158
159     /**
160      * @return Returns the avg processingTime/nrOfRequests.
161      */

162     public long getAvgProcessingTime() {
163         return processingTime / nrOfRequests;
164     }
165  
166     /**
167      * @return Returns the maxProcessingTime.
168      */

169     public long getMaxProcessingTime() {
170         return maxProcessingTime;
171     }
172     
173     /**
174      * @return Returns the minProcessingTime.
175      */

176     public long getMinProcessingTime() {
177         return minProcessingTime;
178     }
179     
180     /**
181      * @return Returns the processingTime.
182      */

183     public long getProcessingTime() {
184         return processingTime;
185     }
186     
187     /**
188      * @return Returns the doProcessingStats.
189      */

190     public boolean isDoProcessingStats() {
191         return doProcessingStats;
192     }
193     /**
194      * @param doProcessingStats The doProcessingStats to set.
195      */

196     public void setDoProcessingStats(boolean doProcessingStats) {
197         this.doProcessingStats = doProcessingStats;
198     }
199  
200     /**
201      * @return Returns the connectCounter.
202      */

203     public long getConnectCounter() {
204         return connectCounter;
205     }
206
207     /**
208      * @return Returns the disconnectCounter.
209      */

210     public long getDisconnectCounter() {
211         return disconnectCounter;
212     }
213
214     /**
215      * @return Returns the missingAckCounter.
216      */

217     public long getMissingAckCounter() {
218         return missingAckCounter;
219     }
220
221     /**
222      * @return Returns the socketOpenCounter.
223      */

224     public int getSocketOpenCounter() {
225         return socketOpenCounter;
226     }
227     
228     /**
229      * @return Returns the socketCloseCounter.
230      */

231     public int getSocketCloseCounter() {
232         return socketCloseCounter;
233     }
234
235     /**
236      * @return Returns the dataResendCounter.
237      */

238     public long getDataResendCounter() {
239         return dataResendCounter;
240     }
241
242     public InetAddress JavaDoc getAddress() {
243         return address;
244     }
245
246     public int getPort() {
247         return port;
248     }
249
250     public boolean isConnected() {
251         return isSocketConnected;
252     }
253
254     /**
255      * @param isSocketConnected
256      * The isSocketConnected to set.
257      */

258     protected void setSocketConnected(boolean isSocketConnected) {
259         this.isSocketConnected = isSocketConnected;
260     }
261
262     public boolean isSuspect() {
263         return suspect;
264     }
265
266     public boolean getSuspect() {
267         return suspect;
268     }
269
270     public void setSuspect(boolean suspect) {
271         this.suspect = suspect;
272     }
273
274     public long getAckTimeout() {
275         return ackTimeout;
276     }
277
278     public void setAckTimeout(long ackTimeout) {
279         this.ackTimeout = ackTimeout;
280     }
281
282     public long getKeepAliveTimeout() {
283         return keepAliveTimeout;
284     }
285
286     public void setKeepAliveTimeout(long keepAliveTimeout) {
287         this.keepAliveTimeout = keepAliveTimeout;
288     }
289
290     public int getKeepAliveMaxRequestCount() {
291         return keepAliveMaxRequestCount;
292     }
293
294     public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
295         this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
296     }
297
298     /**
299      * @return Returns the keepAliveConnectTime.
300      */

301     public long getKeepAliveConnectTime() {
302         return keepAliveConnectTime;
303     }
304
305     /**
306      * @return Returns the keepAliveCount.
307      */

308     public int getKeepAliveCount() {
309         return keepAliveCount;
310     }
311
312     /**
313      * @return Returns the waitForAck.
314      */

315     public boolean isWaitForAck() {
316         return waitForAck;
317     }
318
319     /**
320      * @param waitForAck
321      * The waitForAck to set.
322      */

323     public void setWaitForAck(boolean waitForAck) {
324         this.waitForAck = waitForAck;
325     }
326
327     // --------------------------------------------------------- Public Methods
328

329     public void connect() throws java.io.IOException JavaDoc {
330         connectCounter++;
331         if (log.isDebugEnabled())
332             log.debug(sm.getString("IDataSender.connect", address.getHostAddress(),
333                     new Integer JavaDoc(port)));
334         openSocket();
335     }
336
337  
338     /**
339      * close socket
340      *
341      * @see org.apache.catalina.cluster.tcp.IDataSender#disconnect()
342      * @see DataSender#closeSocket()
343      */

344     public void disconnect() {
345         disconnectCounter++;
346         if (log.isDebugEnabled())
347             log.debug(sm.getString("IDataSender.disconnect", address.getHostAddress(),
348                     new Integer JavaDoc(port)));
349         closeSocket();
350     }
351
352     /**
353      * Check, if time to close socket! Important for AsyncSocketSender that
354      * replication thread is not fork again! <b>Only work when keepAliveTimeout
355      * or keepAliveMaxRequestCount greater -1 </b>
356      * @return true, is socket close
357      * @see DataSender#closeSocket()
358      */

359     public boolean checkIfCloseSocket() {
360         boolean isCloseSocket = true ;
361         long ctime = System.currentTimeMillis() - this.keepAliveConnectTime;
362         if ((keepAliveTimeout > -1 && ctime > this.keepAliveTimeout)
363                 || (keepAliveMaxRequestCount > -1 && this.keepAliveCount >= this.keepAliveMaxRequestCount)) {
364             closeSocket();
365         } else
366             isCloseSocket = false ;
367         return isCloseSocket;
368     }
369
370     /*
371      * Send message
372      *
373      * @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(java.lang.String,
374      * byte[])
375      */

376     public synchronized void sendMessage(String JavaDoc messageid, byte[] data)
377             throws java.io.IOException JavaDoc {
378         pushMessage(messageid, data);
379     }
380
381     /*
382      * Reset sender statistics
383      */

384     public synchronized void resetStatistics() {
385         nrOfRequests = 0;
386         totalBytes = 0;
387         disconnectCounter = 0;
388         connectCounter = isConnected() ? 1 : 0;
389         missingAckCounter = 0;
390         dataResendCounter = 0;
391         socketOpenCounter =isConnected() ? 1 : 0;
392         socketCloseCounter = 0;
393         processingTime = 0 ;
394         minProcessingTime = Long.MAX_VALUE ;
395         maxProcessingTime = 0 ;
396     }
397
398     /**
399      * Name of this SockerSender
400      */

401     public String JavaDoc toString() {
402         StringBuffer JavaDoc buf = new StringBuffer JavaDoc("DataSender[");
403         buf.append(getAddress()).append(":").append(getPort()).append("]");
404         return buf.toString();
405     }
406
407     // --------------------------------------------------------- Protected
408
// Methods
409

410     /**
411      * @throws IOException
412      * @throws SocketException
413      */

414     protected void openSocket() throws IOException JavaDoc, SocketException JavaDoc {
415         socketOpenCounter++;
416         if (log.isDebugEnabled())
417             log.debug(sm.getString("IDataSender.openSocket", address.getHostAddress(), new Integer JavaDoc(
418                     port)));
419         sc = new Socket JavaDoc(getAddress(), getPort());
420         if (isWaitForAck())
421             sc.setSoTimeout((int) ackTimeout);
422         isSocketConnected = true;
423         this.keepAliveCount = 0;
424         this.keepAliveConnectTime = System.currentTimeMillis();
425     }
426
427     /**
428      * close socket
429      *
430      * @see DataSender#disconnect()
431      * @see DataSender#checkIfCloseSocket()
432      */

433     protected void closeSocket() {
434         if(isSocketConnected) {
435             socketCloseCounter++;
436             if (log.isDebugEnabled())
437                 log.debug(sm.getString("IDataSender.closeSocket",
438                         address.getHostAddress(), new Integer JavaDoc(port)));
439             try {
440                 sc.close();
441             } catch (Exception JavaDoc x) {
442             }
443             isSocketConnected = false;
444         }
445     }
446
447     /**
448      * Add statistic for this socket instance
449      *
450      * @param length
451      */

452     protected void addStats(int length) {
453         nrOfRequests++;
454         totalBytes += length;
455         if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
456             log.debug(sm.getString("IDataSender.stats", new Object JavaDoc[] {
457                     getAddress().getHostAddress(), new Integer JavaDoc(getPort()),
458                     new Long JavaDoc(totalBytes), new Long JavaDoc(nrOfRequests),
459                     new Long JavaDoc(totalBytes / nrOfRequests),
460                     new Long JavaDoc(getProcessingTime()),
461                     new Long JavaDoc(getAvgProcessingTime())}));
462         }
463     }
464
465     protected void addProcessingStats(long startTime) {
466         long time = System.currentTimeMillis() - startTime ;
467         if(time < minProcessingTime)
468             minProcessingTime = time ;
469         if( time > maxProcessingTime)
470             maxProcessingTime = time ;
471         processingTime += time ;
472     }
473     
474     /**
475      * push messages with only one socket at a time
476      *
477      * @param messageid
478      * unique message id
479      * @param data
480      * data to send
481      * @throws java.io.IOException
482      */

483     protected synchronized void pushMessage(String JavaDoc messageid, byte[] data)
484             throws java.io.IOException JavaDoc {
485         long time = 0 ;
486         if(doProcessingStats) {
487             time = System.currentTimeMillis();
488         }
489         checkIfCloseSocket();
490         if (!isConnected())
491             openSocket();
492         try {
493             sc.getOutputStream().write(data);
494             sc.getOutputStream().flush();
495             if (isWaitForAck())
496                 waitForAck(ackTimeout);
497         } catch (java.io.IOException JavaDoc x) {
498             // second try with fresh connection
499
dataResendCounter++;
500             if (log.isTraceEnabled())
501                 log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(),
502                         new Integer JavaDoc(port)));
503             closeSocket();
504             openSocket();
505             sc.getOutputStream().write(data);
506             sc.getOutputStream().flush();
507             if (isWaitForAck())
508                 waitForAck(ackTimeout);
509         }
510         this.keepAliveCount++;
511         checkIfCloseSocket();
512         if(doProcessingStats) {
513             addProcessingStats(time);
514         }
515         addStats(data.length);
516         if (log.isTraceEnabled())
517             log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(),
518                     new Integer JavaDoc(port), messageid, new Long JavaDoc(data.length)));
519
520     }
521
522     /**
523      * Wait for Acknowledgement from other server
524      *
525      * @param timeout
526      * @throws java.io.IOException
527      */

528     protected void waitForAck(long timeout) throws java.io.IOException JavaDoc {
529         try {
530             int i = sc.getInputStream().read();
531             while ((i != -1) && (i != 3)) {
532                 i = sc.getInputStream().read();
533             }
534         } catch (java.net.SocketTimeoutException JavaDoc x) {
535             missingAckCounter++;
536             log.warn(sm.getString("IDataSender.missing.ack", getAddress(),
537                     new Integer JavaDoc(getPort()), new Long JavaDoc(this.ackTimeout)));
538             throw x;
539         }
540     }
541 }
Popular Tags