1 16 17 package org.apache.catalina.cluster.tcp; 18 19 import java.net.InetAddress ; 20 21 import org.apache.catalina.cluster.util.FastQueue; 22 import org.apache.catalina.cluster.util.LinkObject; 23 import org.apache.catalina.cluster.util.IQueue; 24 25 47 public class FastAsyncSocketSender extends DataSender { 48 49 private static int threadCounter = 1; 50 51 private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 52 .getLog(FastAsyncSocketSender.class); 53 54 57 private static final String info = "FastAsyncSocketSender/1.1"; 58 59 61 64 private FastQueue queue = new FastQueue(); 65 66 69 private FastQueueThread queueThread = null; 70 71 74 private long inQueueCounter = 0; 75 76 79 private long outQueueCounter = 0; 80 81 84 private long queuedNrOfBytes = 0; 85 86 88 97 public FastAsyncSocketSender(InetAddress host, int port) { 98 super(host, port); 99 checkThread(); 100 } 101 102 104 109 public String getInfo() { 110 111 return (info); 112 113 } 114 115 119 public long getQueueAddWaitTimeout() { 120 121 return queue.getAddWaitTimeout(); 122 } 123 124 128 public void setQueueAddWaitTimeout(long timeout) { 129 queue.setAddWaitTimeout(timeout); 130 } 131 132 136 public long getQueueRemoveWaitTimeout() { 137 return queue.getRemoveWaitTimeout(); 138 } 139 140 144 public void setRemoveWaitTimeout(long timeout) { 145 queue.setRemoveWaitTimeout(timeout); 146 } 147 148 151 public boolean isQueueCheckLock() { 152 return queue.isCheckLock(); 153 } 154 157 public void setQueueCheckLock(boolean checkLock) { 158 queue.setCheckLock(checkLock); 159 } 160 163 public boolean isQueueDoStats() { 164 return queue.isDoStats(); 165 } 166 169 public void setQueueDoStats(boolean doStats) { 170 queue.setDoStats(doStats); 171 } 172 175 public boolean isQueueTimeWait() { 176 return queue.isTimeWait(); 177 } 178 181 public void setQueueTimeWait(boolean timeWait) { 182 queue.setTimeWait(timeWait); 183 } 184 185 188 public int getMaxQueueLength() { 189 return queue.getMaxQueueLength(); 190 } 191 192 196 public void setMaxQueueLength(int length) { 197 queue.setMaxQueueLength(length); 198 } 199 200 203 public long getQueueAddWaitTime() { 204 return queue.getAddWait(); 205 } 206 207 210 public long getQueueRemoveWaitTime() { 211 return queue.getRemoveWait(); 212 } 213 214 217 public long getInQueueCounter() { 218 return inQueueCounter; 219 } 220 221 224 public long getOutQueueCounter() { 225 return outQueueCounter; 226 } 227 228 231 public int getQueueSize() { 232 return queue.getSize(); 233 } 234 235 238 public long getQueuedNrOfBytes() { 239 return queuedNrOfBytes; 240 } 241 242 244 249 public void connect() throws java.io.IOException { 250 super.connect(); 251 checkThread(); 252 queue.start() ; 253 } 254 255 260 public void disconnect() { 261 stopThread(); 262 queue.stop() ; super.disconnect(); 264 } 265 266 272 public synchronized void sendMessage(String messageid, byte[] data) 273 throws java.io.IOException { 274 queue.add(messageid, data); 275 inQueueCounter++; 276 queuedNrOfBytes += data.length; 277 if (log.isTraceEnabled()) 278 log.trace(sm.getString("AsyncSocketSender.queue.message", 279 getAddress().getHostAddress(), new Integer (getPort()), messageid, new Long ( 280 data.length))); 281 } 282 283 286 public synchronized void resetStatistics() { 287 super.resetStatistics(); 288 inQueueCounter = queue.getSize(); 289 outQueueCounter = 0; 290 queue.resetStatistics(); 291 } 292 293 296 public String toString() { 297 StringBuffer buf = new StringBuffer ("FastAsyncSocketSender["); 298 buf.append(getAddress().getHostAddress()).append(":").append(getPort()).append("]"); 299 return buf.toString(); 300 } 301 302 304 307 protected void checkThread() { 308 if (queueThread == null) { 309 if (log.isInfoEnabled()) 310 log.info(sm.getString("AsyncSocketSender.create.thread", 311 getAddress(), new Integer (getPort()))); 312 queueThread = new FastQueueThread(this, queue); 313 queueThread.setDaemon(true); 314 queueThread.start(); 315 } 316 } 317 318 321 protected void stopThread() { 322 if (queueThread != null) { 323 queueThread.stopRunning(); 324 queueThread = null; 325 } 326 } 327 328 331 protected void reduceQueuedCounter(int size) { 332 queuedNrOfBytes -= size; 333 } 334 335 337 private class FastQueueThread extends Thread { 338 339 340 343 private IQueue queue = null; 344 345 348 private FastAsyncSocketSender sender = null; 349 350 353 private boolean keepRunning = true; 354 355 360 private FastQueueThread(FastAsyncSocketSender sender, IQueue queue) { 361 setName("Cluster-FastAsyncSocketSender-" + (threadCounter++)); 362 this.queue = queue; 363 this.sender = sender; 364 } 365 366 public void stopRunning() { 367 keepRunning = false; 368 } 369 370 373 public void run() { 374 while (keepRunning) { 375 LinkObject entry = queue.remove(); 377 if (entry != null) { 378 do { 379 int messagesize = 0; 380 try { 381 byte[] data = (byte[]) entry.data(); 382 messagesize = data.length; 383 sender.pushMessage((String ) entry.getKey(), data); 384 outQueueCounter++; 385 } catch (Exception x) { 386 log.warn(sm.getString( 387 "AsyncSocketSender.send.error", entry 388 .getKey()),x); 389 } finally { 390 reduceQueuedCounter(messagesize); 391 } 392 entry = entry.next(); 393 } while (entry != null); 394 } else { 395 log.error(sm.getString("AsyncSocketSender.queue.empty",sender.getAddress(), new Integer (sender.getPort()))); 396 } 397 } 398 } 399 400 } 401 402 } | Popular Tags |