1 20 package org.apache.mina.transport.socket.nio.support; 21 22 import java.net.SocketAddress ; 23 import java.net.SocketException ; 24 import java.nio.channels.DatagramChannel ; 25 import java.nio.channels.SelectionKey ; 26 import java.util.Queue ; 27 import java.util.concurrent.ConcurrentLinkedQueue ; 28 29 import org.apache.mina.common.BroadcastIoSession; 30 import org.apache.mina.common.ByteBuffer; 31 import org.apache.mina.common.IoFilter.WriteRequest; 32 import org.apache.mina.common.IoFilterChain; 33 import org.apache.mina.common.IoHandler; 34 import org.apache.mina.common.IoService; 35 import org.apache.mina.common.IoServiceConfig; 36 import org.apache.mina.common.IoSession; 37 import org.apache.mina.common.IoSessionConfig; 38 import org.apache.mina.common.RuntimeIOException; 39 import org.apache.mina.common.TransportType; 40 import org.apache.mina.common.WriteFuture; 41 import org.apache.mina.common.support.BaseIoSession; 42 import org.apache.mina.transport.socket.nio.DatagramServiceConfig; 43 import org.apache.mina.transport.socket.nio.DatagramSessionConfig; 44 45 51 class DatagramSessionImpl extends BaseIoSession implements BroadcastIoSession { 52 private final IoService wrapperManager; 53 54 private final IoServiceConfig serviceConfig; 55 56 private final DatagramSessionConfig config = new SessionConfigImpl(); 57 58 private final DatagramService managerDelegate; 59 60 private final DatagramFilterChain filterChain; 61 62 private final DatagramChannel ch; 63 64 private final Queue <WriteRequest> writeRequestQueue; 65 66 private final IoHandler handler; 67 68 private final SocketAddress localAddress; 69 70 private final SocketAddress serviceAddress; 71 72 private SocketAddress remoteAddress; 73 74 private SelectionKey key; 75 76 private int readBufferSize; 77 78 81 DatagramSessionImpl(IoService wrapperManager, 82 DatagramService managerDelegate, IoServiceConfig serviceConfig, 83 DatagramChannel ch, IoHandler defaultHandler, 84 SocketAddress serviceAddress, SocketAddress localAddress) { 85 this.wrapperManager = wrapperManager; 86 this.managerDelegate = managerDelegate; 87 this.filterChain = new DatagramFilterChain(this); 88 this.ch = ch; 89 this.writeRequestQueue = new ConcurrentLinkedQueue <WriteRequest>(); 90 this.handler = defaultHandler; 91 this.remoteAddress = ch.socket().getRemoteSocketAddress(); 92 93 this.serviceAddress = serviceAddress; 94 this.localAddress = localAddress; 95 this.serviceConfig = serviceConfig; 96 97 IoSessionConfig sessionConfig = serviceConfig.getSessionConfig(); 99 if (sessionConfig instanceof DatagramSessionConfig) { 100 DatagramSessionConfig cfg = (DatagramSessionConfig) sessionConfig; 101 this.config.setBroadcast(cfg.isBroadcast()); 102 this.config.setReceiveBufferSize(cfg.getReceiveBufferSize()); 103 this.config.setReuseAddress(cfg.isReuseAddress()); 104 this.config.setSendBufferSize(cfg.getSendBufferSize()); 105 106 if (this.config.getTrafficClass() != cfg.getTrafficClass()) { 107 this.config.setTrafficClass(cfg.getTrafficClass()); 108 } 109 } 110 } 111 112 public IoService getService() { 113 return wrapperManager; 114 } 115 116 public IoServiceConfig getServiceConfig() { 117 return serviceConfig; 118 } 119 120 public IoSessionConfig getConfig() { 121 return config; 122 } 123 124 DatagramService getManagerDelegate() { 125 return managerDelegate; 126 } 127 128 public IoFilterChain getFilterChain() { 129 return filterChain; 130 } 131 132 DatagramChannel getChannel() { 133 return ch; 134 } 135 136 SelectionKey getSelectionKey() { 137 return key; 138 } 139 140 void setSelectionKey(SelectionKey key) { 141 this.key = key; 142 } 143 144 public IoHandler getHandler() { 145 return handler; 146 } 147 148 @Override 149 protected void close0() { 150 IoServiceConfig config = getServiceConfig(); 151 if (config instanceof DatagramServiceConfig) { 152 ((DatagramServiceConfig) config).getSessionRecycler().remove(this); 153 } 154 filterChain.fireFilterClose(this); 155 } 156 157 Queue <WriteRequest> getWriteRequestQueue() { 158 return writeRequestQueue; 159 } 160 161 @Override 162 public WriteFuture write(Object message, SocketAddress destination) { 163 if (!this.config.isBroadcast()) { 164 throw new IllegalStateException ("Non-broadcast session"); 165 } 166 167 return super.write(message, destination); 168 } 169 170 @Override 171 protected void write0(WriteRequest writeRequest) { 172 filterChain.fireFilterWrite(this, writeRequest); 173 } 174 175 public int getScheduledWriteRequests() { 176 return writeRequestQueue.size(); 177 } 178 179 public int getScheduledWriteBytes() { 180 int byteSize = 0; 181 182 for (WriteRequest request : writeRequestQueue) { 183 byteSize += ((ByteBuffer) request.getMessage()).remaining(); 184 } 185 186 return byteSize; 187 } 188 189 public TransportType getTransportType() { 190 return TransportType.DATAGRAM; 191 } 192 193 public SocketAddress getRemoteAddress() { 194 return remoteAddress; 195 } 196 197 void setRemoteAddress(SocketAddress remoteAddress) { 198 this.remoteAddress = remoteAddress; 199 } 200 201 public SocketAddress getLocalAddress() { 202 return localAddress; 203 } 204 205 public SocketAddress getServiceAddress() { 206 return serviceAddress; 207 } 208 209 @Override 210 protected void updateTrafficMask() { 211 managerDelegate.updateTrafficMask(this); 212 } 213 214 int getReadBufferSize() { 215 return readBufferSize; 216 } 217 218 private class SessionConfigImpl extends DatagramSessionConfigImpl implements 219 DatagramSessionConfig { 220 @Override 221 public int getReceiveBufferSize() { 222 try { 223 return ch.socket().getReceiveBufferSize(); 224 } catch (SocketException e) { 225 throw new RuntimeIOException(e); 226 } 227 } 228 229 @Override 230 public void setReceiveBufferSize(int receiveBufferSize) { 231 if (DatagramSessionConfigImpl.isSetReceiveBufferSizeAvailable()) { 232 try { 233 ch.socket().setReceiveBufferSize(receiveBufferSize); 234 receiveBufferSize = ch.socket().getReceiveBufferSize(); 236 DatagramSessionImpl.this.readBufferSize = receiveBufferSize; 237 } catch (SocketException e) { 238 throw new RuntimeIOException(e); 239 } 240 } 241 } 242 243 @Override 244 public boolean isBroadcast() { 245 try { 246 return ch.socket().getBroadcast(); 247 } catch (SocketException e) { 248 throw new RuntimeIOException(e); 249 } 250 } 251 252 @Override 253 public void setBroadcast(boolean broadcast) { 254 try { 255 ch.socket().setBroadcast(broadcast); 256 } catch (SocketException e) { 257 throw new RuntimeIOException(e); 258 } 259 } 260 261 @Override 262 public int getSendBufferSize() { 263 try { 264 return ch.socket().getSendBufferSize(); 265 } catch (SocketException e) { 266 throw new RuntimeIOException(e); 267 } 268 } 269 270 @Override 271 public void setSendBufferSize(int sendBufferSize) { 272 if (DatagramSessionConfigImpl.isSetSendBufferSizeAvailable()) { 273 try { 274 ch.socket().setSendBufferSize(sendBufferSize); 275 } catch (SocketException e) { 276 throw new RuntimeIOException(e); 277 } 278 } 279 } 280 281 @Override 282 public boolean isReuseAddress() { 283 try { 284 return ch.socket().getReuseAddress(); 285 } catch (SocketException e) { 286 throw new RuntimeIOException(e); 287 } 288 } 289 290 @Override 291 public void setReuseAddress(boolean reuseAddress) { 292 try { 293 ch.socket().setReuseAddress(reuseAddress); 294 } catch (SocketException e) { 295 throw new RuntimeIOException(e); 296 } 297 } 298 299 @Override 300 public int getTrafficClass() { 301 if (DatagramSessionConfigImpl.isGetTrafficClassAvailable()) { 302 try { 303 return ch.socket().getTrafficClass(); 304 } catch (SocketException e) { 305 throw new RuntimeIOException(e); 306 } 307 } else { 308 return 0; 309 } 310 } 311 312 @Override 313 public void setTrafficClass(int trafficClass) { 314 if (DatagramSessionConfigImpl.isSetTrafficClassAvailable()) { 315 try { 316 ch.socket().setTrafficClass(trafficClass); 317 } catch (SocketException e) { 318 throw new RuntimeIOException(e); 319 } 320 } 321 } 322 } 323 } | Popular Tags |