1 16 17 package org.apache.catalina.cluster.tcp; 18 19 import java.io.IOException ; 20 import java.net.InetAddress ; 21 import java.net.Socket ; 22 import java.net.SocketException ; 23 24 import org.apache.catalina.util.StringManager; 25 26 34 public class DataSender implements IDataSender { 35 36 private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 37 .getLog(DataSender.class); 38 39 42 protected static StringManager sm = StringManager 43 .getManager(Constants.Package); 44 45 47 50 private static final String info = "DataSender/1.2"; 51 52 private InetAddress address; 53 54 private int port; 55 56 private Socket sc = null; 57 58 private boolean isSocketConnected = false; 59 60 private boolean suspect; 61 62 private long ackTimeout; 63 64 protected long nrOfRequests = 0; 65 66 protected long totalBytes = 0; 67 68 protected long connectCounter = 0; 69 70 protected long disconnectCounter = 0; 71 72 protected long missingAckCounter = 0; 73 74 protected long dataResendCounter = 0; 75 76 79 protected boolean doProcessingStats = false; 80 81 84 protected long processingTime = 0; 85 86 89 protected long minProcessingTime = Long.MAX_VALUE ; 90 91 94 protected long maxProcessingTime = 0; 95 96 99 private long keepAliveTimeout = 60 * 1000; 100 101 104 private int keepAliveMaxRequestCount = 100; 105 106 109 private long keepAliveConnectTime = 0; 110 111 114 private int keepAliveCount = 0; 115 116 private boolean waitForAck = true; 117 118 private int socketCloseCounter; 119 120 private int socketOpenCounter; 121 122 124 public DataSender(InetAddress host, int port) { 125 this.address = host; 126 this.port = port; 127 if (log.isInfoEnabled()) 128 log.info(sm.getString("IDataSender.create", address, new Integer ( 129 port))); 130 } 131 132 134 139 public String getInfo() { 140 141 return (info); 142 143 } 144 145 148 public long getNrOfRequests() { 149 return nrOfRequests; 150 } 151 152 155 public long getTotalBytes() { 156 return totalBytes; 157 } 158 159 162 public long getAvgProcessingTime() { 163 return processingTime / nrOfRequests; 164 } 165 166 169 public long getMaxProcessingTime() { 170 return maxProcessingTime; 171 } 172 173 176 public long getMinProcessingTime() { 177 return minProcessingTime; 178 } 179 180 183 public long getProcessingTime() { 184 return processingTime; 185 } 186 187 190 public boolean isDoProcessingStats() { 191 return doProcessingStats; 192 } 193 196 public void setDoProcessingStats(boolean doProcessingStats) { 197 this.doProcessingStats = doProcessingStats; 198 } 199 200 203 public long getConnectCounter() { 204 return connectCounter; 205 } 206 207 210 public long getDisconnectCounter() { 211 return disconnectCounter; 212 } 213 214 217 public long getMissingAckCounter() { 218 return missingAckCounter; 219 } 220 221 224 public int getSocketOpenCounter() { 225 return socketOpenCounter; 226 } 227 228 231 public int getSocketCloseCounter() { 232 return socketCloseCounter; 233 } 234 235 238 public long getDataResendCounter() { 239 return dataResendCounter; 240 } 241 242 public InetAddress getAddress() { 243 return address; 244 } 245 246 public int getPort() { 247 return port; 248 } 249 250 public boolean isConnected() { 251 return isSocketConnected; 252 } 253 254 258 protected void setSocketConnected(boolean isSocketConnected) { 259 this.isSocketConnected = isSocketConnected; 260 } 261 262 public boolean isSuspect() { 263 return suspect; 264 } 265 266 public boolean getSuspect() { 267 return suspect; 268 } 269 270 public void setSuspect(boolean suspect) { 271 this.suspect = suspect; 272 } 273 274 public long getAckTimeout() { 275 return ackTimeout; 276 } 277 278 public void setAckTimeout(long ackTimeout) { 279 this.ackTimeout = ackTimeout; 280 } 281 282 public long getKeepAliveTimeout() { 283 return keepAliveTimeout; 284 } 285 286 public void setKeepAliveTimeout(long keepAliveTimeout) { 287 this.keepAliveTimeout = keepAliveTimeout; 288 } 289 290 public int getKeepAliveMaxRequestCount() { 291 return keepAliveMaxRequestCount; 292 } 293 294 public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) { 295 this.keepAliveMaxRequestCount = keepAliveMaxRequestCount; 296 } 297 298 301 public long getKeepAliveConnectTime() { 302 return keepAliveConnectTime; 303 } 304 305 308 public int getKeepAliveCount() { 309 return keepAliveCount; 310 } 311 312 315 public boolean isWaitForAck() { 316 return waitForAck; 317 } 318 319 323 public void setWaitForAck(boolean waitForAck) { 324 this.waitForAck = waitForAck; 325 } 326 327 329 public void connect() throws java.io.IOException { 330 connectCounter++; 331 if (log.isDebugEnabled()) 332 log.debug(sm.getString("IDataSender.connect", address.getHostAddress(), 333 new Integer (port))); 334 openSocket(); 335 } 336 337 338 344 public void disconnect() { 345 disconnectCounter++; 346 if (log.isDebugEnabled()) 347 log.debug(sm.getString("IDataSender.disconnect", address.getHostAddress(), 348 new Integer (port))); 349 closeSocket(); 350 } 351 352 359 public boolean checkIfCloseSocket() { 360 boolean isCloseSocket = true ; 361 long ctime = System.currentTimeMillis() - this.keepAliveConnectTime; 362 if ((keepAliveTimeout > -1 && ctime > this.keepAliveTimeout) 363 || (keepAliveMaxRequestCount > -1 && this.keepAliveCount >= this.keepAliveMaxRequestCount)) { 364 closeSocket(); 365 } else 366 isCloseSocket = false ; 367 return isCloseSocket; 368 } 369 370 376 public synchronized void sendMessage(String messageid, byte[] data) 377 throws java.io.IOException { 378 pushMessage(messageid, data); 379 } 380 381 384 public synchronized void resetStatistics() { 385 nrOfRequests = 0; 386 totalBytes = 0; 387 disconnectCounter = 0; 388 connectCounter = isConnected() ? 1 : 0; 389 missingAckCounter = 0; 390 dataResendCounter = 0; 391 socketOpenCounter =isConnected() ? 1 : 0; 392 socketCloseCounter = 0; 393 processingTime = 0 ; 394 minProcessingTime = Long.MAX_VALUE ; 395 maxProcessingTime = 0 ; 396 } 397 398 401 public String toString() { 402 StringBuffer buf = new StringBuffer ("DataSender["); 403 buf.append(getAddress()).append(":").append(getPort()).append("]"); 404 return buf.toString(); 405 } 406 407 410 414 protected void openSocket() throws IOException , SocketException { 415 socketOpenCounter++; 416 if (log.isDebugEnabled()) 417 log.debug(sm.getString("IDataSender.openSocket", address.getHostAddress(), new Integer ( 418 port))); 419 sc = new Socket (getAddress(), getPort()); 420 if (isWaitForAck()) 421 sc.setSoTimeout((int) ackTimeout); 422 isSocketConnected = true; 423 this.keepAliveCount = 0; 424 this.keepAliveConnectTime = System.currentTimeMillis(); 425 } 426 427 433 protected void closeSocket() { 434 if(isSocketConnected) { 435 socketCloseCounter++; 436 if (log.isDebugEnabled()) 437 log.debug(sm.getString("IDataSender.closeSocket", 438 address.getHostAddress(), new Integer (port))); 439 try { 440 sc.close(); 441 } catch (Exception x) { 442 } 443 isSocketConnected = false; 444 } 445 } 446 447 452 protected void addStats(int length) { 453 nrOfRequests++; 454 totalBytes += length; 455 if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) { 456 log.debug(sm.getString("IDataSender.stats", new Object [] { 457 getAddress().getHostAddress(), new Integer (getPort()), 458 new Long (totalBytes), new Long (nrOfRequests), 459 new Long (totalBytes / nrOfRequests), 460 new Long (getProcessingTime()), 461 new Long (getAvgProcessingTime())})); 462 } 463 } 464 465 protected void addProcessingStats(long startTime) { 466 long time = System.currentTimeMillis() - startTime ; 467 if(time < minProcessingTime) 468 minProcessingTime = time ; 469 if( time > maxProcessingTime) 470 maxProcessingTime = time ; 471 processingTime += time ; 472 } 473 474 483 protected synchronized void pushMessage(String messageid, byte[] data) 484 throws java.io.IOException { 485 long time = 0 ; 486 if(doProcessingStats) { 487 time = System.currentTimeMillis(); 488 } 489 checkIfCloseSocket(); 490 if (!isConnected()) 491 openSocket(); 492 try { 493 sc.getOutputStream().write(data); 494 sc.getOutputStream().flush(); 495 if (isWaitForAck()) 496 waitForAck(ackTimeout); 497 } catch (java.io.IOException x) { 498 dataResendCounter++; 500 if (log.isTraceEnabled()) 501 log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(), 502 new Integer (port))); 503 closeSocket(); 504 openSocket(); 505 sc.getOutputStream().write(data); 506 sc.getOutputStream().flush(); 507 if (isWaitForAck()) 508 waitForAck(ackTimeout); 509 } 510 this.keepAliveCount++; 511 checkIfCloseSocket(); 512 if(doProcessingStats) { 513 addProcessingStats(time); 514 } 515 addStats(data.length); 516 if (log.isTraceEnabled()) 517 log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(), 518 new Integer (port), messageid, new Long (data.length))); 519 520 } 521 522 528 protected void waitForAck(long timeout) throws java.io.IOException { 529 try { 530 int i = sc.getInputStream().read(); 531 while ((i != -1) && (i != 3)) { 532 i = sc.getInputStream().read(); 533 } 534 } catch (java.net.SocketTimeoutException x) { 535 missingAckCounter++; 536 log.warn(sm.getString("IDataSender.missing.ack", getAddress(), 537 new Integer (getPort()), new Long (this.ackTimeout))); 538 throw x; 539 } 540 } 541 } | Popular Tags |