1 22 package org.xsocket.stream.io.impl; 23 24 25 import java.io.IOException ; 26 import java.util.ArrayList ; 27 import java.util.LinkedList ; 28 import java.util.List ; 29 import java.util.logging.Level ; 30 import java.util.logging.Logger ; 31 32 33 import org.xsocket.IDispatcher; 34 35 36 42 final class IoSocketDispatcherPool { 43 44 private static final Logger LOG = Logger.getLogger(IoSocketDispatcherPool.class.getName()); 45 46 private boolean isRunning = true; 47 48 49 50 private int preallocationSize = 65536; 52 private boolean useDirect = false; 53 54 55 private final List <IIoSocketDispatcherPoolListener> listeners = new ArrayList <IIoSocketDispatcherPoolListener>(); 57 58 59 60 private final LinkedList <IoSocketDispatcher> dispatchers = new LinkedList <IoSocketDispatcher>(); 62 private int size = 0; 63 private int pointer = 0; 64 65 66 IoSocketDispatcherPool(int preallocationSize, boolean useDirect) { 67 this.preallocationSize = preallocationSize; 68 this.useDirect = useDirect; 69 } 70 71 72 synchronized void setSize(int size) { 73 this.size = size; 74 updateDispatcher(); 75 } 76 77 synchronized void addListener(IIoSocketDispatcherPoolListener listener) { 78 listeners.add(listener); 79 } 80 81 82 synchronized boolean removeListener(IIoSocketDispatcherPoolListener listener) { 83 return listeners.remove(listener); 84 } 85 86 87 void run() { 88 isRunning = true; 89 updateDispatcher(); 90 } 91 92 93 94 100 synchronized int getReceiveBufferPreallocationSize() { 101 return preallocationSize; 102 } 103 104 110 synchronized void setReceiveBufferPreallocationSize(int size) { 111 preallocationSize = size; 112 } 113 114 115 @SuppressWarnings ("unchecked") 116 List <IDispatcher<IoSocketHandler>> getDispatchers() { 117 List <IDispatcher<IoSocketHandler>> result = null; 118 synchronized (dispatchers) { 119 result = (List <IDispatcher<IoSocketHandler>>) dispatchers.clone(); 120 } 121 return result; 122 } 123 124 125 126 127 @SuppressWarnings ("unchecked") 128 private synchronized void updateDispatcher() { 129 if (isRunning) { 130 int currentRunning = dispatchers.size(); 131 132 if (currentRunning != size) { 133 if (currentRunning > size) { 134 for (int i = size; i < currentRunning; i++) { 135 IDispatcher<IoSocketHandler> dispatcher = dispatchers.getLast(); 136 dispatchers.remove(dispatcher); 137 try { 138 dispatcher.close(); 139 } catch (IOException ioe) { 140 if (LOG.isLoggable(Level.FINE)) { 141 LOG.fine("error occured by closing the dispatcher " + dispatcher + ". reason " + ioe.toString()); 142 } 143 } 144 145 for (IIoSocketDispatcherPoolListener listener : listeners) { 146 listener.onDispatcherRemoved(dispatcher); 147 } 148 } 149 150 } else if ( currentRunning < size) { 151 for (int i = currentRunning; i < size; i++) { 152 IoSocketDispatcher dispatcher = new IoSocketDispatcher(new UnsynchronizedMemoryManager(preallocationSize, useDirect)); 153 dispatchers.addLast(dispatcher); 154 155 Thread t = new Thread (dispatcher); 156 t.setDaemon(false); 157 t.setName(IoSocketDispatcher.DISPATCHER_PREFIX + "#" + i); 158 t.start(); 159 160 for (IIoSocketDispatcherPoolListener listener : listeners) { 161 listener.onDispatcherAdded(dispatcher); 162 } 163 164 } 165 } 166 } 167 168 IDispatcher<IoSocketHandler>[] connectionDispatchers = new IDispatcher[dispatchers.size()]; 169 for (int i = 0; i < connectionDispatchers.length; i++) { 170 connectionDispatchers[i] = dispatchers.get(i); 171 } 172 } 173 } 174 175 176 177 181 void shutdown() { 182 isRunning = false; 183 184 if (LOG.isLoggable(Level.FINER)) { 185 LOG.fine("terminate dispatchers"); 186 } 187 188 for (IDispatcher<IoSocketHandler> dispatcher : dispatchers) { 189 try { 190 dispatcher.close(); 191 192 for (IIoSocketDispatcherPoolListener listener : listeners) { 193 listener.onDispatcherRemoved(dispatcher); 194 } 195 196 } catch (IOException ioe) { 197 if (LOG.isLoggable(Level.FINE)) { 198 LOG.fine("error occured by closing the dispatcher " + dispatcher + ". reason " + ioe.toString()); 199 } 200 } 201 } 202 203 dispatchers.clear(); 204 } 205 206 207 208 209 IoSocketDispatcher nextDispatcher() { 210 pointer++; 212 if (pointer >= size) { 213 pointer = 0; 214 } 215 216 return dispatchers.get(pointer); 217 } 218 219 220 221 @SuppressWarnings ("unchecked") 222 long getNumberOfConnectionTimeouts() { 223 long timeouts = 0; 224 225 LinkedList <IoSocketDispatcher> copy = (LinkedList <IoSocketDispatcher>) dispatchers.clone(); 226 for (IoSocketDispatcher dispatcher : copy) { 227 timeouts += dispatcher.getCountConnectionTimeout(); 228 } 229 return timeouts; 230 } 231 232 233 @SuppressWarnings ("unchecked") 234 public long getNumberOfIdleTimeouts() { 235 long timeouts = 0; 236 237 LinkedList <IoSocketDispatcher> copy = (LinkedList <IoSocketDispatcher>) dispatchers.clone(); 238 for (IoSocketDispatcher dispatcher : copy) { 239 timeouts += dispatcher.getCountIdleTimeout(); 240 } 241 return timeouts; 242 } 243 244 } 245 | Popular Tags |