|                                                                                                              1
 16
 17  package org.apache.catalina.cluster.tcp;
 18
 19  import java.io.IOException
  ; 20  import java.net.InetAddress
  ; 21  import java.net.Socket
  ; 22  import java.net.SocketException
  ; 23
 24  import org.apache.catalina.util.StringManager;
 25
 26
 34  public class DataSender implements IDataSender {
 35
 36      private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
 37              .getLog(DataSender.class);
 38
 39
 42      protected static StringManager sm = StringManager
 43              .getManager(Constants.Package);
 44
 45
 47
 50      private static final String
  info = "DataSender/1.2"; 51
 52      private InetAddress
  address; 53
 54      private int port;
 55
 56      private Socket
  sc = null; 57
 58      private boolean isSocketConnected = false;
 59
 60      private boolean suspect;
 61
 62      private long ackTimeout;
 63
 64      protected long nrOfRequests = 0;
 65
 66      protected long totalBytes = 0;
 67
 68      protected long connectCounter = 0;
 69
 70      protected long disconnectCounter = 0;
 71
 72      protected long missingAckCounter = 0;
 73
 74      protected long dataResendCounter = 0;
 75
 76
 79      protected boolean doProcessingStats = false;
 80
 81
 84      protected long processingTime = 0;
 85
 86
 89      protected long minProcessingTime = Long.MAX_VALUE ;
 90
 91
 94      protected long maxProcessingTime = 0;
 95
 96
 99      private long keepAliveTimeout = 60 * 1000;
 100
 101
 104     private int keepAliveMaxRequestCount = 100;
 105
 106
 109     private long keepAliveConnectTime = 0;
 110
 111
 114     private int keepAliveCount = 0;
 115
 116     private boolean waitForAck = true;
 117
 118     private int socketCloseCounter;
 119
 120     private int socketOpenCounter;
 121
 122
 124     public DataSender(InetAddress
  host, int port) { 125         this.address = host;
 126         this.port = port;
 127         if (log.isInfoEnabled())
 128             log.info(sm.getString("IDataSender.create", address, new Integer
  ( 129                     port)));
 130     }
 131
 132
 134
 139     public String
  getInfo() { 140
 141         return (info);
 142
 143     }
 144
 145
 148     public long getNrOfRequests() {
 149         return nrOfRequests;
 150     }
 151
 152
 155     public long getTotalBytes() {
 156         return totalBytes;
 157     }
 158
 159
 162     public long getAvgProcessingTime() {
 163         return processingTime / nrOfRequests;
 164     }
 165
 166
 169     public long getMaxProcessingTime() {
 170         return maxProcessingTime;
 171     }
 172
 173
 176     public long getMinProcessingTime() {
 177         return minProcessingTime;
 178     }
 179
 180
 183     public long getProcessingTime() {
 184         return processingTime;
 185     }
 186
 187
 190     public boolean isDoProcessingStats() {
 191         return doProcessingStats;
 192     }
 193
 196     public void setDoProcessingStats(boolean doProcessingStats) {
 197         this.doProcessingStats = doProcessingStats;
 198     }
 199
 200
 203     public long getConnectCounter() {
 204         return connectCounter;
 205     }
 206
 207
 210     public long getDisconnectCounter() {
 211         return disconnectCounter;
 212     }
 213
 214
 217     public long getMissingAckCounter() {
 218         return missingAckCounter;
 219     }
 220
 221
 224     public int getSocketOpenCounter() {
 225         return socketOpenCounter;
 226     }
 227
 228
 231     public int getSocketCloseCounter() {
 232         return socketCloseCounter;
 233     }
 234
 235
 238     public long getDataResendCounter() {
 239         return dataResendCounter;
 240     }
 241
 242     public InetAddress
  getAddress() { 243         return address;
 244     }
 245
 246     public int getPort() {
 247         return port;
 248     }
 249
 250     public boolean isConnected() {
 251         return isSocketConnected;
 252     }
 253
 254
 258     protected void setSocketConnected(boolean isSocketConnected) {
 259         this.isSocketConnected = isSocketConnected;
 260     }
 261
 262     public boolean isSuspect() {
 263         return suspect;
 264     }
 265
 266     public boolean getSuspect() {
 267         return suspect;
 268     }
 269
 270     public void setSuspect(boolean suspect) {
 271         this.suspect = suspect;
 272     }
 273
 274     public long getAckTimeout() {
 275         return ackTimeout;
 276     }
 277
 278     public void setAckTimeout(long ackTimeout) {
 279         this.ackTimeout = ackTimeout;
 280     }
 281
 282     public long getKeepAliveTimeout() {
 283         return keepAliveTimeout;
 284     }
 285
 286     public void setKeepAliveTimeout(long keepAliveTimeout) {
 287         this.keepAliveTimeout = keepAliveTimeout;
 288     }
 289
 290     public int getKeepAliveMaxRequestCount() {
 291         return keepAliveMaxRequestCount;
 292     }
 293
 294     public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
 295         this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
 296     }
 297
 298
 301     public long getKeepAliveConnectTime() {
 302         return keepAliveConnectTime;
 303     }
 304
 305
 308     public int getKeepAliveCount() {
 309         return keepAliveCount;
 310     }
 311
 312
 315     public boolean isWaitForAck() {
 316         return waitForAck;
 317     }
 318
 319
 323     public void setWaitForAck(boolean waitForAck) {
 324         this.waitForAck = waitForAck;
 325     }
 326
 327
 329     public void connect() throws java.io.IOException
  { 330         connectCounter++;
 331         if (log.isDebugEnabled())
 332             log.debug(sm.getString("IDataSender.connect", address.getHostAddress(),
 333                     new Integer
  (port))); 334         openSocket();
 335     }
 336
 337
 338
 344     public void disconnect() {
 345         disconnectCounter++;
 346         if (log.isDebugEnabled())
 347             log.debug(sm.getString("IDataSender.disconnect", address.getHostAddress(),
 348                     new Integer
  (port))); 349         closeSocket();
 350     }
 351
 352
 359     public boolean checkIfCloseSocket() {
 360         boolean isCloseSocket = true ;
 361         long ctime = System.currentTimeMillis() - this.keepAliveConnectTime;
 362         if ((keepAliveTimeout > -1 && ctime > this.keepAliveTimeout)
 363                 || (keepAliveMaxRequestCount > -1 && this.keepAliveCount >= this.keepAliveMaxRequestCount)) {
 364             closeSocket();
 365         } else
 366             isCloseSocket = false ;
 367         return isCloseSocket;
 368     }
 369
 370
 376     public synchronized void sendMessage(String
  messageid, byte[] data) 377             throws java.io.IOException
  { 378         pushMessage(messageid, data);
 379     }
 380
 381
 384     public synchronized void resetStatistics() {
 385         nrOfRequests = 0;
 386         totalBytes = 0;
 387         disconnectCounter = 0;
 388         connectCounter = isConnected() ? 1 : 0;
 389         missingAckCounter = 0;
 390         dataResendCounter = 0;
 391         socketOpenCounter =isConnected() ? 1 : 0;
 392         socketCloseCounter = 0;
 393         processingTime = 0 ;
 394         minProcessingTime = Long.MAX_VALUE ;
 395         maxProcessingTime = 0 ;
 396     }
 397
 398
 401     public String
  toString() { 402         StringBuffer
  buf = new StringBuffer  ("DataSender["); 403         buf.append(getAddress()).append(":").append(getPort()).append("]");
 404         return buf.toString();
 405     }
 406
 407
 410
 414     protected void openSocket() throws IOException
  , SocketException  { 415         socketOpenCounter++;
 416         if (log.isDebugEnabled())
 417             log.debug(sm.getString("IDataSender.openSocket", address.getHostAddress(), new Integer
  ( 418                     port)));
 419         sc = new Socket
  (getAddress(), getPort()); 420         if (isWaitForAck())
 421             sc.setSoTimeout((int) ackTimeout);
 422         isSocketConnected = true;
 423         this.keepAliveCount = 0;
 424         this.keepAliveConnectTime = System.currentTimeMillis();
 425     }
 426
 427
 433     protected void closeSocket() {
 434         if(isSocketConnected) {
 435             socketCloseCounter++;
 436             if (log.isDebugEnabled())
 437                 log.debug(sm.getString("IDataSender.closeSocket",
 438                         address.getHostAddress(), new Integer
  (port))); 439             try {
 440                 sc.close();
 441             } catch (Exception
  x) { 442             }
 443             isSocketConnected = false;
 444         }
 445     }
 446
 447
 452     protected void addStats(int length) {
 453         nrOfRequests++;
 454         totalBytes += length;
 455         if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
 456             log.debug(sm.getString("IDataSender.stats", new Object
  [] { 457                     getAddress().getHostAddress(), new Integer
  (getPort()), 458                     new Long
  (totalBytes), new Long  (nrOfRequests), 459                     new Long
  (totalBytes / nrOfRequests), 460                     new Long
  (getProcessingTime()), 461                     new Long
  (getAvgProcessingTime())})); 462         }
 463     }
 464
 465     protected void addProcessingStats(long startTime) {
 466         long time = System.currentTimeMillis() - startTime ;
 467         if(time < minProcessingTime)
 468             minProcessingTime = time ;
 469         if( time > maxProcessingTime)
 470             maxProcessingTime = time ;
 471         processingTime += time ;
 472     }
 473
 474
 483     protected synchronized void pushMessage(String
  messageid, byte[] data) 484             throws java.io.IOException
  { 485         long time = 0 ;
 486         if(doProcessingStats) {
 487             time = System.currentTimeMillis();
 488         }
 489         checkIfCloseSocket();
 490         if (!isConnected())
 491             openSocket();
 492         try {
 493             sc.getOutputStream().write(data);
 494             sc.getOutputStream().flush();
 495             if (isWaitForAck())
 496                 waitForAck(ackTimeout);
 497         } catch (java.io.IOException
  x) { 498                         dataResendCounter++;
 500             if (log.isTraceEnabled())
 501                 log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(),
 502                         new Integer
  (port))); 503             closeSocket();
 504             openSocket();
 505             sc.getOutputStream().write(data);
 506             sc.getOutputStream().flush();
 507             if (isWaitForAck())
 508                 waitForAck(ackTimeout);
 509         }
 510         this.keepAliveCount++;
 511         checkIfCloseSocket();
 512         if(doProcessingStats) {
 513             addProcessingStats(time);
 514         }
 515         addStats(data.length);
 516         if (log.isTraceEnabled())
 517             log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(),
 518                     new Integer
  (port), messageid, new Long  (data.length))); 519
 520     }
 521
 522
 528     protected void waitForAck(long timeout) throws java.io.IOException
  { 529         try {
 530             int i = sc.getInputStream().read();
 531             while ((i != -1) && (i != 3)) {
 532                 i = sc.getInputStream().read();
 533             }
 534         } catch (java.net.SocketTimeoutException
  x) { 535             missingAckCounter++;
 536             log.warn(sm.getString("IDataSender.missing.ack", getAddress(),
 537                     new Integer
  (getPort()), new Long  (this.ackTimeout))); 538             throw x;
 539         }
 540     }
 541 }
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |