1 4 package com.tc.net.protocol.tcm; 5 6 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap; 7 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet; 8 import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef; 9 10 import com.tc.async.api.Sink; 11 import com.tc.bytes.TCByteBuffer; 12 import com.tc.logging.TCLogger; 13 import com.tc.net.MaxConnectionsExceededException; 14 import com.tc.net.TCSocketAddress; 15 import com.tc.net.protocol.NetworkLayer; 16 import com.tc.net.protocol.NetworkStackID; 17 import com.tc.net.protocol.TCNetworkMessage; 18 import com.tc.net.protocol.transport.MessageTransport; 19 import com.tc.util.Assert; 20 import com.tc.util.TCTimeoutException; 21 22 import java.io.IOException ; 23 import java.net.UnknownHostException ; 24 import java.util.Iterator ; 25 import java.util.Map ; 26 import java.util.Set ; 27 28 31 abstract class AbstractMessageChannel implements MessageChannel, MessageChannelInternal { 32 33 private final Map attachments = new ConcurrentReaderHashMap(); 34 private final Object attachmentLock = new Object (); 35 private final Set listeners = new CopyOnWriteArraySet(); 36 private final ChannelStatus status = new ChannelStatus(); 37 private final SynchronizedRef remoteAddr = new SynchronizedRef(null); 38 private final SynchronizedRef localAddr = new SynchronizedRef(null); 39 private final TCMessageFactory msgFactory; 40 private final TCMessageRouter router; 41 private final TCMessageParser parser; 42 private final TCLogger logger; 43 44 protected NetworkLayer sendLayer; 45 46 AbstractMessageChannel(TCMessageRouter router, TCLogger logger, TCMessageFactory msgFactory) { 47 this.router = router; 48 this.logger = logger; 49 this.msgFactory = msgFactory; 50 this.parser = new TCMessageParser(this.msgFactory); 51 } 52 53 public void addAttachment(String key, Object value, boolean replace) { 54 synchronized (attachmentLock) { 55 boolean exists = attachments.containsKey(key); 56 if (replace || !exists) { 57 attachments.put(key, value); 58 } 59 } 60 } 61 62 public Object removeAttachment(String key) { 63 return this.attachments.remove(key); 64 } 65 66 public Object getAttachment(String key) { 67 return this.attachments.get(key); 68 } 69 70 public boolean isOpen() { 71 return this.status.isOpen(); 72 } 73 74 public boolean isClosed() { 75 return this.status.isClosed(); 76 } 77 78 public void addListener(ChannelEventListener listener) { 79 if (listener == null) { return; } 80 81 listeners.add(listener); 82 } 83 84 public TCMessage createMessage(TCMessageType type) { 85 TCMessage rv = this.msgFactory.createMessage(this, type); 86 88 return rv; 89 } 90 91 public void routeMessageType(TCMessageType messageType, TCMessageSink dest) { 92 router.routeMessageType(messageType, dest); 93 } 94 95 public void unrouteMessageType(TCMessageType messageType) { 96 router.unrouteMessageType(messageType); 97 } 98 99 public abstract NetworkStackID open() throws MaxConnectionsExceededException, TCTimeoutException, UnknownHostException , IOException ; 100 101 104 public void routeMessageType(TCMessageType messageType, Sink destSink, Sink hydrateSink) { 105 routeMessageType(messageType, new TCMessageSinkToSedaSink(destSink, hydrateSink)); 106 } 107 108 public void close() { 109 synchronized (status) { 110 if (!status.isClosed()) { 111 Assert.assertNotNull(this.sendLayer); 112 this.sendLayer.close(); 113 } 114 status.close(); 115 } 116 } 117 118 public final boolean isConnected() { 119 return this.sendLayer != null && this.sendLayer.isConnected(); 120 } 121 122 public final void setSendLayer(NetworkLayer layer) { 123 this.sendLayer = layer; 124 } 125 126 public final void setReceiveLayer(NetworkLayer layer) { 127 throw new UnsupportedOperationException (); 128 } 129 130 public final void send(final TCNetworkMessage message) { 131 if (logger.isDebugEnabled()) { 132 final Runnable logMsg = new Runnable () { 133 public void run() { 134 logger.debug("Message Sent: " + message.toString()); 135 } 136 }; 137 138 final Runnable existingCallback = message.getSentCallback(); 139 final Runnable newCallback; 140 141 if (existingCallback != null) { 142 newCallback = new Runnable () { 143 public void run() { 144 try { 145 existingCallback.run(); 146 } catch (Exception e) { 147 logger.error(e); 148 } finally { 149 logMsg.run(); 150 } 151 } 152 }; 153 } else { 154 newCallback = logMsg; 155 } 156 157 message.setSentCallback(newCallback); 158 } 159 160 this.sendLayer.send(message); 161 } 162 163 public final void receive(TCByteBuffer[] msgData) { 164 this.router.putMessage(parser.parseMessage(this, msgData)); 165 } 166 167 protected final ChannelStatus getStatus() { 168 return status; 169 } 170 171 public void notifyTransportDisconnected(MessageTransport transport) { 172 this.remoteAddr.set(null); 173 this.localAddr.set(null); 174 fireTransportDisconnectedEvent(); 175 } 176 177 protected void fireTransportDisconnectedEvent() { 178 fireEvent(new ChannelEventImpl(ChannelEventType.TRANSPORT_DISCONNECTED_EVENT, AbstractMessageChannel.this)); 179 } 180 181 public void notifyTransportConnected(MessageTransport transport) { 182 this.remoteAddr.set(transport.getRemoteAddress()); 183 this.localAddr.set(transport.getLocalAddress()); 184 fireEvent(new ChannelEventImpl(ChannelEventType.TRANSPORT_CONNECTED_EVENT, AbstractMessageChannel.this)); 185 } 186 187 public void notifyTransportConnectAttempt(MessageTransport transport) { 188 return; 189 } 190 191 public void notifyTransportClosed(MessageTransport transport) { 192 return; 194 } 195 196 public TCSocketAddress getLocalAddress() { 197 return (TCSocketAddress) this.localAddr.get(); 198 } 199 200 public TCSocketAddress getRemoteAddress() { 201 return (TCSocketAddress) this.remoteAddr.get(); 202 } 203 204 private void fireEvent(ChannelEventImpl event) { 205 for (Iterator i = listeners.iterator(); i.hasNext();) { 206 ((ChannelEventListener) i.next()).notifyChannelEvent(event); 207 } 208 } 209 210 class ChannelStatus { 211 private ChannelState state; 212 213 public ChannelStatus() { 214 this.state = ChannelState.CLOSED; 215 } 216 217 synchronized void close() { 219 changeState(ChannelState.CLOSED); 220 fireEvent(new ChannelEventImpl(ChannelEventType.CHANNEL_CLOSED_EVENT, AbstractMessageChannel.this)); 221 } 222 223 synchronized void open() { 225 changeState(ChannelState.OPEN); 226 fireEvent(new ChannelEventImpl(ChannelEventType.CHANNEL_OPENED_EVENT, AbstractMessageChannel.this)); 227 } 228 229 synchronized boolean isOpen() { 230 return ChannelState.OPEN.equals(state); 231 } 232 233 synchronized boolean isClosed() { 234 return ChannelState.CLOSED.equals(state); 235 } 236 237 private synchronized void changeState(ChannelState newState) { 238 state = newState; 239 } 240 } 241 242 private static class ChannelState { 243 private static final int STATE_OPEN = 1; 244 private static final int STATE_CLOSED = 2; 245 246 static final ChannelState OPEN = new ChannelState(STATE_OPEN); 247 static final ChannelState CLOSED = new ChannelState(STATE_CLOSED); 248 249 private final int state; 250 251 private ChannelState(int state) { 252 this.state = state; 253 } 254 255 public String toString() { 256 switch (state) { 257 case STATE_OPEN: 258 return "OPEN"; 259 case STATE_CLOSED: 260 return "CLOSED"; 261 default: 262 return "UNKNOWN"; 263 } 264 } 265 } 266 } | Popular Tags |