package org.apache.catalina.cluster.tcp;

import java.io.IOException;
import java.net.InetAddress;
import java.util.LinkedList;

/**
 * Data sender that transmits each message through a {@link SocketSender}
 * borrowed from an internal pool, so that several threads can send to the
 * same host and port concurrently. The number of senders handed out at the
 * same time is bounded by {@link #getMaxPoolSocketLimit()}.
 */
public class PooledSocketSender extends DataSender {

    private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
            .getLog(org.apache.catalina.cluster.tcp.PooledSocketSender.class);

    /**
     * The descriptive information about this implementation.
     */
    private static final String info = "PooledSocketSender/1.2";

    /**
     * Maximum number of senders that may be in use at the same time.
     */
    private int maxPoolSocketLimit = 25;

    /**
     * Pool of senders that all target this sender's address and port.
     */
    private SenderQueue senderQueue = null;

    /**
     * Creates a pooled sender for the given target.
     *
     * @param host address of the receiver
     * @param port port of the receiver
     */
    public PooledSocketSender(InetAddress host, int port) {
        super(host, port);
        senderQueue = new SenderQueue(this, maxPoolSocketLimit);
    }

    /**
     * Returns descriptive information about this implementation.
     *
     * @return the implementation name and version string
     */
    public String getInfo() {
        return (info);
    }

    /**
     * Changes the maximum number of senders that may be in use concurrently.
     *
     * @param limit the new upper bound
     */
    public void setMaxPoolSocketLimit(int limit) {
        maxPoolSocketLimit = limit;
        senderQueue.setLimit(limit);
    }

    /**
     * @return the maximum number of senders that may be in use concurrently
     */
    public int getMaxPoolSocketLimit() {
        return maxPoolSocketLimit;
    }

    /**
     * @return the number of idle senders currently held by the pool
     */
    public int getInPoolSize() {
        return senderQueue.getInPoolSize();
    }

    /**
     * @return the number of senders currently handed out by the pool
     */
    public int getInUsePoolSize() {
        return senderQueue.getInUsePoolSize();
    }

    /**
     * Opens the sender pool and marks this sender as connected. Individual
     * {@link SocketSender} instances are created on demand when a message
     * is sent.
     *
     * @throws java.io.IOException declared for callers; not thrown by the
     *             code in this method itself
     */
    public void connect() throws java.io.IOException {
        senderQueue.open();
        setSocketConnected(true);
        connectCounter++;
    }

    /**
     * Closes the sender pool, disconnecting every pooled sender, and marks
     * this sender as disconnected.
     */
    public void disconnect() {
        senderQueue.close();
        setSocketConnected(false);
        disconnectCounter++;
    }

    /**
     * Sends a message through a sender borrowed from the pool. The sender is
     * always handed back to the pool, even when sending fails.
     *
     * @param messageId identifier of the message
     * @param data message payload
     * @throws IOException if the borrowed sender fails to send
     * @throws IllegalStateException if the pool is closed when a sender is
     *             requested
     */
    public void sendMessage(String messageId, byte[] data) throws IOException {
        // A timeout of 0 blocks until a sender is available; null is only
        // returned when the pool is closed or the thread is interrupted
        // while waiting.
        SocketSender sender = senderQueue.getSender(0);
        if (sender == null) {
            log.warn(sm.getString("PoolSocketSender.noMoreSender", this
                    .getAddress(), new Integer(this.getPort())));
            return;
        }
        try {
            sender.sendMessage(messageId, data);
        } finally {
            senderQueue.returnSender(sender);
        }
        addStats(data.length);
    }

    public String toString() {
        StringBuffer buf = new StringBuffer("PooledSocketSender[");
        buf.append(getAddress()).append(":").append(getPort()).append("]");
        return buf.toString();
    }

    /**
     * Bounded pool of {@link SocketSender} instances. All mutable state
     * (both lists, the limit and the open flag) is guarded by
     * <code>mutex</code>.
     */
    private class SenderQueue {
        private int limit = 25;

        PooledSocketSender parent = null;

        /** Idle senders that are ready to be handed out. */
        private LinkedList queue = new LinkedList();

        /** Senders that are currently handed out to callers. */
        private LinkedList inuse = new LinkedList();

        private Object mutex = new Object();

        private boolean isOpen = true;

        public SenderQueue(PooledSocketSender parent, int limit) {
            this.limit = limit;
            this.parent = parent;
        }

        /**
         * @return the maximum number of senders handed out concurrently
         */
        public int getLimit() {
            synchronized (mutex) {
                return limit;
            }
        }

        /**
         * @param limit the new maximum number of senders handed out
         *            concurrently
         */
        public void setLimit(int limit) {
            synchronized (mutex) {
                this.limit = limit;
                // A raised limit may allow waiting callers to proceed.
                mutex.notifyAll();
            }
        }

        /**
         * @return the number of senders currently handed out
         */
        public int getInUsePoolSize() {
            synchronized (mutex) {
                return inuse.size();
            }
        }

        /**
         * @return the number of idle senders in the pool
         */
        public int getInPoolSize() {
            synchronized (mutex) {
                return queue.size();
            }
        }

        /**
         * Borrows a sender: an idle one if available, otherwise a new one
         * while fewer than <code>limit</code> senders are in use, otherwise
         * waits for one to be returned.
         *
         * @param timeout maximum time to wait in milliseconds; 0 waits
         *            without a time limit
         * @return a sender, or <code>null</code> if the timeout elapsed, the
         *         pool was closed while waiting, or the calling thread was
         *         interrupted (its interrupt status is preserved)
         * @throws IllegalStateException if the pool is already closed when
         *             this method is entered
         */
        public SocketSender getSender(long timeout) {
            long start = System.currentTimeMillis();
            synchronized (mutex) {
                if (!isOpen) {
                    throw new IllegalStateException("Socket pool is closed.");
                }
                while (true) {
                    SocketSender sender = null;
                    if (queue.size() > 0) {
                        sender = (SocketSender) queue.removeFirst();
                    } else if (inuse.size() < limit) {
                        sender = getNewSocketSender();
                    }
                    if (sender != null) {
                        inuse.add(sender);
                        return sender;
                    }

                    // Wait only for what is left of the timeout so that
                    // repeated wakeups cannot stretch the total wait.
                    long waitTime = 0;
                    if (timeout != 0) {
                        waitTime = timeout
                                - (System.currentTimeMillis() - start);
                        if (waitTime <= 0) {
                            return null;
                        }
                    }
                    try {
                        mutex.wait(waitTime);
                    } catch (InterruptedException x) {
                        PooledSocketSender.log.warn(sm.getString(
                                "PoolSocketSender.senderQueue.sender.failed",
                                parent.getAddress(), new Integer(parent
                                        .getPort())), x);
                        // Keep the interrupt visible to the caller and stop
                        // waiting instead of silently resuming the wait.
                        Thread.currentThread().interrupt();
                        return null;
                    }
                    if (!isOpen) {
                        // Pool was closed while this thread was waiting.
                        return null;
                    }
                }
            }
        }

        /**
         * Hands a borrowed sender back. If the pool is open the sender
         * becomes idle again; if the pool has been closed in the meantime
         * the sender is disconnected and dropped so that a closed pool does
         * not retain senders.
         *
         * @param sender the sender obtained from {@link #getSender(long)}
         */
        public void returnSender(SocketSender sender) {
            synchronized (mutex) {
                inuse.remove(sender);
                if (isOpen) {
                    queue.add(sender);
                } else {
                    // NOTE(review): close() already calls disconnect() on
                    // senders that may never have connected, so a repeated
                    // disconnect is assumed to be harmless - confirm.
                    sender.disconnect();
                }
                mutex.notify();
            }
        }

        /**
         * Creates a sender for the parent's address and port and copies the
         * parent's keep-alive and acknowledge settings onto it.
         */
        private SocketSender getNewSocketSender() {
            SocketSender sender = new SocketSender(parent.getAddress(),
                    parent.getPort());
            sender.setKeepAliveMaxRequestCount(parent
                    .getKeepAliveMaxRequestCount());
            sender.setKeepAliveTimeout(parent.getKeepAliveTimeout());
            sender.setAckTimeout(parent.getAckTimeout());
            sender.setWaitForAck(parent.isWaitForAck());
            return sender;
        }

        /**
         * Disconnects and forgets every idle and in-use sender, marks the
         * pool closed and wakes all threads waiting in
         * {@link #getSender(long)}.
         */
        public void close() {
            synchronized (mutex) {
                disconnectAll(queue);
                disconnectAll(inuse);
                isOpen = false;
                mutex.notifyAll();
            }
        }

        /**
         * Marks the pool open and wakes any waiting threads.
         */
        public void open() {
            synchronized (mutex) {
                isOpen = true;
                mutex.notifyAll();
            }
        }

        /**
         * Removes every sender from the list and disconnects it. The caller
         * must hold <code>mutex</code>.
         */
        private void disconnectAll(LinkedList senders) {
            while (!senders.isEmpty()) {
                SocketSender sender = (SocketSender) senders.removeFirst();
                sender.disconnect();
            }
        }
    }
}