1 5 package com.tc.object.net; 6 7 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList; 8 9 import com.tc.logging.TCLogger; 10 import com.tc.logging.TCLogging; 11 import com.tc.net.TCSocketAddress; 12 import com.tc.net.protocol.tcm.ChannelID; 13 import com.tc.net.protocol.tcm.ChannelManager; 14 import com.tc.net.protocol.tcm.ChannelManagerEventListener; 15 import com.tc.net.protocol.tcm.MessageChannel; 16 import com.tc.net.protocol.tcm.MessageChannelInternal; 17 import com.tc.net.protocol.tcm.TCMessageType; 18 import com.tc.object.msg.BatchTransactionAcknowledgeMessage; 19 import com.tc.object.msg.ClientHandshakeAckMessage; 20 21 import java.util.Collection ; 22 import java.util.Collections ; 23 import java.util.HashMap ; 24 import java.util.Iterator ; 25 import java.util.List ; 26 import java.util.Map ; 27 28 32 public class DSOChannelManagerImpl implements DSOChannelManager, DSOChannelManagerMBean { 33 private static final TCLogger logger = TCLogging.getLogger(DSOChannelManager.class); 34 private static final MessageChannel[] EMPTY_CHANNEL_ARRAY = new MessageChannel[] {}; 35 36 private final Map activeChannels = new HashMap (); 37 private final List eventListeners = new CopyOnWriteArrayList(); 38 39 private final ChannelManager genericChannelManager; 40 41 public DSOChannelManagerImpl(ChannelManager genericChannelManager) { 42 this.genericChannelManager = genericChannelManager; 43 this.genericChannelManager.addEventListener(new GenericChannelEventListener()); 44 } 45 46 public MessageChannel getActiveChannel(ChannelID id) throws NoSuchChannelException { 47 final MessageChannel rv; 48 synchronized (activeChannels) { 49 rv = (MessageChannel) activeChannels.get(id); 50 } 51 if (rv == null) { throw new NoSuchChannelException("No such channel: " + id); } 52 return rv; 53 } 54 55 public void closeAll(Collection channelIDs) { 56 for (Iterator i = channelIDs.iterator(); i.hasNext();) { 57 ChannelID id = (ChannelID) i.next(); 58 59 MessageChannel channel = genericChannelManager.getChannel(id); 60 if (channel != null) { 61 channel.close(); 62 } 63 } 64 } 65 66 public MessageChannel[] getActiveChannels() { 67 synchronized (activeChannels) { 68 return (MessageChannel[]) activeChannels.values().toArray(EMPTY_CHANNEL_ARRAY); 69 } 70 } 71 72 public boolean isActiveID(ChannelID channelID) { 73 synchronized (activeChannels) { 74 return activeChannels.containsKey(channelID); 75 } 76 } 77 78 public String getChannelAddress(ChannelID channelID) { 79 try { 80 MessageChannel channel = getActiveChannel(channelID); 81 TCSocketAddress addr = channel.getRemoteAddress(); 82 return addr.getStringForm(); 83 } catch (NoSuchChannelException e) { 84 return "no longer connected"; 85 } 86 } 87 88 public BatchTransactionAcknowledgeMessage newBatchTransactionAcknowledgeMessage(ChannelID channelID) 89 throws NoSuchChannelException { 90 return (BatchTransactionAcknowledgeMessage) getActiveChannel(channelID) 91 .createMessage(TCMessageType.BATCH_TRANSACTION_ACK_MESSAGE); 92 } 93 94 private ClientHandshakeAckMessage newClientHandshakeAckMessage(ChannelID channelID) throws NoSuchChannelException { 95 MessageChannelInternal channel = genericChannelManager.getChannel(channelID); 96 if (channel == null) { throw new NoSuchChannelException(); } 97 return (ClientHandshakeAckMessage) channel.createMessage(TCMessageType.CLIENT_HANDSHAKE_ACK_MESSAGE); 98 } 99 100 public Collection getAllActiveChannelIDs() { 101 synchronized (activeChannels) { 102 return Collections.unmodifiableCollection(activeChannels.keySet()); 103 } 104 } 105 106 public void makeChannelActive(ChannelID channelID, long startIDs, long endIDs, boolean persistent) { 107 try { 108 ClientHandshakeAckMessage ackMsg = newClientHandshakeAckMessage(channelID); 109 MessageChannel channel = ackMsg.getChannel(); 110 synchronized (activeChannels) { 111 activeChannels.put(channel.getChannelID(), channel); 112 ackMsg.initialize(startIDs, endIDs, persistent, getActiveChannels()); 113 ackMsg.send(); 114 115 } 116 fireChannelCreatedEvent(channel); 117 } catch (NoSuchChannelException nsce) { 118 logger.warn("Not sending handshake message to disconnected client: " + channelID); 119 } 120 } 121 122 public void makeChannelActiveNoAck(MessageChannel channel) { 123 synchronized (activeChannels) { 124 activeChannels.put(channel.getChannelID(), channel); 125 } 126 } 127 128 public void addEventListener(DSOChannelManagerEventListener listener) { 129 if (listener == null) { throw new NullPointerException ("listener cannot be be null"); } 130 eventListeners.add(listener); 131 } 132 133 public Collection getRawChannelIDs() { 134 return genericChannelManager.getAllChannelIDs(); 135 } 136 137 private void fireChannelCreatedEvent(MessageChannel channel) { 138 for (Iterator iter = eventListeners.iterator(); iter.hasNext();) { 139 DSOChannelManagerEventListener eventListener = (DSOChannelManagerEventListener) iter.next(); 140 eventListener.channelCreated(channel); 141 } 142 } 143 144 private void fireChannelRemovedEvent(MessageChannel channel) { 145 for (Iterator iter = eventListeners.iterator(); iter.hasNext();) { 146 DSOChannelManagerEventListener eventListener = (DSOChannelManagerEventListener) iter.next(); 147 eventListener.channelRemoved(channel); 148 } 149 } 150 151 private class GenericChannelEventListener implements ChannelManagerEventListener { 152 153 public void channelCreated(MessageChannel channel) { 154 } 156 157 public void channelRemoved(MessageChannel channel) { 158 synchronized (activeChannels) { 159 activeChannels.remove(channel.getChannelID()); 160 } 161 fireChannelRemovedEvent(channel); 162 } 163 164 } 165 166 } 167 | Popular Tags |