1 20 package org.apache.mina.transport.vmpipe.support; 21 22 import java.net.SocketAddress ; 23 import java.util.ArrayList ; 24 import java.util.List ; 25 import java.util.concurrent.BlockingQueue ; 26 import java.util.concurrent.LinkedBlockingQueue ; 27 28 import org.apache.mina.common.IoFilter.WriteRequest; 29 import org.apache.mina.common.IoFilterChain; 30 import org.apache.mina.common.IoHandler; 31 import org.apache.mina.common.IoService; 32 import org.apache.mina.common.IoServiceConfig; 33 import org.apache.mina.common.IoSession; 34 import org.apache.mina.common.IoSessionConfig; 35 import org.apache.mina.common.TransportType; 36 import org.apache.mina.common.support.BaseIoSession; 37 import org.apache.mina.common.support.BaseIoSessionConfig; 38 import org.apache.mina.common.support.IoServiceListenerSupport; 39 40 46 public class VmPipeSessionImpl extends BaseIoSession { 47 private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() { 48 }; 49 50 private final IoService service; 51 52 private final IoServiceConfig serviceConfig; 53 54 private final IoServiceListenerSupport serviceListeners; 55 56 private final SocketAddress localAddress; 57 58 private final SocketAddress remoteAddress; 59 60 private final SocketAddress serviceAddress; 61 62 private final IoHandler handler; 63 64 private final VmPipeFilterChain filterChain; 65 66 private final VmPipeSessionImpl remoteSession; 67 68 final Object lock; 69 70 final BlockingQueue <Object > pendingDataQueue; 71 72 75 public VmPipeSessionImpl(IoService service, IoServiceConfig serviceConfig, 76 IoServiceListenerSupport serviceListeners, Object lock, 77 SocketAddress localAddress, IoHandler handler, VmPipe remoteEntry) { 78 this.service = service; 79 this.serviceConfig = serviceConfig; 80 this.serviceListeners = serviceListeners; 81 this.lock = lock; 82 this.localAddress = localAddress; 83 this.remoteAddress = this.serviceAddress = remoteEntry.getAddress(); 84 this.handler = handler; 85 this.filterChain = new VmPipeFilterChain(this); 86 this.pendingDataQueue = new LinkedBlockingQueue <Object >(); 87 88 remoteSession = new VmPipeSessionImpl(this, remoteEntry); 89 } 90 91 94 private VmPipeSessionImpl(VmPipeSessionImpl remoteSession, VmPipe entry) { 95 this.service = entry.getAcceptor(); 96 this.serviceConfig = entry.getConfig(); 97 this.serviceListeners = entry.getListeners(); 98 this.lock = remoteSession.lock; 99 this.localAddress = this.serviceAddress = remoteSession.remoteAddress; 100 this.remoteAddress = remoteSession.localAddress; 101 this.handler = entry.getHandler(); 102 this.filterChain = new VmPipeFilterChain(this); 103 this.remoteSession = remoteSession; 104 this.pendingDataQueue = new LinkedBlockingQueue <Object >(); 105 } 106 107 public IoService getService() { 108 return service; 109 } 110 111 IoServiceListenerSupport getServiceListeners() { 112 return serviceListeners; 113 } 114 115 public IoServiceConfig getServiceConfig() { 116 return serviceConfig; 117 } 118 119 public IoSessionConfig getConfig() { 120 return CONFIG; 121 } 122 123 public IoFilterChain getFilterChain() { 124 return filterChain; 125 } 126 127 public VmPipeSessionImpl getRemoteSession() { 128 return remoteSession; 129 } 130 131 public IoHandler getHandler() { 132 return handler; 133 } 134 135 @Override 136 protected void close0() { 137 filterChain.fireFilterClose(this); 138 } 139 140 @Override 141 protected void write0(WriteRequest writeRequest) { 142 this.filterChain.fireFilterWrite(this, writeRequest); 143 } 144 145 public int getScheduledWriteRequests() { 146 return 0; 147 } 148 149 public int getScheduledWriteBytes() { 150 return 0; 151 } 152 153 public TransportType getTransportType() { 154 return TransportType.VM_PIPE; 155 } 156 157 public SocketAddress getRemoteAddress() { 158 return remoteAddress; 159 } 160 161 public SocketAddress getLocalAddress() { 162 return localAddress; 163 } 164 165 public SocketAddress getServiceAddress() { 166 return serviceAddress; 167 } 168 169 @Override 170 protected void updateTrafficMask() { 171 if (getTrafficMask().isReadable() || getTrafficMask().isWritable()) { 172 List <Object > data = new ArrayList <Object >(); 173 174 pendingDataQueue.drainTo(data); 175 176 for (Object aData : data) { 177 if (aData instanceof WriteRequest) { 178 WriteRequest wr = (WriteRequest) aData; 182 filterChain.doWrite(this, wr); 183 } else { 184 filterChain.fireMessageReceived(this, aData); 188 } 189 } 190 } 191 } 192 } 193 | Popular Tags |