1 16 17 package org.apache.catalina.cluster.tcp; 18 19 import java.net.InetAddress ; 20 21 import org.apache.catalina.cluster.util.SmartQueue; 22 23 41 public class AsyncSocketSender extends DataSender { 42 43 private static int threadCounter = 1; 44 45 private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 46 .getLog(AsyncSocketSender.class); 47 48 51 private static final String info = "AsyncSocketSender/1.2"; 52 53 55 58 private SmartQueue queue = new SmartQueue(); 59 60 63 private QueueThread queueThread = null; 64 65 68 private long inQueueCounter = 0; 69 70 73 private long outQueueCounter = 0; 74 75 78 private long queuedNrOfBytes = 0; 79 80 82 89 public AsyncSocketSender(InetAddress host, int port) { 90 super(host, port); 91 checkThread(); 92 } 93 94 96 101 public String getInfo() { 102 103 return (info); 104 105 } 106 107 110 public long getInQueueCounter() { 111 return inQueueCounter; 112 } 113 114 117 public long getOutQueueCounter() { 118 return outQueueCounter; 119 } 120 121 124 public int getQueueSize() { 125 return queue.size(); 126 } 127 128 131 public long getQueuedNrOfBytes() { 132 return queuedNrOfBytes; 133 } 134 135 137 142 public void connect() throws java.io.IOException { 143 super.connect(); 144 checkThread(); 145 } 146 147 152 public void disconnect() { 153 stopThread(); 154 super.disconnect(); 155 } 156 157 163 public synchronized void sendMessage(String messageid, byte[] data) 164 throws java.io.IOException { 165 SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(messageid, data); 166 queue.add(entry); 167 inQueueCounter++; 168 queuedNrOfBytes += data.length; 169 if (log.isTraceEnabled()) 170 log.trace(sm.getString("AsyncSocketSender.queue.message", 171 getAddress().getHostAddress(), new Integer (getPort()), messageid, new Long ( 172 data.length))); 173 } 174 175 178 public synchronized void resetStatistics() { 179 super.resetStatistics(); 180 inQueueCounter = queue.size(); 181 outQueueCounter = 0; 182 183 } 184 185 188 public String toString() { 189 StringBuffer buf = new StringBuffer ("AsyncSocketSender["); 190 buf.append(getAddress().getHostAddress()).append(":").append(getPort()).append("]"); 191 return buf.toString(); 192 } 193 194 196 199 protected void checkThread() { 200 if (queueThread == null) { 201 if (log.isInfoEnabled()) 202 log.info(sm.getString("AsyncSocketSender.create.thread", 203 getAddress(), new Integer (getPort()))); 204 queueThread = new QueueThread(this); 205 queueThread.setDaemon(true); 206 queueThread.start(); 207 } 208 } 209 210 213 protected void stopThread() { 214 if (queueThread != null) { 215 queueThread.stopRunning(); 216 queueThread = null; 217 } 218 } 219 220 223 protected void reduceQueuedCounter(int size) { 224 queuedNrOfBytes -= size; 225 } 226 227 229 private class QueueThread extends Thread { 230 AsyncSocketSender sender; 231 232 private boolean keepRunning = true; 233 234 public QueueThread(AsyncSocketSender sender) { 235 this.sender = sender; 236 setName("Cluster-AsyncSocketSender-" + (threadCounter++)); 237 } 238 239 public void stopRunning() { 240 keepRunning = false; 241 } 242 243 248 public void run() { 249 while (keepRunning) { 250 SmartQueue.SmartEntry entry = sender.queue.remove(5000); 251 if (entry != null) { 252 int messagesize = 0; 253 try { 254 byte[] data = (byte[]) entry.getValue(); 255 messagesize = data.length; 256 sender.pushMessage((String ) entry.getKey(), data); 257 outQueueCounter++; 258 } catch (Exception x) { 259 log.warn(sm.getString("AsyncSocketSender.send.error", 260 entry.getKey())); 261 } finally { 262 reduceQueuedCounter(messagesize); 263 } 264 } 265 } 266 } 267 } 268 } | Popular Tags |