package org.xsocket.stream.io.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.xsocket.stream.io.spi.IIoHandlerCallback;
import org.xsocket.stream.io.spi.IIoHandlerContext;


/**
 * Chainable I/O handler that decouples callback notification from the thread
 * that raised the event.
 *
 * <p>All outbound operations (write, flush, drain, close) are delegated
 * unchanged to the successor handler. Inbound callback events coming from the
 * successor are intercepted by an internal {@link IOEventHandler}: connect,
 * data, disconnect and timeout events are wrapped into tasks and handed to the
 * worker pool supplied by the {@link IIoHandlerContext}. If the context
 * provides no worker pool, the task is executed directly in the calling
 * thread.
 *
 * <p>If the context reports that the application handler is not thread safe,
 * queued tasks are executed while holding the monitor of this handler
 * instance, so that at most one queued task runs at a time.
 */
final class IoMultithreadedHandler extends ChainableIoHandler {

    private static final Logger LOG = Logger.getLogger(IoMultithreadedHandler.class.getName());

    // Only used as a prefix in log messages; it is never reassigned within this class.
    private String id = "<null>";

    private final IIoHandlerContext ctx;
    private final TaskQueue taskQueue = new TaskQueue();

    private final IOEventHandler eventHandler = new IOEventHandler();


    /**
     * Constructor.
     *
     * @param successor  the successor handler all I/O operations are delegated to
     * @param ctx        the handler context (worker pool, application handler flags)
     */
    IoMultithreadedHandler(ChainableIoHandler successor, IIoHandlerContext ctx) {
        super(successor);
        this.ctx = ctx;

        setSuccessor(successor);
    }


    /**
     * Registers the given callback as the previous callback and initializes the
     * successor with the internal event handler, so that every successor event
     * passes through this handler first.
     *
     * @param callbackHandler  the callback which receives the dispatched events
     * @throws IOException if the successor fails to initialize
     */
    public void init(IIoHandlerCallback callbackHandler) throws IOException {
        setPreviousCallback(callbackHandler);
        getSuccessor().init(eventHandler);
    }


    /**
     * Delegates to the successor.
     *
     * @return the buffers drained from the successor
     */
    public LinkedList<ByteBuffer> drainIncoming() {
        return getSuccessor().drainIncoming();
    }


    /**
     * Closes the successor. Unless an immediate close is requested, outgoing
     * data is flushed first.
     *
     * @param immediate  true, to skip flushing before the successor is closed
     * @throws IOException if flushing or closing fails
     */
    public void close(boolean immediate) throws IOException {
        if (!immediate) {
            flushOutgoing();
        }

        getSuccessor().close(immediate);
    }


    /**
     * Delegates to the successor.
     *
     * @param buffer  the buffer to write
     * @throws IOException if the successor fails to write
     */
    public void writeOutgoing(ByteBuffer buffer) throws IOException {
        getSuccessor().writeOutgoing(buffer);
    }


    /**
     * Delegates to the successor.
     *
     * @param buffers  the buffers to write
     * @throws IOException if the successor fails to write
     */
    public void writeOutgoing(LinkedList<ByteBuffer> buffers) throws IOException {
        getSuccessor().writeOutgoing(buffers);
    }


    /**
     * Delegates to the successor.
     *
     * @throws IOException if the successor fails to flush
     */
    public void flushOutgoing() throws IOException {
        getSuccessor().flushOutgoing();
    }


    /**
     * Callback registered at the successor. Write related events and abnormal
     * termination are forwarded directly in the calling thread; all other
     * events are wrapped into tasks and passed to the task queue.
     */
    private final class IOEventHandler implements IIoHandlerCallback {

        public void onWriteException(IOException ioException) {
            getPreviousCallback().onWriteException(ioException);
        }


        public void onWritten() {
            getPreviousCallback().onWritten();
        }


        public void onConnectionAbnormalTerminated() {
            getPreviousCallback().onConnectionAbnormalTerminated();
        }


        /**
         * Dispatches the connect event, but only if the context reports that the
         * application handler listens for connect events.
         */
        public void onConnect() {
            if (ctx.isAppHandlerListenForConnectEvent()) {
                Runnable task = new Runnable() {
                    public void run() {
                        try {
                            getPreviousCallback().onConnect();
                        } catch (Exception e) {
                            // best effort: a failing callback must not kill the worker thread
                            if (LOG.isLoggable(Level.FINE)) {
                                LOG.fine("[" + id + "] error occured by handling connect. Reason: " + e.toString());
                            }
                        }
                    }
                };

                taskQueue.processTask(task);
            }
        }


        /**
         * Dispatches the data event, but only if the context reports that the
         * application handler listens for data events.
         */
        public void onDataRead() {
            if (ctx.isAppHandlerListenForDataEvent()) {
                Runnable task = new Runnable() {
                    public void run() {
                        try {
                            getPreviousCallback().onDataRead();
                        } catch (Exception e) {
                            if (LOG.isLoggable(Level.FINE)) {
                                LOG.fine("[" + id + "] error occured by handling data. Reason: " + e.toString());
                            }
                        }
                    }
                };

                taskQueue.processTask(task);
            }
        }


        /**
         * Dispatches the disconnect event, but only if the context reports that
         * the application handler listens for disconnect events.
         */
        public void onDisconnect() {
            if (ctx.isAppHandlerListenforDisconnectEvent()) {
                Runnable task = new Runnable() {
                    public void run() {
                        try {
                            getPreviousCallback().onDisconnect();
                        } catch (Exception e) {
                            if (LOG.isLoggable(Level.FINE)) {
                                // message previously said "connect" (copy-paste slip)
                                LOG.fine("[" + id + "] error occured by handling disconnect. Reason: " + e.toString());
                            }
                        }
                    }
                };

                taskQueue.processTask(task);
            }
        }


        /**
         * Dispatches the connection timeout event unconditionally.
         */
        public void onConnectionTimeout() {
            Runnable task = new Runnable() {
                public void run() {
                    try {
                        getPreviousCallback().onConnectionTimeout();
                    } catch (Exception e) {
                        if (LOG.isLoggable(Level.FINE)) {
                            LOG.fine("[" + id + "] error occured by handling onConnectionTimeout. Reason: " + e.toString());
                        }
                    }
                }
            };

            taskQueue.processTask(task);
        }


        /**
         * Dispatches the idle timeout event unconditionally.
         */
        public void onIdleTimeout() {
            Runnable task = new Runnable() {
                public void run() {
                    try {
                        getPreviousCallback().onIdleTimeout();
                    } catch (Exception e) {
                        if (LOG.isLoggable(Level.FINE)) {
                            LOG.fine("[" + id + "] error occured by handling onIdleTimeout. Reason: " + e.toString());
                        }
                    }
                }
            };

            taskQueue.processTask(task);
        }
    }


    /**
     * Queue of pending callback tasks. Each submitted task is enqueued and one
     * processor run is scheduled on the worker pool; every processor run
     * consumes at most one queued task.
     */
    private final class TaskQueue {

        private final TaskQueueProcessor taskProcessor = new TaskQueueProcessor();

        private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<Runnable>();

        /**
         * Executes the task by the worker pool, or directly in the calling
         * thread if the context provides no worker pool.
         *
         * @param task  the task to execute
         */
        public void processTask(Runnable task) {

            if (ctx.getWorkerpool() != null) {
                // enqueue first, so the scheduled processor run finds a task to poll
                tasks.offer(task);
                ctx.getWorkerpool().execute(taskProcessor);

            } else {
                task.run();
            }
        }
    }


    /**
     * Worker pool job which polls a single task from the task queue and runs it.
     */
    private final class TaskQueueProcessor implements Runnable {

        public void run() {
            if (!ctx.isAppHandlerThreadSafe()) {
                // poll and run under the handler monitor, so that queued tasks
                // of this handler are executed one at a time
                synchronized (IoMultithreadedHandler.this) {
                    processTask(taskQueue.tasks.poll());
                }
            } else {
                processTask(taskQueue.tasks.poll());
            }
        }


        /**
         * Runs the task and logs any exception instead of propagating it to the
         * worker pool.
         *
         * @param task  the task to run, or null if the queue was empty
         */
        private void processTask(Runnable task) {
            if (task != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    if (LOG.isLoggable(Level.FINE)) {
                        // include the cause; it was silently dropped before
                        LOG.fine("error occured by proccesing task " + task + ". Reason: " + e.toString());
                    }
                }
            }
        }
    }
}