1 5 package com.tc.management.remote.protocol.terracotta; 6 7 import com.tc.async.api.AbstractEventHandler; 8 import com.tc.async.api.EventContext; 9 import com.tc.async.api.EventHandlerException; 10 import com.tc.async.api.Sink; 11 import com.tc.logging.TCLogger; 12 import com.tc.logging.TCLogging; 13 import com.tc.net.protocol.tcm.ChannelID; 14 import com.tc.net.protocol.tcm.MessageChannel; 15 import com.tc.net.protocol.tcm.TCMessageType; 16 import com.tc.object.net.DSOChannelManagerEventListener; 17 18 import java.util.HashMap ; 19 import java.util.Map ; 20 21 import javax.management.MBeanServer ; 22 import javax.management.MBeanServerFactory ; 23 import javax.management.remote.message.Message; 24 25 public class ClientTunnelingEventHandler extends AbstractEventHandler implements DSOChannelManagerEventListener { 26 27 public static final class L1ConnectionMessage implements EventContext { 28 29 private final MBeanServer mbs; 30 private final MessageChannel channel; 31 private final Map channelIdToJmxConnector; 32 private final Map channelIdToMsgConnection; 33 private final boolean isConnectingMsg; 34 35 public L1ConnectionMessage(MBeanServer mbs, MessageChannel channel, Map channelIdToJmxConnector, 36 Map channelIdToMsgConnection, boolean isConnectingMsg) { 37 this.mbs = mbs; 38 this.channel = channel; 39 this.channelIdToJmxConnector = channelIdToJmxConnector; 40 this.channelIdToMsgConnection = channelIdToMsgConnection; 41 this.isConnectingMsg = isConnectingMsg; 42 43 if (isConnectingMsg && mbs == null) { 44 final AssertionError ae = new AssertionError ("Attempting to create a L1-connecting-message without" 45 + " a valid mBeanServer."); 46 throw ae; 47 } 48 } 49 50 public MBeanServer getMBeanServer() { 51 return mbs; 52 } 53 54 public MessageChannel getChannel() { 55 return channel; 56 } 57 58 public Map getChannelIdToJmxConnector() { 59 return channelIdToJmxConnector; 60 } 61 62 public Map getChannelIdToMsgConnector() { 63 return channelIdToMsgConnection; 64 } 65 66 public boolean isConnectingMsg() { 67 return isConnectingMsg; 68 } 69 } 70 71 private static final TCLogger logger = TCLogging.getLogger(ClientTunnelingEventHandler.class); 72 73 private final Map channelIdToJmxConnector; 74 private final Map channelIdToMsgConnection; 75 private final MBeanServer l2MBeanServer; 76 private final Object sinkLock; 77 private Sink connectStageSink; 78 79 public ClientTunnelingEventHandler() { 80 l2MBeanServer = (MBeanServer ) MBeanServerFactory.findMBeanServer(null).get(0); 81 channelIdToJmxConnector = new HashMap (); 82 channelIdToMsgConnection = new HashMap (); 83 sinkLock = new Object (); 84 } 85 86 public void handleEvent(final EventContext context) throws EventHandlerException { 87 if (context instanceof L1JmxReady) { 88 final L1JmxReady readyMessage = (L1JmxReady) context; 89 connectToL1JmxServer(readyMessage.getChannel()); 90 } else { 91 final JmxRemoteTunnelMessage messageEnvelope = (JmxRemoteTunnelMessage) context; 92 if (messageEnvelope.getCloseConnection()) { 93 channelRemoved(messageEnvelope.getChannel()); 94 } else if (messageEnvelope.getInitConnection()) { 95 logger.warn("Received a JMX tunneled connection init from the remote" 96 + " JMX server, only the JMX client should do this"); 97 } else { 98 routeTunneledMessage(messageEnvelope); 99 } 100 } 101 } 102 103 private void connectToL1JmxServer(final MessageChannel channel) { 104 logger.info("L1[" + channel.getChannelID() + "] notified us that their JMX server is now available"); 105 EventContext msg = new L1ConnectionMessage(l2MBeanServer, channel, channelIdToJmxConnector, 106 channelIdToMsgConnection, true); 107 synchronized (sinkLock) { 108 if (connectStageSink == null) { throw new AssertionError ("ConnectStageSink was not set."); } 109 connectStageSink.add(msg); 110 } 111 } 112 113 private void routeTunneledMessage(final JmxRemoteTunnelMessage messageEnvelope) { 114 final Message message = messageEnvelope.getTunneledMessage(); 115 final MessageChannel channel = messageEnvelope.getChannel(); 116 final ChannelID channelID = channel.getChannelID(); 117 final TunnelingMessageConnection tmc; 118 synchronized (channelIdToMsgConnection) { 119 tmc = (TunnelingMessageConnection) channelIdToMsgConnection.get(channelID); 120 } 121 if (tmc != null) { 122 tmc.incomingNetworkMessage(message); 123 } else { 124 logger.warn("Received tunneled JMX message with no associated message connection," 125 + " sending close() to remote JMX server"); 126 final JmxRemoteTunnelMessage closeMessage = (JmxRemoteTunnelMessage) channel 127 .createMessage(TCMessageType.JMXREMOTE_MESSAGE_CONNECTION_MESSAGE); 128 closeMessage.setCloseConnection(); 129 closeMessage.send(); 130 } 131 } 132 133 public void channelCreated(final MessageChannel channel) { 134 } 137 138 public void channelRemoved(final MessageChannel channel) { 139 EventContext msg = new L1ConnectionMessage(null, channel, channelIdToJmxConnector, channelIdToMsgConnection, false); 140 synchronized (sinkLock) { 141 if (connectStageSink == null) { throw new AssertionError ("ConnectStageSink was not set."); } 142 connectStageSink.add(msg); 143 } 144 } 145 146 public void setConnectStageSink(Sink sink) { 147 synchronized (sinkLock) { 148 if (connectStageSink != null) { 149 logger.warn("Attempted to set ConnectStageSink more than once."); 150 return; 151 } 152 connectStageSink = sink; 153 } 154 } 155 } 156 | Popular Tags |